Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be
developed in other languages like Python or C++ (the latter since version 0.14.1). However,
Hadoop’s documentation and the most prominent
Python example on the Hadoop website could make you think that you
must translate your Python code into a Java jar file using Jython. Obviously, this is not
very convenient and can even be problematic if you depend on Python features not provided by Jython. Another issue with
the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop –
just have a look at the example in
$HADOOP_HOME/src/examples/python/WordCount.py and you will see what I mean.
That said, the ground is now prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more Pythonic way, i.e. in a way you should be familiar with.
What we want to do
Our program will mimic WordCount, i.e. it reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.
You should have a Hadoop cluster up and running because we will get our hands dirty. If you don’t have a cluster yet, my following tutorials might help you build one. The tutorials are tailored to Ubuntu Linux, but the information also applies to other Linux/Unix variants.
- Running Hadoop On Ubuntu Linux (Single-Node Cluster) – How to set up a pseudo-distributed, single-node Hadoop cluster backed by the Hadoop Distributed File System (HDFS)
- Running Hadoop On Ubuntu Linux (Multi-Node Cluster) – How to set up a distributed, multi-node Hadoop cluster backed by the Hadoop Distributed File System (HDFS)
Python MapReduce Code
The “trick” behind the following Python code is that we will use the
Hadoop Streaming API (see also the corresponding
wiki entry) to pass data between our Map and Reduce code via
STDIN (standard input) and
STDOUT (standard output). We will simply use Python’s
sys.stdin to read input data and print our own output to
sys.stdout. That’s all we need to do because Hadoop Streaming will
take care of everything else!
Map step: mapper.py
Save the following code in the file
/home/hduser/mapper.py. It will read data from
STDIN, split it into words
and output a list of lines mapping words to their (intermediate) counts to
STDOUT. The Map script will not
compute an (intermediate) sum of a word’s occurrences though. Instead, it will output
<word> 1 tuples immediately
– even though a specific word might occur multiple times in the input. In our case we let the subsequent Reduce
step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will
keep it like that in this tutorial for didactic reasons. :-)
Make sure the file has execution permission (
chmod +x /home/hduser/mapper.py should do the trick) or you will run into problems.
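A minimal mapper.py consistent with the description above could look like the following. This is a sketch written for Python 3, not necessarily the exact listing from the original tutorial:

```python
#!/usr/bin/env python3
"""mapper.py: emit '<word><tab>1' for every word read from STDIN."""
import sys

def map_words(lines):
    # Split each input line on whitespace and emit an intermediate
    # count of 1 per word; no summing happens in the Map step.
    for line in lines:
        for word in line.strip().split():
            yield '%s\t%s' % (word, 1)

if __name__ == '__main__':
    for output_line in map_words(sys.stdin):
        print(output_line)
```

Note that the same word produces one output line per occurrence; collapsing duplicates is deliberately left to the Reduce step.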
Reduce step: reducer.py
Save the following code in the file
/home/hduser/reducer.py. It will read the results of
mapper.py from
STDIN (so the output format of
mapper.py and the expected input format of
reducer.py must match), sum the
occurrences of each word to a final count, and then output its results to
STDOUT.
Make sure the file has execution permission (
chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.
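A reducer.py matching that description could look like the following sketch (again Python 3, and not necessarily the exact original listing). It relies on Hadoop sorting the mapper output by key before the Reduce step:

```python
#!/usr/bin/env python3
"""reducer.py: sum per-word counts from sorted mapper output on STDIN."""
import sys

def reduce_counts(lines):
    # Relies on the input being sorted by word, so that all counts
    # for a given word arrive consecutively.
    current_word, current_count = None, 0
    for line in lines:
        word, _, count = line.strip().partition('\t')
        try:
            count = int(count)
        except ValueError:
            continue  # silently skip malformed lines
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield '%s\t%d' % (current_word, current_count)
            current_word, current_count = word, count
    if current_word is not None:  # don't forget the last word
        yield '%s\t%d' % (current_word, current_count)

if __name__ == '__main__':
    for output_line in reduce_counts(sys.stdin):
        print(output_line)
```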
Test your code (cat data | map | sort | reduce)
I recommend testing your
mapper.py and
reducer.py scripts locally before using them in a MapReduce job.
Otherwise your jobs might complete successfully but produce no job result data at all, or not the results
you would have expected. If that happens, most likely it was you (or me) who screwed up.
Here are some ideas on how to test the functionality of the Map and Reduce scripts.
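For instance, the whole `cat data | map | sort | reduce` pipeline can be simulated in plain Python before touching the cluster. This is a sketch with made-up sample input; the mapper and reducer logic mirror the scripts above:

```python
# Simulate the pipeline "cat data | map | sort | reduce" in pure Python.
def mapper(lines):
    # Map step: one ('word', 1) pair per token.
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reducer(pairs):
    # Reduce step: pairs must arrive sorted by word, as they would
    # after the shell's `sort` (or Hadoop's shuffle phase).
    current, total = None, 0
    for word, count in pairs:
        if word == current:
            total += count
        else:
            if current is not None:
                yield (current, total)
            current, total = word, count
    if current is not None:
        yield (current, total)

data = ["foo foo quux labs foo bar quux"]
print(list(reducer(sorted(mapper(data)))))
# [('bar', 1), ('foo', 3), ('labs', 1), ('quux', 2)]
```

If the local simulation produces the counts you expect, the Hadoop job should too, since Streaming only changes how the data is transported.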
Running the Python Code on Hadoop
Download example input data
We will use three ebooks from Project Gutenberg for this example:
- The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
- The Notebooks of Leonardo Da Vinci
- Ulysses by James Joyce
Download each ebook as a text file in
Plain Text UTF-8 encoding and store the files in a local temporary directory of your
choice.
***Note from Bill: you will need to open a
browser in your Cloudera virtual machine. Select the appropriate file
to download (the UTF-8 version); it will display in your browser. Right-click
to save the file. Give it an appropriate name (like
"Ulysses") and note that it will be saved in the Downloads directory. ***
Copy local example data to HDFS
*** Note from Bill:
We assume you are in your Downloads directory. We must create a subdirectory in the HDFS and then copy the files over. Lastly, we verify that the copy worked.
First, we create subdirectory MyFirst in the hdfs:
[cloudera@quickstart Downloads]$ hadoop fs -mkdir MyFirst
Next, we copy the files. Note that all three files have the .txt suffix:
[cloudera@quickstart Downloads]$ hadoop fs -copyFromLocal *.txt MyFirst
Finally, we verify that the copy worked correctly:
[cloudera@quickstart Downloads]$ hadoop fs -ls MyFirst
Found 3 items
-rw-r--r-- 1 cloudera cloudera 1423803 2014-11-30 08:02 MyFirst/Leonardo.txt
-rw-r--r-- 1 cloudera cloudera 674570 2014-11-30 08:02 MyFirst/OutlineOfScience.txt
-rw-r--r-- 1 cloudera cloudera 1573150 2014-11-30 08:02 MyFirst/Ulysses.txt
Run the MapReduce job
***Note from Bill:
To run the MapReduce job, type:
-file reducer.py -reducer reducer.py -input MyFirst/* -output MyFirst4-output
You will receive a warning about -file being deprecated; do not worry about this. It is important that the output directory (MyFirst4-output in this case) does not exist when you issue this command.
Verify that the program worked. First, type hadoop fs -ls MyFirst4-output
[cloudera@quickstart ~]$ hadoop fs -ls MyFirst4-output
Found 2 items
-rw-r--r-- 1 cloudera cloudera 0 2014-11-30 09:23 MyFirst4-output/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 880829 2014-11-30 09:23 MyFirst4-output/part-00000
Next, look at the output file:
[cloudera@quickstart ~]$ hadoop fs -cat MyFirst4-output/part-00000
Copy the file from the HDFS to your local file system:
[cloudera@quickstart ~]$ hadoop fs -copyToLocal MyFirst4-output/part-00000 MyFirstOutputLocal.txt
Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above,
we leverage the Hadoop Streaming API to pass data between our Map and Reduce code via
STDIN and STDOUT.
If you want to modify some Hadoop settings on the fly, such as increasing the number of Reduce tasks, you can use the
-D option (for example, -D mapred.reduce.tasks=16).
The job will read all the files in the HDFS directory
/user/hduser/gutenberg, process them, and store the results in
the HDFS directory
/user/hduser/gutenberg-output. In general, Hadoop will create one output file per reducer; in
our case, however, it will only create a single file because the input files are very small.
Example output of the previous command in the console:
As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, open http://localhost:50030/ in a browser and have a look around. Here’s a screenshot of the Hadoop web interface for the job we just ran.
Check if the result was successfully stored in the HDFS directory
/user/hduser/gutenberg-output. You can then inspect the contents of the output file with the
dfs -cat command.
Note that in this specific output above the quote signs (
") enclosing the words have not been inserted by Hadoop.
They are the result of how our Python code splits words; in this case it matched the beginning of a quote in the
ebook texts. Just inspect the
part-00000 file further to see it for yourself.
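You can reproduce this behavior in a Python shell: `str.split()` only breaks on whitespace, so punctuation such as a leading quote stays attached to the word it precedes.

```python
# split() breaks on whitespace only, so quotes and other punctuation
# remain part of the surrounding "word"
line = '"Ulysses" by James Joyce'
print(line.split())  # ['"Ulysses"', 'by', 'James', 'Joyce']
```

A real-world word counter would normalize case and strip punctuation before counting, but that is beyond the scope of this tutorial.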
Improved Mapper and Reducer code: using Python iterators and generators
The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language. In a real-world application however, you might want to optimize your code by using Python iterators and generators.
Generally speaking, iterators and generators (functions that create iterators, for example with Python’s
yield statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help
a lot in terms of computational cost or memory consumption, depending on the task at hand.
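As a quick illustration of that laziness (a toy example, unrelated to the word-count code itself):

```python
# A generator produces one element at a time, on demand; the full
# sequence of a million squares is never materialized in memory.
def squares(n):
    for i in range(n):
        yield i * i

gen = squares(1_000_000)
print(next(gen))  # prints 0: only the first element has been computed
print(next(gen))  # prints 1
```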
Precisely, we compute the sum of a word’s occurrences, e.g.
("foo", 4), only if by chance the same word (
foo) appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs
between the Map and the Reduce step, because Hadoop is more efficient in this regard than our simple Python scripts.
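As one possible sketch of such a generator-based reducer (not necessarily identical to the original improved listings), the key idea is to let itertools.groupby gather consecutive lines that share the same word, which works because Hadoop’s shuffle phase delivers the mapper output sorted by key:

```python
#!/usr/bin/env python3
"""Generator-based reducer sketch using itertools.groupby."""
import sys
from itertools import groupby
from operator import itemgetter

def read_mapper_output(lines, separator='\t'):
    # Lazily turn "word<tab>count" lines into [word, count] pairs.
    for line in lines:
        yield line.rstrip('\n').split(separator, 1)

def sum_counts(pairs):
    # groupby collapses each run of consecutive identical words into
    # one group; this is correct only because the input is sorted.
    for word, group in groupby(pairs, itemgetter(0)):
        yield word, sum(int(count) for _, count in group)

if __name__ == '__main__':
    # Reducer entry point: read mapper output from STDIN and write
    # one "word<tab>total" line per word to STDOUT.
    for word, total in sum_counts(read_mapper_output(sys.stdin)):
        sys.stdout.write('%s\t%d\n' % (word, total))
```

Because read_mapper_output and sum_counts are both generators, the reducer streams through its input with constant memory per word group rather than loading everything at once.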