Applied Big Data Analysis in the Real World with MapReduce and Hadoop

In the third article in this series, Java programming expert Steven Haines demonstrates how to build a meaningful Hadoop MapReduce application to analyze hourly website usage from a set of Apache HTTP Server logs. Learn how to analyze a business problem the MapReduce way and then how to structure key and value types to fit the MapReduce model.

Editor's Note: This is the third article in a three-part series. Be sure to read the first two articles:

Big Data Analysis with MapReduce and Hadoop
Building a MapReduce Application with Hadoop

The last two articles presented an overview of Hadoop and its architecture and then demonstrated how to build the WordCount application, which is the “Hello, World” sample application in the MapReduce domain. This article builds upon that foundation and demonstrates how to apply MapReduce to a real-world problem: log file analysis.

Visits Per Hour

A common metric that web analytic tools provide about website traffic is the number of page views on a per-hour basis. This helps you better understand the patterns of your users, which can be used to expand and contract your environment if you are running on an elastic platform. For example, if your peak load is from 6pm-8pm but you have virtually no traffic from 3am-6am, then you can scale down your environment in the middle of the night to save costs and you can scale up at 6pm so that your environment can support your load.

In order to compute the number of page visits for each hour, the strategy this example employs is to create a custom Key class that encapsulates an hour (day, month, year, and hour) and then map that key to the number of observed page views for that hour. Just as we did with the WordCount example, the mapper will return the key mapped to the value 1, and then the reducer and combiners will compute the actual count of occurrences for each hour.
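The flow described above can be sketched without Hadoop at all. The following is a minimal stand-alone illustration, not the article's implementation: the class name HourlyCountSketch and the string hour keys are assumptions made for the example, and a TreeMap stands in for the sorting and grouping that Hadoop performs between the map and reduce phases.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch; no Hadoop classes are involved.
class HourlyCountSketch {

    // "Map" step: emit the pair (hourKey, 1) for every observed request.
    static List<Map.Entry<String, Integer>> map(List<String> hourKeys) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String hourKey : hourKeys) {
            pairs.add(new AbstractMap.SimpleImmutableEntry<>(hourKey, 1));
        }
        return pairs;
    }

    // "Reduce" step: sum the values that share a key. The TreeMap keeps the
    // keys sorted, loosely mimicking the sorted order a reducer sees.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three requests: two in the 05:00 hour and one in the 06:00 hour
        List<String> hours = Arrays.asList("2012-12-16 05", "2012-12-16 05", "2012-12-16 06");
        System.out.println(reduce(map(hours)));
    }
}
```

The real application replaces the string keys with a custom key class, which is what the rest of this section builds.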

The challenge in this example, as opposed to the WordCount example, is that we need a custom key class to hold our date. Building one is not hard, but the class must implement WritableComparable and provide the following three methods:

  • readFields(): Reads the object’s fields from a DataInput object.
  • write(): Writes the object’s fields to a DataOutput object.
  • compareTo(): Standard comparable method that compares this object to another object of the same type.

In this example, we build a reusable DateWritable object, named in the style of IntWritable, that persists a date to and from a binary data object, as shown in Listing 1.

Listing 1. DateWritable.java

package com.geekcap.hadoopexamples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class DateWritable implements WritableComparable<DateWritable>
{
	private final static SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
	private Date date;
	
	public Date getDate()
	{
		return date;
	}
	
	public void setDate( Date date )
	{
		this.date = date;
	}
	
	public void readFields( DataInput in ) throws IOException 
	{
		date = new Date( in.readLong() );
	}
	
	public void write( DataOutput out ) throws IOException 
	{
		out.writeLong( date.getTime() );
	}
	
	public String toString() 
	{
		return formatter.format( date);
	}

    public int compareTo( DateWritable other )
    {
        return date.compareTo( other.getDate() );
    }
}

The DateWritable class is straightforward: It wraps a date, implements the readFields() method by reading the date in as a long, and writes the date out to the DataOutput by converting the date to a long. Finally, the comparison is delegated to the Date class’s compareTo() method.
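To see that a single long is enough to carry the key, here is a small round-trip sketch that uses only JDK classes. It assumes nothing about Hadoop itself: DateRoundTripSketch is a hypothetical class that mirrors the bodies of write() and readFields() using DataOutputStream and DataInputStream, which implement the same DataOutput and DataInput interfaces that the DateWritable methods receive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical helper for illustration; it is not part of the article's code.
class DateRoundTripSketch {

    // Mirrors DateWritable.write(): the date is stored as epoch milliseconds.
    static byte[] write(Date date) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(date.getTime());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors DateWritable.readFields(): rebuild the date from the stored long.
    static Date read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Date(in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Date original = new Date(1355653970000L); // an arbitrary instant
        byte[] serialized = write(original);
        Date restored = read(serialized);
        System.out.println(serialized.length + " bytes, equal: " + original.equals(restored));
    }
}
```

Because the serialized form is just the millisecond timestamp, two keys are equal exactly when they represent the same instant, which is why the mapper must zero out the minutes, seconds, and milliseconds before emitting a key.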

With this key in place, the next step is to build a Hadoop class that uses this key in a mapper, build a reducer, and assemble it into a workable application. Listing 2 shows the code for the LogCountsPerHour Hadoop application.

Listing 2. LogCountsPerHour.java

package com.geekcap.hadoopexamples;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;

import java.io.IOException;
import java.util.Calendar;
import java.util.Iterator;

public class LogCountsPerHour extends Configured implements Tool {

    public static class LogMapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, DateWritable, IntWritable>
    {
        private DateWritable date = new DateWritable();
        private final static IntWritable one = new IntWritable( 1 );

        public void map( LongWritable key, // Offset into the file
                         Text value,
                         OutputCollector<DateWritable, IntWritable> output,
                         Reporter reporter) throws IOException
        {
            // Get the value as a String; it is of the format:
            // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
            String text = value.toString();

            // Get the date and time, which sit between the first '[' and the ']' that follows it
            int openBracket = text.indexOf( '[' );
            int closeBracket = text.indexOf( ']', openBracket + 1 );
            if( openBracket != -1 && closeBracket != -1 )
            {
                // Read the date
                String dateString = text.substring( openBracket + 1, closeBracket );

                // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
                int index = 0;
                int nextIndex = dateString.indexOf( '/' );
                int day = Integer.parseInt( dateString.substring(index, nextIndex) );

                index = nextIndex;
                nextIndex = dateString.indexOf( '/', index+1 );
                String month = dateString.substring( index+1, nextIndex );

                index = nextIndex;
                nextIndex = dateString.indexOf( ':', index );
                int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));

                index = nextIndex;
                nextIndex = dateString.indexOf( ':', index+1 );
                int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));

                // Build a calendar object for this date
                Calendar calendar = Calendar.getInstance();
                calendar.set( Calendar.DATE, day );
                calendar.set( Calendar.YEAR, year );
                calendar.set( Calendar.HOUR_OF_DAY, hour ); // 24-hour field; Calendar.HOUR is the 12-hour field
                calendar.set( Calendar.MINUTE, 0 );
                calendar.set( Calendar.SECOND, 0 );
                calendar.set( Calendar.MILLISECOND, 0 );

                if( month.equalsIgnoreCase( "dec" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.DECEMBER );
                }
                else if( month.equalsIgnoreCase( "nov" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.NOVEMBER );
                }
                else if( month.equalsIgnoreCase( "oct" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.OCTOBER );
                }
                else if( month.equalsIgnoreCase( "sep" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.SEPTEMBER );
                }
                else if( month.equalsIgnoreCase( "aug" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.AUGUST );
                }
                else if( month.equalsIgnoreCase( "jul" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.JULY );
                }
                else if( month.equalsIgnoreCase( "jun" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.JUNE );
                }
                else if( month.equalsIgnoreCase( "may" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.MAY );
                }
                else if( month.equalsIgnoreCase( "apr" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.APRIL );
                }
                else if( month.equalsIgnoreCase( "mar" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.MARCH );
                }
                else if( month.equalsIgnoreCase( "feb" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.FEBRUARY );
                }
                else if( month.equalsIgnoreCase( "jan" ) )
                {
                    calendar.set( Calendar.MONTH, Calendar.JANUARY );
                }


                // Output the date as the key and 1 as the value
                date.setDate( calendar.getTime() );
                output.collect(date, one);
            }
        }
    }

    public static class LogReduce extends MapReduceBase
            implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
    {
        public void reduce( DateWritable key, Iterator<IntWritable> values,
                            OutputCollector<DateWritable, IntWritable> output,
                            Reporter reporter) throws IOException
        {
            // Iterate over all of the values (counts of requests observed in this hour)
            int count = 0;
            while( values.hasNext() )
            {
                // Add the value to our count
                count += values.next().get();
            }

            // Output the hour with its count (wrapped in an IntWritable)
            output.collect( key, new IntWritable( count ) );
        }
    }


    public int run(String[] args) throws Exception
    {
        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the LogCountsPerHour class
        JobConf job = new JobConf( conf, LogCountsPerHour.class );

        // Define our input path as the first command line argument and our output path as the second
        Path in = new Path( args[0] );
        Path out = new Path( args[1] );

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.setInputPaths( job, in );
        FileOutputFormat.setOutputPath( job, out );

        // Configure the job: name, mapper, reducer, and combiner
        job.setJobName( "LogCountsPerHour" );
        job.setMapperClass( LogMapClass.class );
        job.setReducerClass( LogReduce.class );
        job.setCombinerClass( LogReduce.class );

        // Configure the output
        job.setOutputFormat( TextOutputFormat.class );
        job.setOutputKeyClass( DateWritable.class );
        job.setOutputValueClass( IntWritable.class );

        // Run the job
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        // Start the LogCountsPerHour MapReduce application
        int res = ToolRunner.run( new Configuration(),
                new LogCountsPerHour(),
                args );
        System.exit( res );
    }
}

The LogCountsPerHour class looks similar to the WordCount class in the previous article, but with a few differences:

  • It defines a new mapper class called LogMapClass that emits DateWritable keys instead of Text keys.
  • Its reducer is nearly identical to our previous reducer, but instead of emitting Text keys and a count, it emits DateWritable keys and a count.
  • The run() method configures the class to run the appropriate mapper, reducer, and combiner as well as configures the output key (DateWritable) and output value (IntWritable).

The most interesting part of the LogCountsPerHour class is the mapper. In short, it parses a line from an Apache HTTP Server log file in the following format:

111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET /  HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible;  Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

And from that it extracts the date:

16/Dec/2012:05:32:50 -0500

And from that it extracts the day, month, year, and hour of the request. This means that all requests between 5:00:00 and 5:59:59 are grouped together under a single date object for the specified day at 5am. This date becomes the key emitted by our mapper: for each record we output this hour and a count of 1, and the combiners and reducers ultimately compute the number of requests for that hour.
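As an aside, the same extraction can be written more compactly with the java.time API. The sketch below is an alternative to the manual parsing in Listing 2, not the code the listing uses; the class name LogHourSketch and the method hourOf() are hypothetical. Note one behavioral difference: this version keeps the UTC offset found in the log line, whereas Listing 2 ignores the offset and interprets the fields in the JVM's default time zone.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

// Hypothetical alternative to the hand-written parsing; requires Java 8 or later.
class LogHourSketch {

    // Matches timestamps such as 16/Dec/2012:05:32:50 -0500
    private static final DateTimeFormatter APACHE_TIME =
            DateTimeFormatter.ofPattern("dd/MMM/uuuu:HH:mm:ss Z", Locale.ENGLISH);

    // Returns the request time truncated to the hour, or null if the line
    // contains no bracketed timestamp.
    static OffsetDateTime hourOf(String logLine) {
        int open = logLine.indexOf('[');
        int close = logLine.indexOf(']', open + 1);
        if (open == -1 || close == -1) {
            return null;
        }
        OffsetDateTime timestamp =
                OffsetDateTime.parse(logLine.substring(open + 1, close), APACHE_TIME);
        // Zero out minutes, seconds, and nanoseconds so that every request
        // in the same hour yields an equal value
        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }

    public static void main(String[] args) {
        String line = "111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] \"GET / HTTP/1.1\" 200 14791";
        System.out.println(hourOf(line));
    }
}
```

A mapper built this way could pass the truncated instant's epoch milliseconds to DateWritable, but malformed timestamps would throw a DateTimeParseException that the caller would need to handle.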

The output from running this MapReduce application is the following (I downloaded all of the log files from GeekCap.com, which has a rather poor attendance—please visit me):

2012-11-18 T 16:00:00.000       1
2012-11-18 T 17:00:00.000       21
2012-11-18 T 18:00:00.000       3
2012-11-18 T 19:00:00.000       4
2012-11-18 T 20:00:00.000       5
2012-11-18 T 21:00:00.000       21
...
2012-12-17 T 14:00:00.000       30
2012-12-17 T 15:00:00.000       60
2012-12-17 T 16:00:00.000       40
2012-12-17 T 17:00:00.000       20
2012-12-17 T 18:00:00.000       8
2012-12-17 T 19:00:00.000       31
2012-12-17 T 20:00:00.000       5
2012-12-17 T 21:00:00.000       21

This analysis shows that at 5pm on November 18 we saw 21 page views, and at 3pm on December 17 we saw 60 page views. GeekCap.com is still pretty obscure, but your task (if you run this type of analysis of your production environment) is to look for patterns in daily usage and adapt your environment to react to this usage.

Listing 3 shows the contents of a Maven POM file that can be used to build this. A build can be performed with the following command:

mvn clean install

Listing 3. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.geekcap</groupId>
  <artifactId>hadoop-examples</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>hadoop-examples</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.205.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
  </dependencies>
</project>

This is the same POM file that was used in the previous WordCount example: it defines hadoop-core as the required dependency to compile the code.

Summary

This three-part series began by reviewing the domain of problems that MapReduce, and specifically Hadoop, is proficient at solving as well as the architecture that affords Hadoop its power. It presented the basics of building a MapReduce application and running it in Hadoop. It concluded with a real-world MapReduce application that analyzed a web server’s log file and computed the number of page visits per hour.

The key to writing powerful MapReduce applications is to think in terms of mappers, combiners, and reducers. Here are some questions to ponder:

  • What exactly should your key look like?
  • What is the business value that you are trying to derive, and how can you group metrics together into keys?
  • What is the nature of the values you want to compute, and how can that be captured in your Value objects?

Combiners can greatly improve performance, but what conditions must your reduction satisfy in order to use one? For example, if your reduction operation is not both associative and commutative, then using it as a combiner can produce incorrect results. Rewriting it so that it is, which means thinking about alternate ways of writing the reducer, can have a profound impact on the performance of your application.
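A small numeric sketch makes the condition concrete. It uses plain Java rather than Hadoop, and the class CombinerSketch and its sample values are invented for illustration. Summing partial results gives the same answer as summing everything at once, so a sum can safely run as a combiner; averaging partial averages does not.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of why a sum works as a combiner but a mean does not.
class CombinerSketch {

    static double sum(List<Double> values) {
        double total = 0;
        for (double value : values) {
            total += value;
        }
        return total;
    }

    static double mean(List<Double> values) {
        return sum(values) / values.size();
    }

    public static void main(String[] args) {
        // Values for one key as seen by two different map tasks
        List<Double> splitA = Arrays.asList(10.0, 20.0, 30.0);
        List<Double> splitB = Arrays.asList(100.0);
        List<Double> all = Arrays.asList(10.0, 20.0, 30.0, 100.0);

        // Sum: combining the partial sums matches the direct result
        System.out.println("sum of all values:     " + sum(all));
        System.out.println("sum of partial sums:   " + sum(Arrays.asList(sum(splitA), sum(splitB))));

        // Mean: combining the partial means does not match the direct result
        System.out.println("mean of all values:    " + mean(all));
        System.out.println("mean of partial means: " + mean(Arrays.asList(mean(splitA), mean(splitB))));
    }
}
```

One common way to make an average combiner-friendly is to have the combiner emit a (sum, count) pair and divide only in the final reducer.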

In the example presented in this article, the business value we wanted to derive was the number of page visits per hour, so naturally our key should be individual hours. The reason is that if we want to group page visits by hours and see the count on a per-hour basis, then we need to define a key type to match this grouping. When it comes to the value, we wanted a count of the number of page views, so it made sense that we could use a simple counter. And because addition is associative and commutative, our reducer could be used as a combiner as well.

I recommended it in the previous article, but I’ll remind you again here: If you are looking for a good book to help you think in the MapReduce way, O’Reilly’s MapReduce Design Patterns is a great choice. I read through several books to help me get Hadoop set up and configured, but MapReduce Design Patterns was the first book that I found that helped me really understand how to approach MapReduce problems. I highly recommend it!
