Writing a Hadoop MapReduce Program in Python

In this tutorial I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.

Running the Python Code on Hadoop

We will use three ebooks from Project Gutenberg for this example:

Download each ebook as a text file in Plain Text UTF-8 encoding and store the files in a local temporary directory of your choice, for example /tmp/gutenberg.

***Note from Bill: you will need to open a browser in your Cloudera virtual machine. Select the appropriate file to download (the UTF-8 version); it will display in your browser. Right-click to save the file, give it an appropriate name (such as "Ulysses"), and note that it will be saved in the Downloads directory. ***

hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Copy local example data to HDFS

Before we run the actual MapReduce job, we must first copy the files from our local file system to Hadoop’s HDFS.

*** Note from Bill:
We assume you are in your Downloads directory. We must create a subdirectory in the HDFS and then copy the files over. Lastly, we verify that the copy worked.
First, we create the subdirectory MyFirst in HDFS:

[cloudera@quickstart Downloads]$ hadoop fs -mkdir MyFirst

Next, we copy the files. Note that all three files have the .txt suffix:

[cloudera@quickstart Downloads]$ hadoop fs -copyFromLocal *.txt MyFirst

Finally, we verify that the copy worked correctly:

[cloudera@quickstart Downloads]$ hadoop fs -ls MyFirst
Found 3 items
-rw-r--r-- 1 cloudera cloudera 1423803 2014-11-30 08:02 MyFirst/Leonardo.txt
-rw-r--r-- 1 cloudera cloudera  674570 2014-11-30 08:02 MyFirst/OutlineOfScience.txt
-rw-r--r-- 1 cloudera cloudera 1573150 2014-11-30 08:02 MyFirst/Ulysses.txt ***

For comparison, here is the same sequence on the original tutorial's cluster:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x - hduser supergroup 0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r-- 3 hduser supergroup  674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r-- 3 hduser supergroup 1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r-- 3 hduser supergroup 1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

*** Note from Bill: To run the MapReduce job, type:

[cloudera@quickstart ~]$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
    -file mapper.py  -mapper mapper.py \
    -file reducer.py -reducer reducer.py \
    -input MyFirst/* -output MyFirst4-output

You will receive a warning about -file being deprecated; do not worry about this. It is important that the output directory (MyFirst4-output in this case) does not exist when you issue this command.

Verify that the program worked. First, type: hadoop fs -ls MyFirst4-output

[cloudera@quickstart ~]$ hadoop fs -ls MyFirst4-output
Found 2 items
-rw-r--r-- 1 cloudera cloudera      0 2014-11-30 09:23 MyFirst4-output/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 880829 2014-11-30 09:23 MyFirst4-output/part-00000

Next, look at the output file:

[cloudera@quickstart ~]$ hadoop fs -cat MyFirst4-output/part-00000

Copy the file from HDFS to your local file system:

[cloudera@quickstart ~]$ hadoop fs -copyToLocal MyFirst4-output/part-00000 MyFirstOutputLocal.txt

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above, we leverage the Hadoop Streaming API to help us pass data between our Map and Reduce code via STDIN and STDOUT.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
    -file /home/hduser/mapper.py  -mapper /home/hduser/mapper.py \
    -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py \
    -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
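What Streaming does with these two scripts can be sketched in a few lines of plain Python (an illustration only, not part of the tutorial's files): map every input line to (word, 1) pairs, sort the pairs, then sum each group of identical words.

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    # emit a (word, 1) pair for every word, like mapper.py does
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reduce_phase(sorted_pairs):
    # sum the counts of consecutive identical words, like reducer.py does
    for word, group in groupby(sorted_pairs, itemgetter(0)):
        yield word, sum(count for _, count in group)

text = ["the quick brown fox", "the lazy dog"]
counts = dict(reduce_phase(sorted(map_phase(text))))
print(counts)  # {'brown': 1, 'dog': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 2}
```

The sorted() call plays the role of Hadoop's shuffle-and-sort between the Map and Reduce phases.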

If you want to modify some Hadoop settings on the fly, such as increasing the number of Reduce tasks, you can use the -D option:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
    -D mapred.reduce.tasks=16 ...

Note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user-specified mapred.reduce.tasks and doesn't manipulate that. You cannot force mapred.map.tasks, but you can specify mapred.reduce.tasks.

The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output. In general Hadoop will create one output file per reducer; in our case, however, it will only create a single file because the input files are very small.

Example output of the previous command in the console:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/] [] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob:  map 0%  reduce 0%
[...] INFO streaming.StreamJob:  map 43%  reduce 0%
[...] INFO streaming.StreamJob:  map 86%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 33%
[...] INFO streaming.StreamJob:  map 100%  reduce 70%
[...] INFO streaming.StreamJob:  map 100%  reduce 77%
[...] INFO streaming.StreamJob:  map 100%  reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...]
INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$

As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, open http://localhost:50030/ in a browser and have a look around. Here's a screenshot of the Hadoop web interface for the job we just ran.

Figure 1: A screenshot of Hadoop's JobTracker web interface, showing the details of the MapReduce job we just ran

Check if the result is successfully stored in the HDFS directory /user/hduser/gutenberg-output:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000 <r 1> 903193 2007-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$

You can then inspect the contents of the file with the dfs -cat command:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra"   1
"1490       1
"1498,"     1
"35"        1
"40,"       1
"A          2
"AS-IS".    2
"A_         1
"Absoluti   1
[...]
hduser@ubuntu:/usr/local/hadoop$

Note that in this specific output above the quote signs (") enclosing the words have not been inserted by Hadoop. They are the result of how our Python code splits words, and in this case it matched the beginning of a quote in the ebook texts. Just inspect the part-00000 file further to see it for yourself.
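You can reproduce this artifact in plain Python: str.split() only splits on whitespace, so punctuation and quote characters stay attached to the words. A hedged sketch of one common fix, extracting letter runs with a regular expression (the tutorial's mapper deliberately does not do this):

```python
import re

# str.split() splits on whitespace only, so quotes and commas stay attached:
line = '"Absolutely not," he said.'
print(line.split())  # ['"Absolutely', 'not,"', 'he', 'said.']

# one possible fix: extract runs of letters (and apostrophes) instead
words = re.findall(r"[A-Za-z']+", line.lower())
print(words)  # ['absolutely', 'not', 'he', 'said']
```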

Improved Mapper and Reducer code: using Python iterators and generators

The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was code simplicity and ease of understanding, particularly for beginners in the Python programming language. In a real-world application, however, you might want to optimize your code by using Python iterators and generators.

Generally speaking, iterators and generators (functions that create iterators, for example with Python's yield statement) have the advantage that an element of a sequence is not produced until you actually need it. Depending on the task at hand, this can reduce computation time or memory consumption considerably.
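A minimal illustration of that laziness (not part of the tutorial's scripts):

```python
def number_stream(limit):
    """Generate integers lazily; nothing is computed until requested."""
    n = 0
    while n < limit:
        yield n
        n += 1

stream = number_stream(10**12)   # no trillion-element list is ever built
first_three = [next(stream) for _ in range(3)]
print(first_three)  # [0, 1, 2]
```

Only the three requested elements are ever produced, no matter how large the limit is.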

Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.

More precisely, we compute the sum of a word's occurrences, e.g. ("foo", 4), only if by chance the same word (foo) appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs between the Map and the Reduce step, because Hadoop is more efficient in this regard than our simple Python scripts.
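This is why the improved reducer depends on Hadoop's shuffle-and-sort: itertools.groupby only merges *consecutive* equal keys, so unsorted input would leave a word's counts split across several groups. A small sketch (illustration only):

```python
from itertools import groupby
from operator import itemgetter

pairs = [("foo", 1), ("bar", 1), ("foo", 1)]   # unsorted, as a mapper emits them
ungrouped = [(k, sum(c for _, c in g)) for k, g in groupby(pairs, itemgetter(0))]
print(ungrouped)  # [('foo', 1), ('bar', 1), ('foo', 1)] -- "foo" counted twice

pairs.sort(key=itemgetter(0))                  # the role of Hadoop's shuffle-and-sort
grouped = [(k, sum(c for _, c in g)) for k, g in groupby(pairs, itemgetter(0))]
print(grouped)    # [('bar', 1), ('foo', 2)]
```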

mapper.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
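Before shipping the mapper to the cluster, you can sanity-check its splitting logic locally by feeding an in-memory file object to a copy of read_input. This is a hypothetical test harness, not part of mapper.py:

```python
from io import StringIO

def read_input(file):
    # same generator as in mapper.py: yields one list of words per line
    for line in file:
        yield line.split()

fake_stdin = StringIO(u"foo foo quux\nlabs foo\n")
pairs = [(word, 1) for words in read_input(fake_stdin) for word in words]
print(pairs)  # [('foo', 1), ('foo', 1), ('quux', 1), ('labs', 1), ('foo', 1)]
```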

reducer.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
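The reducer's grouping logic can be checked the same way on a small, already-sorted sample of mapper output (again an illustrative harness only, not part of reducer.py):

```python
from itertools import groupby
from operator import itemgetter

mapper_output = ["foo\t1", "foo\t1", "labs\t1", "quux\t1"]  # sorted by key
data = (line.split("\t", 1) for line in mapper_output)
totals = dict((word, sum(int(c) for _, c in grp))
              for word, grp in groupby(data, itemgetter(0)))
print(totals)  # {'foo': 2, 'labs': 1, 'quux': 1}
```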

I am a researcher and software engineer based in Switzerland, Europe. I work for the .COM and .NET DNS registry operator Verisign as the technical lead of its large-scale computing infrastructure based on the Apache Hadoop stack and as a research affiliate at Verisign Labs.

Contact

michael@michael-noll.com