CHAPTER 5
Hadoop Streaming
Although MapReduce and Hadoop itself are implemented in Java, there is support for building MapReduce components in other languages. This is called Hadoop Streaming, and it is a simple approach in which Hadoop invokes an executable as part of a task rather than hosting a JAR file in a JVM.
Hadoop Streaming uses the standard input and output streams to communicate with the executing process, so it is suitable for any platform that can build an executable binary that reads from stdin and writes to stdout. Although simple, Hadoop Streaming is a powerful technique that greatly extends the reach and flexibility of MapReduce.
Streaming allows you to use MapReduce without having to write Java code for mappers and reducers. This is particularly attractive for users who already have an investment in non-Java platforms. If you have a library of custom analytical code written in Python, or if you want to write MapReduce code in .NET, you can do that with Hadoop Streaming. You can even mix and match, using a Java mapper with a C++ reducer or an R mapper with a Python reducer.
A streaming job is a MapReduce job, but with a different execution mode for the job tasks. It is still submitted to YARN from a client and still follows the YARN architecture, with an application master started to manage the job. Hadoop ships with a standard JAR as the driver program, and we pass the hadoop command the details of the executables to which we want to stream.
When task containers run, a Java VM spawns the executable process. Communication back to the application master is handled by the Java VM, which acts as a bridge between Hadoop and the streaming executable.
The hadoop-streaming.jar takes a minimum of four arguments (the job can be configured with more), specifying the input and output directories and the executables to run for the mapper and reducer. Code Listing 20 shows a valid Hadoop Streaming job submission that uses standard Linux shell commands for the mapper and reducer.
Code Listing 20: Hadoop Streaming Using System Commands
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
  -input input \
  -output output4 \
  -mapper /bin/cat \
  -reducer /usr/bin/wc
This command runs a MapReduce job with the cat command as the mapper, which simply passes its input lines through unchanged, and with the wc command as the reducer, which counts the lines, words, and bytes in the intermediate output.
Using MapReduce for this means all the input files will be combined by the mappers and presented to the reducer as a single input. The reducer then runs the count over the combined file. Code Listing 21 shows the output from the streaming job.
Code Listing 21: Output of the Streaming Job
# hadoop fs -cat output4/*
    2022    7600   76858
The wc command writes output in a fixed order, showing the line count, word count, and total byte count for the input, and here we have 2,022 lines containing 7,600 words in 75KB of storage. This is a trivial example, but it shows the power of streaming—here we have some real analysis generated by Hadoop and we did not have to write any code at all.
Hadoop Streaming uses a very simple interface to communicate with executable mappers and reducers. Remember that all data is transferred through Hadoop as key-value pairs, and this applies to streaming applications, too. Input is fed from Hadoop to the executable as a series of tab-separated lines, where the key is the text up to the first tab character and the value is the remainder of the line.
Hadoop collects output from the executable using the same protocol, with each line written to stdout read as a new tab-separated key-value pair. The executable can also write to the error stream (stderr) to update job counters or status information.
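For example, a streaming task can update a counter or report progress simply by writing specially formatted lines to stderr. Here is a minimal Python sketch; the group, counter, and status names are made up for illustration:

import sys

# Increment a custom job counter by 1 (group and counter names are examples).
sys.stderr.write("reporter:counter:WordCount,MatchedWords,1\n")

# Update the task's status message, which shows up in the job's web UI.
sys.stderr.write("reporter:status:processing configuration files\n")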
For mappers, the streaming interface is the same as with the MapReduce API—the input contains a key-value pair. For reducers, the interface is different—in MapReduce, the reducer is called with a key and an array of values, but in the streaming API, the array is flattened, which means the executable is called with the same key multiple times with a single value from the array in each call.
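To make the flattening concrete: where a Java reducer would be invoked once with a key such as dfs.replication and the value collection [1, 1, 1], a streaming reducer reads three separate tab-separated lines from stdin, each repeating the key (a hypothetical illustration):

dfs.replication    1
dfs.replication    1
dfs.replication    1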
In the rest of the chapter, we'll see how to run our simple word count from Chapter 2 as a Hadoop Streaming job.
Python is a great option for Hadoop Streaming jobs. It’s a popular language among engineers and analysts, it’s cross-platform, and, in most operating systems, it’s a standard install. You can reasonably expect it will be pre-installed on your cluster, and you can also run a simplified version of the MapReduce pipeline locally to verify your scripts.
Python is also a very concise language. The full source code is on GitHub in sixeyed/hadoop-succinctly/python. Code Listing 22 shows the Python mapper in its entirety.
Code Listing 22: Word Count Mapper in Python
#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        if 'dfs' in word:
            print('%s\t%s' % (word, 1))
This script loops while there is input on stdin; the input will be fed in by the Java VM that bridges the mapper task with the Python runtime. For each line, we strip any leading or trailing whitespace, then split the line into words. For each word, if it contains the string "dfs," we write it to stdout as a tab-separated key-value pair with a count of 1.
Writing the tab-separated output to stdout is the equivalent of writing to Context in the Java MapReduce API; it produces intermediate output that will be fed into the reducer.
Streaming reducers are a little more involved than Java reducers because the input comes as individual key-value pairs rather than a key with a collection of values. But the Hadoop guarantee that data entering the reducer will be sorted still holds true, so streaming reducers know that when a key changes, the input is complete for the previous key.
Code Listing 23 shows the reducer code in Python.
Code Listing 23: Word Count Reducer in Python
#!/usr/bin/env python

import sys

last_match = None
total_count = 0

for line in sys.stdin:
    line = line.strip()
    match, count = line.split('\t')
    count = int(count)
    if not last_match:
        last_match = match
    if last_match == match:
        total_count += count
    else:
        print('%s\t%s' % (last_match, total_count))
        total_count = count  # start the new key's total from this value
        last_match = match

print('%s\t%s' % (last_match, total_count))
The code loops through stdin, but because it is a reducer, it will be receiving the intermediate output from the mapper. The reducer is stateful, and it will increment the running total while it receives input for a particular key. When the key changes, the reducer writes out the total for the previous key and resets its state.
By chaining together Linux cat and sort commands with calls to the Python scripts, we can mimic the behavior of Hadoop and verify that the scripts work correctly. Python is installed on the Hadoop Succinctly Docker container, and the Python scripts for this chapter are available in the /python directory. We can run them using the command in Code Listing 24, which also shows the abbreviated output.
Code Listing 24: Verifying the Python Scripts
# cat $HADOOP_HOME/etc/hadoop/* | /python/mapper.py | sort | /python/reducer.py
"dfs"   3
#*.sink.ganglia.tagsForPrefix.dfs=      1
...
To submit the Python job, we use the same hadoop-streaming.jar archive as the driver, but we also need to use the -file argument to specify the paths to the scripts we want to run. Hadoop will copy those files to HDFS so that they are available to any node that runs a task. Code Listing 25 shows the full streaming job submission.
Code Listing 25: Submitting the Streaming Python Job
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
  -input input \
  -output output4 \
  -mapper mapper.py \
  -reducer reducer.py \
  -file /python/mapper.py \
  -file /python/reducer.py
In the jar command, we specify an output directory, which is where the results from the reducer will be written. As before, we can view the combined results using hadoop fs -cat, as shown in Code Listing 26.
Code Listing 26: Results of the Streaming Python Job
# hadoop fs -cat output4/*
"dfs"   3
#*.sink.ganglia.tagsForPrefix.dfs=      1
#dfs.class=org.apache.hadoop.metrics.file.FileContext  1
For .NET programmers, Microsoft has provided a library that mimics the Hadoop Java API and lets us build MapReduce jobs in .NET using similar constructs to those used in Java. There are base classes for mappers and reducers, and we can work in a type-safe manner with our key and value pairs.
However, this is only a syntactic wrapper around Hadoop Streaming—ultimately a .NET project is built as an executable that reads and writes through standard input and output streams in the same way as a normal console application. The .NET API also hasn't been refreshed for a while, which means you must decide if taking a dependency on an older API is worth the value you get from wrapping the streaming interface.
In this chapter, we'll stick with Hadoop Streaming and the .NET source code on GitHub in sixeyed/hadoop-succinctly/dotnet, which has two console apps: one for the mapper and one for the reducer. The code is fundamentally the same as the Python variant; an instance of the mapper will be created for each task and fed lines from the input file through stdin.
The Main() method in the mapper class loops through stdin while there are more lines, as with the code in Code Listing 27.
Code Listing 27: Reading Input in the .NET Mapper
static void Main(string[] args)
{
    string line;
    while ((line = Console.ReadLine()) != null)
    {
        line = line.Trim();
        //...
    }
}
When a nonempty line is received, the mapper splits the input on the space character and checks the elements for words containing the search string, as in Code Listing 28.
Code Listing 28: The .NET Mapper
var words = line.Split(' ');
foreach (var word in words)
{
    if (word.Contains("dfs"))
    {
        Console.WriteLine(string.Format("{0}\t1", word));
    }
}
Emitting a key-value pair to the context is a case of formatting a string with the key and value separated by a tab character and writing it with Console.WriteLine().
The reducer class works in the same way, with a Main() method that loops through the console input stream. The overall input to the reducer will be the same sorted and merged input that Hadoop would provide to a Java reducer, but it will be flattened so that each item in the value collection causes another line of input to the reducer.
Code Listing 29 shows the Main() method for the reducer in which the read loop is the same as the mapper, but we initialize some variables to maintain state in the loop.
Code Listing 29: Reading Input in the .NET Reducer
static void Main(string[] args)
{
    string lastMatch = null;
    int totalCount = 0;
    string line;
    while ((line = Console.ReadLine()) != null)
    {
        line = line.Trim();
        //...
    }
}
In the read loop, the functionality is the same as in the Python script—keep a running count for each key and when the key changes, emit the previous count and reset it. Code Listing 30 shows the .NET reducer implementation.
Code Listing 30: The .NET Reducer
var parts = line.Split('\t');
var match = parts[0];
var count = int.Parse(parts[1]);

if (lastMatch == null)
{
    lastMatch = match;
}

if (lastMatch == match)
{
    totalCount += count;
}
else
{
    Console.WriteLine(string.Format("{0}\t{1}", lastMatch, totalCount));
    totalCount = count; // start the new key's total from this value
    lastMatch = match;
}

// after the read loop completes, emit the total for the final key
Console.WriteLine(string.Format("{0}\t{1}", lastMatch, totalCount));
In order to run .NET programs, you either need a Windows machine with the .NET Framework installed, or you can use the cross-platform .NET Core. Running on a Windows cluster, you submit the streaming program to Hadoop in the usual way, specifying the names of the executable programs you want to run and shipping the binaries with the -file argument.
Code Listing 31 shows the submit command and abbreviated results from the .NET Streaming job.
Code Listing 31: Submitting the Streaming .NET Job
hadoop jar %HADOOP_HOME%\share\hadoop\tools\lib\hadoop-streaming-2.7.2.jar ^
  -mapper "c:\dotnet\Sixeyed.HadoopSuccinctly.Streaming.Mapper.exe" ^
  -reducer "c:\dotnet\Sixeyed.HadoopSuccinctly.Streaming.Reducer.exe" ^
  -input input ^
  -output output4
With the streaming interface, we build the same types of MapReduce components using the language of our choice. There's no need for an explicit driver, because the hadoop-streaming.jar provided by the Hadoop installation acts as the driver. We can write mappers and reducers in whichever language suits us best, and we can include whichever unit-testing tools we normally use.
Because streaming executables are simply console apps, we can also run integration tests using a subset of the input data, feeding a single file into the mapper and capturing the output, which then becomes the input for the reducer (see the sketch below). And because we incur the cost of running the Java VM that bridges between Hadoop and our process, a decision to use streaming also includes a performance consideration.
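As a sketch of that testing approach, the following Python fragment pipes a single input file through the mapper, sort, and reducer from this chapter and makes a simple assertion on the result; the choice of input file and the assertion are assumptions for illustration:

import subprocess

# Mimic the Hadoop pipeline over a single input file: mapper | sort | reducer.
pipeline = ("cat $HADOOP_HOME/etc/hadoop/core-site.xml"
            " | /python/mapper.py | sort | /python/reducer.py")
output = subprocess.check_output(pipeline, shell=True).decode()

# Expect at least one reduced key containing "dfs" (a minimal smoke test).
assert any("dfs" in line for line in output.splitlines()), output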
However, be careful with interoperability in streaming apps. The protocol between Hadoop and your executable always passes data as strings, so if you are trying to mix a non-Java mapper with a Java reducer, you must be sure your mapper serializes the key and value output in the expected format so that the reducer can deserialize them.
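One way to protect the protocol is to escape tab and newline characters in values before emitting them. The Python sketch below uses one possible escaping convention; this is not a standard Hadoop mechanism, and the consuming side must apply the matching unescape:

def emit(key, value):
    # Escape backslashes first, then tabs and newlines, so the value cannot
    # break the tab-separated, line-based streaming protocol.
    safe = str(value).replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n')
    print('%s\t%s' % (key, safe))

emit('dfs.blocksize', 'a value with a\ttab inside')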
Another important consideration for streaming apps is ensuring that the executable can run on the data nodes. Typically, that means setting up the executable platform during the commissioning of your servers—if you know you'll be using Python or the .NET Framework, you need to have the required version installed as part of your setup.
If you use additional libraries in your executable programs, you will need to make sure they'll be available on the node where the task runs. You can use the same dependency-shipping approach that Hadoop provides for distributing JAR libraries with Java MapReduce programs. The hadoop-streaming.jar supports the -archive argument, which lets you specify a local package to ship along with your MapReduce program.
If your executable has library dependencies, you can bundle them into a ZIP package and specify that ZIP file in the -archive argument for your job, as in Code Listing 32.
Code Listing 32: Submitting Streaming Jobs with Dependencies
hadoop jar %HADOOP_HOME%\share\hadoop\tools\lib\hadoop-streaming-2.7.2.jar ^
  -input input ^
  -output output4 ^
  -mapper /lib/mapper-with-dependencies.exe ^
  -reducer /lib/reducer-with-dependencies.exe ^
  -archive c:\dotnet\dependencies.zip
Note: You need not ship your executables for streaming jobs; you can copy them to HDFS first and specify the relative HDFS path in the mapper and reducer arguments, as in this example.
When this job runs, Hadoop will copy the archive file to HDFS with a high replication factor (10 by default), the aim being that it will already be local to the nodes that run tasks for the job. When the task runs, it extracts the archive to the working directory for the job, so your executable will find its dependencies in the current directory.
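For a Python job, one way to pick up bundled libraries is to add the unpacked archive to the module search path at the top of the script. This is a minimal sketch, assuming the archive was submitted as dependencies.zip so the link in the working directory has that name, and mylib is a hypothetical module inside it:

import os
import sys

# Hadoop unpacks the shipped archive and links it into the task's working
# directory under the archive's own name (dependencies.zip in this sketch).
sys.path.insert(0, os.path.join(os.getcwd(), 'dependencies.zip'))

# import mylib  # hypothetical module bundled inside the archive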
When a Java MapReduce task runs on a node in Hadoop, it first creates a Java VM to run the JAR file containing the mapper or reducer. For a streaming application, there is no Java code, but a JVM is created nevertheless, and unless you're using a system command, the executable will also need to bootstrap its own runtime. The cost of loading the executable runtime and the JVM will impact the overall runtime of your job.
It can be difficult to assess that cost because you can't use typical instrumentation techniques—by the time your instrumentation code runs, the runtime has already been loaded and the cost already incurred. The difference is often unimportant because you choose to use streaming for nonfunctional reasons (having existing logic you want to re-use or having greater experience in a non-Java platform). However, it's always useful to understand the cost of making that decision.
We can get a reasonable idea of cost by benchmarking similar MapReduce programs running through different platforms. Table 2 shows the results from running the word count MapReduce through Java, Python, and .NET.
| Runtime | Average Total Map Time (s) | Average Total Reduce Time (s) | Average Total Job Time (s) | Difference from Java Baseline (s) |
|---|---|---|---|---|
| Java | 102.3 | 31.7 | 134.0 | 0 |
| Python | 112.9 | 34.0 | 146.9 | +12.9 |
| .NET | 108.2 | 33.3 | 141.6 | +7.6 |
Table 2: Compute Time for MapReduce Programs
For this benchmark, I used the word count MapReduce programs from this book, all of which are on GitHub, and I ran each job five times to get a reasonable average. Because the .NET program uses the full framework, it must run on Windows, so I ran all the jobs on the same single-node Syncfusion Big Data Studio installation (more on that in Chapter 7).
We can see there is an impact in using streaming compared to native MapReduce—streaming jobs take 5-10% longer, depending on the runtime. If you’re considering using streaming for the bulk of your Hadoop access, that’s something to be aware of. The impact is not significant enough to discount streaming, but time-critical jobs may need to be done in Java.
Hadoop is a Java platform, and although Java is the native language for running MapReduce jobs, Hadoop has strong support for any platforms that can produce an executable component. That allows you to build MapReduce programs in which the mapper and reducer are written in any language you prefer, provided you can configure your Hadoop nodes with all the prerequisites they need to use the platform.
In this chapter, we looked at how to reproduce the simple MapReduce program from Chapter 2 using Python and the .NET Framework. The streaming interface uses a simple protocol in which data is fed into stdin line by line as tab-separated key-value pairs, and Hadoop expects output to be written to stdout in the same format.
Although you should be aware of type safety and the performance impact of Hadoop Streaming, typically both concerns are insignificant compared to the ability to leverage your preferred platform and any existing technical assets you have. Hadoop Streaming supports shipping dependencies to nodes, which means you can build complex executables for your mappers and reducers and have them run without issue on distributed nodes.
The Hadoop Streaming interface is only one aspect of the cross-platform support for Hadoop. In the next chapter, we'll look more closely at the moving parts in the Hadoop cluster. In Chapter 7, we'll see how different Hadoop distributions run on different platforms.