Building a MapReduce Application with Hadoop

As the amount of captured data increases over the years, so do our storage needs. Companies are realizing that “data is king,” but how do we analyze it? Through Hadoop. In the second article in this series, Java programming expert Steven Haines explains what a MapReduce application is and how to build a simple one.

The first article in this series described the domain of business problems that Hadoop was designed to solve, and the internal architecture of Hadoop that allows it to solve these problems. Applications that run in Hadoop are called MapReduce applications, so this article demonstrates how to build a simple MapReduce application.

Setting Up a Development Environment

Before you can use Hadoop, you need Java 6 (or later) installed, which can be downloaded for your platform from Oracle’s website. The official development and deployment platform for Hadoop is Linux, so if you are running on Windows, you will need to run Hadoop using Cygwin. Mac OS X users should have no problem running Hadoop natively.

Hadoop can be downloaded from its Releases page, but its numbering structure can be a little challenging to interpret. In short, the 1.x branch of code contains the current stable release, the 2.x.x branch contains the alpha code for version 2 of Hadoop, the 0.22.x branch of code is the 2.x.x code, but without security, and the 0.23.x branch of code excludes high availability. The 0.20.x branches of code are legacy and you should ignore them. For the examples in this article, I will be using the 0.23.x code branch, the latest of which is 0.23.5 as of this writing, but for production deployments, you would probably want to download version 1.x or 2.x.x.

Download and decompress the release archive on your local machine. If you’re planning on doing quite a bit of Hadoop development, it might be in your best interest to add the decompressed bin folder to your environment PATH. You can test your installation by executing the hadoop command from the bin folder:

bin/hadoop

Executing this command without any arguments reveals the following output:

Usage: hadoop [--config confdir] COMMAND
       where COMMAND is one of:
  fs                   run a generic filesystem user client
  version              print the version
  jar <jar>            run a jar file
  distcp <srcurl> <desturl> copy file or directories recursively
  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive
  classpath            prints the class path needed to get the
                       Hadoop jar and the required libraries
  daemonlog            get/set the log level for each daemon
 or
  CLASSNAME            run the class named CLASSNAME

Most commands print help when invoked w/o parameters.

There are numerous commands that can be passed to Hadoop, but in this article we’ll be focusing on executing Hadoop applications in a development environment, so the only one we’ll be interested in is the following:

hadoop jar <jar-file-name>

Hello, MapReduce

The first program that you write in any programming language is typically a “Hello, World” application. In terms of Hadoop and MapReduce, the standard application that everyone writes is the Word Count application. The Word Count application counts the number of times each word occurs in a large amount of text. For example, the word “a” might appear 2,000 times, whereas the word “hypothetical” might appear three times. It is a perfect example for learning MapReduce because the mapping and reducing steps are trivial, yet they introduce you to thinking in MapReduce. The following is a summary of the components in the Word Count application and their function:

  •   FileInputFormat: We define a FileInputFormat to read all of the files in a specified directory (passed as the first argument to the MapReduce application) and pass those to a TextInputFormat (see Listing 1) for distribution to our mappers.
  •   TextInputFormat: The default InputFormat for Hadoop is the TextInputFormat, which reads one line at a time and returns the byte offset of the line as the key (LongWritable) and the line of text as the value (Text).
  •   Word Count Mapper: This is a class that we write which tokenizes the single line of text passed to it by the InputFormat into words and then emits the word itself with a count of “1” to note that we saw this word.
  •   Combiner: While we don’t need a combiner in a development environment, the combiner is an implementation of the reducer (described later in this article) that runs on the local node before the key/value pairs are passed to the reducer. Using combiners can dramatically improve performance, but you need to make sure that combining your results does not break your reducer: in order for the reducer to be used as a combiner, its operation must be associative and commutative; otherwise, the partially combined values sent to the reducer will not produce the correct result.
  •   Word Count Reducer: The word count reducer receives each word along with a list of all the counts recorded for that word by the mappers. Without a combiner, the reducer would receive a word and a collection of “1”s, but because we are going to use the reducer as a combiner, it will receive a collection of partial counts that need to be added together.
  •   TextOutputFormat: In this example, we use the TextOutputFormat class and tell it that the keys will be Text and the values will be IntWritable.
  •   FileOutputFormat: The TextOutputFormat sends its formatted output to a FileOutputFormat, which writes results to a self-created “output” directory.
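To make the flow through these components concrete, here is a minimal sketch in plain Java (Java 8 or later, no Hadoop dependency) that imitates it in memory: each line is keyed by its byte offset, a map step emits a 1 for every word, the 1s are grouped by word, and a reduce step sums each group. The class name LocalWordCount and its methods are hypothetical and exist only for illustration; the sketch skips the text normalization that the real mapper performs, and real Hadoop distributes these steps across nodes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the Word Count data flow (not Hadoop code).
final class LocalWordCount {

    // Map step: receives (byte offset, line) and returns the words on that line.
    // Each returned word stands for an emitted (word, 1) pair.
    static List<String> map(long byteOffset, String line) {
        List<String> words = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            words.add(st.nextToken());
        }
        return words;
    }

    // Reduce step: receives a word and all of its counts, returns their sum.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Group the emitted 1s by word; a TreeMap keeps the keys sorted.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        long offset = 0;
        for (String line : lines) {
            for (String word : map(offset, line)) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
            // The next key is this line's byte length plus one newline byte further on.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }

        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            totals.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("call me ishmael", "call me a whale a white whale");
        System.out.println(run(lines));
        // prints {a=2, call=2, ishmael=1, me=2, whale=2, white=1}
    }
}
```

Note that the map step never looks at the byte offset; as in Word Count, the key supplied by the input format is often irrelevant to the mapper.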

You might be wondering why we’re calling Strings “Text” and numbers “IntWritable” and “LongWritable”. The reason is that in order for values to be passed across the Hadoop Distributed File System (HDFS) in a distributed fashion, they must follow specific serialization rules. Fortunately, Hadoop provides wrappers for common types, and if you need to develop your own, it provides a Writable interface that you can implement.
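As a rough illustration of what implementing such a type involves, the sketch below writes a value to a byte stream and reads it back. So that it compiles without Hadoop on the classpath, it declares a local stand-in interface, WritableLike, with the same two methods as Hadoop’s Writable (write(DataOutput) and readFields(DataInput)); in a real project you would implement org.apache.hadoop.io.Writable instead. The WordCountPair type and the roundTrip() helper are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical custom value type: a word paired with a count.
final class WordCountPair implements WritableLike {
    private String word = "";
    private int count;

    // No-argument constructor so that an empty instance can be filled by readFields().
    WordCountPair() { }

    WordCountPair(String word, int count) {
        this.word = word;
        this.count = count;
    }

    // Serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Deserialize the fields in exactly the same order.
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    String getWord() { return word; }
    int getCount() { return count; }

    // Serialize to bytes and rebuild a copy, roughly what happens when a value moves between nodes.
    static WordCountPair roundTrip(WordCountPair original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WordCountPair copy = new WordCountPair();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordCountPair copy = roundTrip(new WordCountPair("whale", 3));
        System.out.println(copy.getWord() + "\t" + copy.getCount());
    }
}

// Local stand-in mirroring the two methods of org.apache.hadoop.io.Writable (an assumption made
// only to keep this sketch free of Hadoop dependencies).
interface WritableLike {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
```

The important design point is symmetry: readFields() must consume the fields in the same order and with the same types that write() produced them. Types used as keys must additionally be comparable so that Hadoop can sort them.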

Listing 1 shows the source code for our first MapReduce application.

Listing 1 WordCount.java

package com.geekcap.hadoopexamples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * Word Count MapReduce application: counts how many times each word
 * occurs in the text files of an input directory.
 */
public class WordCount extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable>
    {
        private Text word = new Text();
        private final static IntWritable one = new IntWritable( 1 );

        public void map( LongWritable key, // Offset into the file
                         Text value,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException
        {
            // Get the value as a String
            String text = value.toString().toLowerCase();

            // Delete apostrophes, then replace every remaining non-letter with a space
            text = text.replaceAll( "'", "" );
            text = text.replaceAll( "[^a-zA-Z]", " " );

            // Iterate over all of the words in the string
            StringTokenizer st = new StringTokenizer( text );
            while( st.hasMoreTokens() )
            {
                // Get the next token and set it as the text for our "word" variable
                word.set( st.nextToken() );

                // Output this word as the key and 1 as the value
                output.collect( word, one );
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce( Text key, Iterator<IntWritable> values,
                            OutputCollector<Text, IntWritable> output,
                            Reporter reporter) throws IOException
        {
            // Iterate over all of the values (counts of occurrences of this word)
            int count = 0;
            while( values.hasNext() )
            {
                // Add the value to our count
                count += values.next().get();
            }

            // Output the word with its count (wrapped in an IntWritable)
            output.collect( key, new IntWritable( count ) );
        }
    }


    public int run(String[] args) throws Exception
    {
        // Get the configuration that ToolRunner supplied to this Tool
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the WordCount class
        JobConf job = new JobConf( conf, WordCount.class );

        // Define our input path as the first command line argument and our output path as the second
        Path in = new Path( args[0] );
        Path out = new Path( args[1] );

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.setInputPaths( job, in );
        FileOutputFormat.setOutputPath( job, out );

        // Configure the job: name, mapper, reducer, and combiner
        job.setJobName( "WordCount" );
        job.setMapperClass( MapClass.class );
        job.setReducerClass( Reduce.class );
        job.setCombinerClass( Reduce.class );

        // Configure the output
        job.setOutputFormat( TextOutputFormat.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );

        // Run the job
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run( new Configuration(),
                new WordCount(),
                args );
        System.exit( res );
    }
}

Code Analysis

Execution starts in WordCount’s main() method, which creates a WordCount instance and hands it to the ToolRunner class to run the job. The ToolRunner then executes the instance’s run() method.

The run() method configures the job by defining input and output paths and then registering those paths with the job through the static FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath() methods. Setting the input and output paths is a little different from the remainder of the configuration because we call static methods and pass them a reference to the job. The other configuration is accomplished by invoking the job’s own setter methods.

The job is then configured with a mapper class, a reducer class, and a combiner class. Note that we pass the class itself and not an instance of the class so that Hadoop can create as many of them as it needs to across its distributed environment.
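The general idea of handing over a class rather than an instance can be sketched with plain Java reflection. This is only an illustration of the principle, not Hadoop’s actual instantiation code, which this article does not show; the names ClassTokenDemo, Worker, SimpleWorker, and createWorkers() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not Hadoop code) of creating workers from a Class object.
final class ClassTokenDemo {

    // Stand-in for a mapper-like component.
    interface Worker {
        String describe();
    }

    // Static nested class with a no-argument constructor, so it can be created reflectively.
    static final class SimpleWorker implements Worker {
        public String describe() {
            return "worker@" + System.identityHashCode(this);
        }
    }

    // Given only the class, build as many independent instances as needed.
    static <T extends Worker> List<T> createWorkers(Class<T> type, int copies) {
        List<T> workers = new ArrayList<>();
        try {
            for (int i = 0; i < copies; i++) {
                workers.add(type.getDeclaredConstructor().newInstance());
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
        return workers;
    }

    public static void main(String[] args) {
        List<SimpleWorker> workers = createWorkers(SimpleWorker.class, 3);
        System.out.println(workers.size() + " separate instances created");
    }
}
```

A framework that works this way needs each class to be instantiable without outside help, which is consistent with MapClass and Reduce in Listing 1 being declared as static nested classes with default constructors.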

The real work takes place in the MapClass and the Reduce class. The MapClass receives the following information:

  • key: The byte offset into the file.
  • value: The text of a single line of the file.
  • output: The OutputCollector is the mechanism through which we output the key/value pair that we want to pass to the reducer.
  • reporter: Used to report progress in processing the job back to the Hadoop server. It is not used in this example.

The MapClass extracts the value to a String by calling the value’s toString() method and then does a few conversions: it converts the String to lowercase so that words like “Apple” and “apple” match, it deletes single quotes, and it replaces all remaining non-letter characters with spaces. It then tokenizes the String on white space and iterates over the tokens. For each token, it sets the word variable’s text to the token and then emits the word as the key and a static IntWritable for the number 1 as the value. We could have created a new Text object each time, but because of the number of times this code runs, it improves performance to keep the word as a member variable and reuse it.
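The effect of these conversions is easiest to see on a sample line. The following sketch repeats the mapper’s string handling from Listing 1 outside of Hadoop; the TokenizeDemo class, its tokenize() method, and the sample sentence are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper that repeats the mapper's string handling outside Hadoop.
final class TokenizeDemo {

    static List<String> tokenize(String line) {
        String text = line.toLowerCase();
        text = text.replaceAll("'", "");           // delete apostrophes: "you're" becomes "youre"
        text = text.replaceAll("[^a-zA-Z]", " ");  // every other non-letter becomes a space

        // StringTokenizer splits on white space and skips empty tokens
        List<String> tokens = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(text);
        while (st.hasMoreTokens()) {
            tokens.add(st.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("You're abandoned, Apple-pie!"));
        // prints [youre, abandoned, apple, pie]
    }
}
```

Because apostrophes are deleted rather than replaced with spaces, contractions collapse into single tokens, which is why words such as “youre” and “youve” show up in the job’s output. Hyphenated words, by contrast, are split in two, and digits are dropped entirely.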

The Reduce class’s reduce() method receives the same kinds of parameters that the map() method receives, only its key is the word and, instead of receiving a single value, it receives an Iterator over a list of values. In this example, it would receive something like the word “apple” and an Iterator over a collection with values 1, 1, 1, 1. But because we also want to be able to use this Reduce class as a combiner, we don’t just count the number of entries; instead, we extract each value by calling the IntWritable’s get() method and add it to our running count. In the end, the reduce() method emits the same key it received (the word) along with the total number of occurrences.
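A small experiment shows why summing matters once a combiner is involved. The sketch below is plain Java rather than Hadoop code, and the CombinerDemo class and its scenario of two mappers are hypothetical: it applies the reduce logic once per mapper (the combiner pass) and then once more to the combined values (the reducer pass).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo (not Hadoop code) of why the reducer sums values instead of counting entries.
final class CombinerDemo {

    // Reduce logic used in Word Count: add up the values.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    // Tempting alternative that breaks with a combiner: count how many values arrived.
    static int countEntries(List<Integer> values) {
        return values.size();
    }

    public static void main(String[] args) {
        // Two mappers saw the word "apple" three times and one time, respectively.
        List<Integer> fromMapperA = Arrays.asList(1, 1, 1);
        List<Integer> fromMapperB = Arrays.asList(1);

        // Combiner pass: the reduce logic runs on each mapper's local output.
        List<Integer> combinedBySum = Arrays.asList(sum(fromMapperA), sum(fromMapperB));
        List<Integer> combinedByCount = Arrays.asList(countEntries(fromMapperA), countEntries(fromMapperB));

        // Reducer pass: the same logic runs again on the combined values [3, 1].
        System.out.println(sum(combinedBySum));            // prints 4 (correct)
        System.out.println(countEntries(combinedByCount)); // prints 2 (wrong)
    }
}
```

Summing gives the same answer no matter how the 1s are grouped beforehand, whereas counting entries only works when every value is still a 1.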

You might be wondering what the big deal is. This is a simple program, right? And you are right, it is a simple program, and that is the elegance of MapReduce: you will find yourself spending more time deriving your solution than actually coding.

Listing 2 shows a Maven POM file for building this source code.

Listing 2 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.geekcap</groupId>
  <artifactId>hadoop-examples</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>hadoop-examples</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.205.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

</project>

The POM file is very simple: its only compile-time dependency is hadoop-core (JUnit is included for tests only). You can build with the following command:

mvn clean install

To put this all together, we need a sizable text file in which to count words. A great source of large text files is Project Gutenberg, which includes more than 100,000 free ebooks. For my example, I chose Moby Dick. Download one of the text files and put it in a directory on your hard drive (it should be the only file in that directory). Once you have it, you can run your MapReduce project by executing the hadoop command, passing it the path to the directory that contains your book and a destination directory. For example:

hadoop jar hadoop-examples-1.0-SNAPSHOT.jar com.geekcap.hadoopexamples.WordCount  ~/apps/hadoop-0.23.5/test-data output

When I execute this I see the following output:

2012-12-11 22:27:08.929 java[37044:1203] Unable to load realm info from SCDynamicStore
2012-12-11 22:27:09.023 java[37044:1203] Unable to load realm info from SCDynamicStore
12/12/11 22:27:09 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
12/12/11 22:27:09 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/12/11 22:27:09 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
12/12/11 22:27:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/12/11 22:27:09 WARN snappy.LoadSnappy: Snappy native library not loaded
12/12/11 22:27:09 INFO mapred.FileInputFormat: Total input paths to process : 1
12/12/11 22:27:10 INFO mapreduce.JobSubmitter: number of splits:1
12/12/11 22:27:10 WARN conf.Configuration: mapred.jar is deprecated. Instead, use mapreduce.job.jar
12/12/11 22:27:10 WARN conf.Configuration: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
12/12/11 22:27:10 WARN conf.Configuration: mapred.job.name is deprecated. Instead, use mapreduce.job.name
12/12/11 22:27:10 WARN conf.Configuration: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
12/12/11 22:27:10 WARN conf.Configuration: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
12/12/11 22:27:10 WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
12/12/11 22:27:10 WARN conf.Configuration: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
12/12/11 22:27:10 WARN conf.Configuration: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
12/12/11 22:27:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local_0001
12/12/11 22:27:10 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
12/12/11 22:27:10 INFO mapred.LocalJobRunner: OutputCommitter set in config null
12/12/11 22:27:10 INFO mapreduce.Job: Running job: job_local_0001
12/12/11 22:27:10 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
12/12/11 22:27:10 INFO mapred.LocalJobRunner: Waiting for map tasks
12/12/11 22:27:10 INFO mapred.LocalJobRunner: Starting task: attempt_local_0001_m_000000_0
12/12/11 22:27:10 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
12/12/11 22:27:10 INFO mapred.MapTask: numReduceTasks: 1
12/12/11 22:27:10 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
12/12/11 22:27:10 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
12/12/11 22:27:10 INFO mapred.MapTask: soft limit at 83886080
12/12/11 22:27:10 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
12/12/11 22:27:10 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
12/12/11 22:27:11 INFO mapred.LocalJobRunner: 
12/12/11 22:27:11 INFO mapred.MapTask: Starting flush of map output
12/12/11 22:27:11 INFO mapred.MapTask: Spilling map output
12/12/11 22:27:11 INFO mapred.MapTask: bufstart = 0; bufend = 2027118; bufvoid = 104857600
12/12/11 22:27:11 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25353164(101412656); length = 861233/6553600
12/12/11 22:27:11 INFO mapreduce.Job: Job job_local_0001 running in uber mode : false
12/12/11 22:27:11 INFO mapreduce.Job:  map 0% reduce 0%
12/12/11 22:27:12 INFO mapred.MapTask: Finished spill 0
12/12/11 22:27:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of committing
12/12/11 22:27:12 INFO mapred.LocalJobRunner: file:/Users/shaines/apps/hadoop-0.23.5/test-data/mobydick.txt:0+1212132
12/12/11 22:27:12 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/12/11 22:27:12 INFO mapred.LocalJobRunner: Finishing task: attempt_local_0001_m_000000_0
12/12/11 22:27:12 INFO mapred.LocalJobRunner: Map task executor complete.
12/12/11 22:27:12 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
12/12/11 22:27:12 INFO mapred.Merger: Merging 1 sorted segments
12/12/11 22:27:12 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 247166 bytes
12/12/11 22:27:12 INFO mapred.LocalJobRunner: 
12/12/11 22:27:12 INFO mapreduce.Job:  map 100% reduce 0%
12/12/11 22:27:12 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of committing
12/12/11 22:27:12 INFO mapred.LocalJobRunner: 
12/12/11 22:27:12 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/12/11 22:27:12 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/shaines/Documents/Workspace/hadoop-examples/target/output/_temporary/0/task_local_0001_r_000000
12/12/11 22:27:12 INFO mapred.LocalJobRunner: reduce > reduce
12/12/11 22:27:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/12/11 22:27:13 INFO mapreduce.Job:  map 100% reduce 100%
12/12/11 22:27:13 INFO mapreduce.Job: Job job_local_0001 completed successfully
12/12/11 22:27:13 INFO mapreduce.Job: Counters: 24
	File System Counters
		FILE: Number of bytes read=2683488
		FILE: Number of bytes written=974132
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=21573
		Map output records=215309
		Map output bytes=2027118
		Map output materialized bytes=247174
		Input split bytes=113
		Combine input records=215309
		Combine output records=17107
		Reduce input groups=17107
		Reduce shuffle bytes=0
		Reduce input records=17107
		Reduce output records=17107
		Spilled Records=34214
		Shuffled Maps =0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=32
		Total committed heap usage (bytes)=264110080
	File Input Format Counters 
		Bytes Read=1212132
	File Output Format Counters 
		Bytes Written=182624

And because I told it to write to an “output” directory, my output directory has a file named part-00000. Here is a small snippet of its contents:

a       4687
aback   2
abaft   2
abandon 3
abandoned       7
abandonedly     1
abandonment     2
...
your    251
youre   6
youve   1
zephyr  1
zeuglodon       1
zones   3
zoology 2
zoroaster       1

The output contains the word that it found and the number of occurrences of that word. The word “a” occurred 4687 times in Moby Dick, whereas the word “your” only occurred 251 times.
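Because the file is sorted by word rather than by count, a little post-processing helps when you want to see the most frequent words. The following sketch assumes that each line has the form word, tab, count, which is how TextOutputFormat separates keys from values by default; the TopWords class and its top() method are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical post-processing helper for Word Count output lines.
final class TopWords {

    // Parse "word<TAB>count" lines and return the n entries with the highest counts.
    static List<Map.Entry<String, Integer>> top(List<String> lines, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split("\t");
            if (parts.length != 2) {
                continue; // skip anything that is not "word<TAB>count"
            }
            entries.add(new AbstractMap.SimpleEntry<String, Integer>(
                    parts[0], Integer.valueOf(parts[1].trim())));
        }
        // Sort by count, highest first
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        // A few lines taken from the snippet above
        List<String> sample = Arrays.asList("a\t4687", "aback\t2", "your\t251", "zephyr\t1");
        System.out.println(top(sample, 2));
        // prints [a=4687, your=251]
    }
}
```

To run it against the real results, you could load the lines with Files.readAllLines(Paths.get("output/part-00000")) from java.nio.file and pass them to top().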

Summary

This article demonstrated how to create a simple MapReduce application from start to finish. It delved into the depths of MapReduce to describe how mappers and reducers are built and then how Hadoop is configured to execute the mapper, reducer, and combiner. The important thing to realize about Hadoop and MapReduce in general is that you’ll need to spend more time thinking about how to solve your problem than you will coding. The trick is to think about the type of key that you need and how to properly construct your value. It takes time and practice, but it is a powerful tool at your disposal.

Don't miss the final article in this series, Applied Big Data Analysis in the Real World with MapReduce and Hadoop, to be posted next week. This article will walk you through setting up and managing a Hadoop production environment.

If you are looking for a good book to help you think in MapReduce, O’Reilly’s MapReduce Design Patterns is a great choice. I read through several books to help me get Hadoop set up and configured, but MapReduce Design Patterns was the first book that I found that helped me really understand how to approach MapReduce problems. I highly recommend it!
