1. Introduction

One of the unappetizing aspects of Hadoop to users of traditional HPC is that it is written in Java. Java was not designed to be a high-performance language and, although I can only definitively speak for myself, I suspect that learning it is not a high priority for domain scientists. As it turns out though, Hadoop allows you to write map/reduce code in any language you want using the Hadoop Streaming interface. This is a key feature in making Hadoop more palatable for the scientific community, as it means turning an existing Python or Perl script into a Hadoop job does not require learning Java or derived Hadoop-centric languages like Pig. The following guide illustrates how you can run Hadoop jobs written entirely in Python on XSEDE's Gordon resource at SDSC, without ever having to mess with any other languages.

Once the basics of running Python-based Hadoop jobs are covered, I will illustrate a more practical example: using Hadoop to parse a variant call format (VCF) file using a VCF parsing library you would install without root privileges on your supercomputing account.

The wordcount example here is on my GitHub account. I've also got implementations of this in Perl and R posted there.

2. Review of Hadoop on HPC

This guide assumes you are familiar with Hadoop and map/reduce at a conceptual level. You should be in good shape after reading my Conceptual Overview of Map/Reduce and Hadoop page.

This guide also assumes you understand the basics of running a Hadoop cluster on an HPC resource (supercomputer).

To keep the focus on Hadoop Streaming itself, I will assume we have a Hadoop cluster already spun up on Gordon using my semi-persistent Hadoop cluster script and only provide the relevant hadoop commands involved in submitting the Hadoop job to this cluster. If you want to go the non-interactive route, I have a submit script on GitHub that wraps this example problem into a single non-interactive Gordon job.

3. The Canonical Wordcount Example

Counting the number of words in a large document is the "hello world" of map/reduce, and it is among the simplest of full map+reduce Hadoop jobs you can run. Recalling that the map step transforms the raw input data into key/value pairs and the reduce step transforms the key/value pairs into your desired output, we can conceptually describe the act of counting words as

  • the map step will take the raw text of our document (I will use Herman Melville's classic, Moby Dick) and convert it to key/value pairs. Each key is a word, and all keys (words) will have a value of 1.
  • the reduce step will combine all duplicate keys by adding up their values. Since every key (word) has a value of 1, this will reduce our output to a list of unique keys, each with a value corresponding to that key's (word's) count.

[Figure: Schematic of wordcount in the context of map-reduce]
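
The two steps described above can be sketched in a few lines of ordinary Python before involving Hadoop at all. This is only an in-memory illustration: the function names map_step and reduce_step and the sample sentence are my own placeholders, not part of Hadoop or of the scripts developed later in this guide.

```python
from collections import defaultdict

def map_step(text):
    """Turn raw text into (key, value) pairs: each word is a key with value 1."""
    return [(word, 1) for word in text.split()]

def reduce_step(pairs):
    """Combine duplicate keys by adding up their values."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

# Hypothetical sample input, just to show the shape of the data at each step
sample = "call me Ishmael call me"
pairs = map_step(sample)
counts = reduce_step(pairs)

print(pairs)
print(counts)
```

The map step produces one pair per word, duplicates included, and the reduce step collapses them into one entry per unique word. Hadoop does the same thing, except that the pairs travel between separate processes as lines of text.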

With Hadoop Streaming, we need to write a program that acts as the mapper and a program that acts as the reducer. These applications must interface with input/output streams in a way equivalent to the following series of pipes:

$ cat input.txt | ./mapper.py | sort | ./reducer.py > output.txt

That is, Hadoop Streaming sends raw data to the mapper via stdin, then sends the mapped key/value pairs to the reducer via stdin.

3.1. The Mapper

The mapper, as described above, is quite simple to implement in Python. It will look something like this:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print( "%s\t%d" % (key, value) )

where

  1. Hadoop sends a line of text from the input file ("line" being defined by a string of text terminated by a linefeed character, \n)
  2. Python strips all leading/trailing whitespace (line.strip())
  3. Python splits that line into a list of individual words along whitespace (line.split())
  4. For each word (which will become a key), we assign a value of 1 and then print the key-value pair on a single line, separated by a tab (\t)

A more detailed explanation of this process can be found in Yahoo's excellent Hadoop Tutorial.

3.2. The Shuffle

A lot happens between the map and reduce steps that is largely transparent to the developer. In brief, the output of the mappers is transformed and distributed to the reducers (termed the shuffle step) in such a way that

  1. All key/value pairs are sorted before being presented to the reducer function
  2. All key/value pairs sharing the same key are sent to the same reducer

These two points are important because

  1. As you read in key/value pairs, if you encounter a key that is different from the last key you processed, you know that that previous key will never appear again
  2. If your keys are all the same, you will only use one reducer and gain no parallelization. You should come up with a more unique key if this happens!
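
Both properties of the shuffle can be imitated with a toy partitioner. The sketch below is an assumption-laden stand-in, not Hadoop's implementation: the function name shuffle is hypothetical, and a CRC32 checksum of the key is used where Hadoop would apply its own hash of the key, so the reducer numbers will not match what a real cluster assigns.

```python
import zlib
from collections import defaultdict

def shuffle(pairs, num_reducers):
    """Assign each (key, value) pair to a reducer, then sort each reducer's input."""
    buckets = defaultdict(list)
    for key, value in pairs:
        # Assumption: CRC32 stands in for Hadoop's real key hash.
        # Identical keys always hash identically, so they share a reducer.
        reducer_id = zlib.crc32(key.encode("utf-8")) % num_reducers
        buckets[reducer_id].append((key, value))
    # Each reducer sees its own pairs in sorted order
    return {rid: sorted(items) for rid, items in buckets.items()}

mapped = [("whale", 1), ("sea", 1), ("whale", 1), ("ship", 1), ("sea", 1)]
for reducer_id, items in sorted(shuffle(mapped, 2).items()):
    print(reducer_id, items)

# If every key is identical, only one reducer ever receives data
same_key = [("word", 1)] * 5
print(len(shuffle(same_key, 4)))
```

The last line illustrates the second point above: with a single distinct key, three of the four reducers would sit idle.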

3.3. The Reducer

The output from our mapper step will go to the reducer step sorted. Thus, we can loop over all input key/value pairs and apply the following logic:

If this key is the same as the previous key,
    add this key's value to our running total.
Otherwise,
    print out the previous key's name and the running total,
    reset our running total to 0,
    add this key's value to the running total, and
    "this key" is now considered the "previous key"

Translating this into Python and adding a little extra code to tighten up the logic, we get

#!/usr/bin/env python

import sys

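last_key = None
running_total = 0

# Input arrives sorted by key, so identical keys are always adjacent
for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value
    else:
        # A new key means the previous key is finished, so emit its total
        if last_key is not None:
            print( "%s\t%d" % (last_key, running_total) )
        running_total = value
        last_key = this_key

# Emit the total for the final key (skipped if there was no input at all)
if last_key is not None:
    print( "%s\t%d" % (last_key, running_total) )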
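
Because the reducer's input is sorted, the same "detect when the key changes" logic can also be expressed with itertools.groupby from the Python standard library. This is only an alternative sketch, not the reducer used in the rest of this guide; the helper names parse and reduce_sorted are hypothetical, and the sample lines stand in for what Hadoop would deliver on stdin.

```python
from itertools import groupby

def parse(lines):
    """Split each "key<TAB>value" line into a (key, int) tuple."""
    for line in lines:
        key, value = line.strip().split("\t", 1)
        yield key, int(value)

def reduce_sorted(lines):
    """Sum the values of each run of adjacent identical keys."""
    for key, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield key, sum(value for _, value in group)

# Hypothetical sorted mapper output; in a streaming job, pass sys.stdin instead
sample_lines = ["sea\t1\n", "sea\t1\n", "ship\t1\n", "whale\t1\n", "whale\t1\n"]
for key, total in reduce_sorted(sample_lines):
    print("%s\t%d" % (key, total))
```

Note that groupby only merges adjacent keys, so this approach depends on the sorted input exactly as the hand-written loop does. Fed unsorted input, either version would print the same key more than once.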

3.4. Running the Hadoop Job

If we name the mapper script mapper.py and the reducer script reducer.py, we would first want to download the input data (Moby Dick) and load it into HDFS. I am purposely renaming the copy stored in HDFS to mobydick.txt instead of the original pg2701.txt to highlight the location of the file:

$ wget http://www.gutenberg.org/cache/epub/2701/pg2701.txt
$ hadoop dfs -mkdir wordcount
$ hadoop dfs -copyFromLocal ./pg2701.txt wordcount/mobydick.txt

You can verify that the file was loaded properly:

$ hadoop dfs -ls wordcount/mobydick.txt
Found 1 items
-rw-r--r--   2 glock supergroup    1257260 2013-07-17 13:24 /user/glock/wordcount/mobydick.txt

Before submitting the Hadoop job, you should make sure your mapper and reducer scripts actually work. This is just a matter of running them through pipes on a little bit of sample data (e.g., the first 1000 lines of Moby Dick):

$ head -n1000 pg2701.txt | ./mapper.py | sort | ./reducer.py
...
young   4
your    16
yourself    3
zephyr  1

Once you know the mapper/reducer scripts work without errors, we can plug them into Hadoop. We accomplish this by running the Hadoop Streaming jar file as our Hadoop job. This hadoop-streaming-X.Y.Z.jar file comes with the standard Apache Hadoop distribution and should be in $HADOOP_HOME/contrib/streaming where $HADOOP_HOME is the base directory of your Hadoop installation and X.Y.Z is the version of Hadoop you are running. On Gordon the location is /opt/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar, so our actual job launch command would look like

$ hadoop \
    jar /opt/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -mapper "python $PWD/mapper.py" \
    -reducer "python $PWD/reducer.py" \
    -input "wordcount/mobydick.txt" \
    -output "wordcount/output"

packageJobJar: [/scratch/glock/819550.gordon-fe2.local/hadoop-glock/data/hadoop-unjar4721749961014550860/] [] /tmp/streamjob7385577774459124859.jar tmpDir=null
13/07/17 19:26:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/07/17 19:26:16 WARN snappy.LoadSnappy: Snappy native library not loaded
13/07/17 19:26:16 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/17 19:26:16 INFO streaming.StreamJob: getLocalDirs(): [/scratch/glock/819550.gordon-fe2.local/hadoop-glock/data/mapred/local]
13/07/17 19:26:16 INFO streaming.StreamJob: Running job: job_201307171926_0001
13/07/17 19:26:16 INFO streaming.StreamJob: To kill this job, run:
13/07/17 19:26:16 INFO streaming.StreamJob: /opt/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=gcn-13-34.ibnet0:54311 -kill job_201307171926_0001
13/07/17 19:26:16 INFO streaming.StreamJob: Tracking URL: http://gcn-13-34.ibnet0:50030/jobdetails.jsp?jobid=job_201307171926_0001

And at this point, the job is running. That "tracking URL" is a bit deceptive in that you probably won't be able to access it. Fortunately, there is a command-line interface for monitoring Hadoop jobs that is somewhat similar to qstat. Noting the Hadoop jobid (job_201307171926_0001 in the output above), you can do:

$ hadoop job -status job_201307171926_0001
Job: job_201307171926_0001
file: hdfs://gcn-13-34.ibnet0:54310/scratch/glock/819550.gordon-fe2.local/hadoop-glock/data/mapred/staging/glock/.staging/job_201307171926_0001/job.xml
tracking URL: http://gcn-13-34.ibnet0:50030/jobdetails.jsp?jobid=job_201307171926_0001
map() completion: 1.0 
reduce() completion: 1.0

Counters: 30
    Job Counters
        Launched reduce tasks=1
        SLOTS_MILLIS_MAPS=16037
        Total time spent by all reduces waiting after reserving slots (ms)=0
        Total time spent by all maps waiting after reserving slots (ms)=0
        Launched map tasks=2
        Data-local map tasks=2
...

Since the Hadoop Streaming job runs in the foreground, you will have to use another terminal (with HADOOP_CONF_DIR properly exported) to check on the job while it runs. However, you can also review the job metrics after the job has finished. In the example above, we can see that the job only used one reduce task and two map tasks despite the cluster having more than two nodes.

3.5. Adjusting Parallelism

Unlike with a traditional HPC job, the level of parallelism of a Hadoop job is not necessarily the full size of your compute resource. The number of map tasks is ultimately determined by the nature of your input data due to how HDFS distributes chunks of data to your mappers. You can "suggest" a number of mappers when you submit the job, though. Doing so is a matter of adding the -D mapred.map.tasks option:

$ hadoop jar /opt/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -D mapred.map.tasks=4 \
    -mapper "python $PWD/mapper.py" \
    -reducer "python $PWD/reducer.py" \
    -input wordcount/mobydick.txt \
    -output wordcount/output
...
$ hadoop job -status job_201307172000_0001
...
    Job Counters
        Launched reduce tasks=1
        SLOTS_MILLIS_MAPS=24049 
        Total time spent by all reduces waiting after reserving slots (ms)=0
        Total time spent by all maps waiting after reserving slots (ms)=0
        Rack-local map tasks=1
        Launched map tasks=4
        Data-local map tasks=3

Similarly, you can add -D mapred.reduce.tasks=4 to suggest the number of reducers. The reducer count is a little more flexible, and you can set it to zero if you just want to apply a mapper to a large file.

With all this being said, the fact that Hadoop defaults to only two mappers says something about the problem we're trying to solve--that is, this entire example is actually very stupid. While it illustrates the concepts quite neatly, counting words in a 1.2 MB file is a waste of time if done through Hadoop because, by default, Hadoop assigns chunks to mappers in increments of 64 MB. Hadoop is meant to handle multi-gigabyte files, and actually getting Hadoop streaming to do something useful for your research often requires a bit more knowledge than what I've presented above.
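
To put rough numbers on this, the sketch below estimates how many map tasks a single input file would get. It follows my understanding of how the older mapred file input format sizes its splits (the smaller of the block size and the file size divided by the requested number of maps, with a 10% tolerance on the last split), so treat it as an approximation rather than a statement of what Hadoop does in every configuration. The function name estimate_splits and its parameters are hypothetical.

```python
def estimate_splits(file_size, requested_maps=2, block_size=64 * 1024 * 1024,
                    min_split_size=1):
    """Estimate the number of input splits (map tasks) for one file.

    Assumption: split size = max(min_split_size, min(goal_size, block_size)),
    where goal_size is the file size divided by the requested number of maps.
    """
    goal_size = file_size // max(requested_maps, 1)
    split_size = max(min_split_size, min(goal_size, block_size))

    remaining = file_size
    splits = 0
    # Assumption: the final split may be up to 10% larger than split_size
    while float(remaining) / split_size > 1.1:
        remaining -= split_size
        splits += 1
    if remaining > 0:
        splits += 1
    return splits

moby_dick = 1257260                 # size in bytes reported by the -ls output
ten_gib = 10 * 1024 ** 3

print(estimate_splits(moby_dick))                    # default suggestion of 2 maps
print(estimate_splits(moby_dick, requested_maps=4))  # with -D mapred.map.tasks=4
print(estimate_splits(ten_gib))                      # a file large enough to matter
```

Under these assumptions the 1.2 MB copy of Moby Dick yields 2 and 4 splits for the two cases, matching the "Launched map tasks" counters shown earlier, while a 10 GiB file yields 160 splits no matter how few maps are suggested, because the 64 MB block size becomes the limiting factor.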

To fill in these gaps, the next part of this tutorial, Parsing VCF Files with Hadoop Streaming, shows how I applied Hadoop to solve a real-world problem involving Python, some exotic Python libraries, and some not-completely-uniform files.


This document was developed with support from the National Science Foundation (NSF) under Grant No. 0910812 to Indiana University for "FutureGrid: An Experimental, High-Performance Grid Test-bed." Any opinions, findings, and conclusions or recommendations expressed in this material are those of the author(s) and do not necessarily reflect the views of the NSF.