Last time we talked about how to use a raw MapReduce job in Cascading. Now we are going to up the ante by using Hadoop Streaming as a Flow in Cascading. In this example, we hook a Python streaming job into a Cascade.
It's pretty easy once you know how to do it:
- Create a JobConf that defines the parameters for the streaming job
- Send up the hadoop-*-streaming.jar with your Cascading job by putting it in your jar
- Send up the scripts (Python, in this case) by using the -file option
- Send up any other dependencies, corpora, etc. by using the -file, -cacheFile, or -cacheArchive options (see the Hadoop Streaming page for more details)
Resources
NLTK
To generate the nltkandyaml.mod zip file do the following:
# download nltk and unzip
cd nltk
zip -r nltkandyaml.zip nltk yaml
mv nltkandyaml.zip nltkandyaml.mod
Note that this technique is taken from Cloudera.
WordNet
The WordNet zip file needs to be flat, i.e. the corpus files must sit at the top level of the archive rather than inside a subdirectory. You could create this file like so:
# download and unzip the wordnet corpus
cd wordnet
zip -r ../wordnet-flat.zip *
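If the zip command line tool isn't available, the same flat archive can be built from Python's standard zipfile module. The sketch below stores every file with a path relative to the source directory, so no wrapping "wordnet/" folder appears in the archive. The function names build_flat_zip and is_flat are hypothetical helpers for illustration. Write the output outside the source directory so the archive doesn't include itself.

```python
import os
import zipfile


def build_flat_zip(src_dir, out_path):
    """Zip every file under src_dir using paths relative to src_dir,
    so the archive has no wrapping top-level directory."""
    with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in sorted(files):
                full = os.path.join(root, name)
                # arcname is e.g. "index.noun", not "wordnet/index.noun"
                zf.write(full, arcname=os.path.relpath(full, src_dir))
    return out_path


def is_flat(zip_path, top_dir_name):
    """Return True if no archive entry lives under top_dir_name/."""
    with zipfile.ZipFile(zip_path) as zf:
        return not any(n.startswith(top_dir_name + "/") for n in zf.namelist())
```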
Streaming Script
In Python, we use zipimport.zipimporter to import the NLTK libraries from a zip file. In Hadoop 0.20.0, Hadoop did not decompress our wordnet-flat.zip file automatically (we've heard reports that some versions do, but we're not sure which). For us, the .zip file was placed in lib/ relative to the script's working directory. This allowed us to keep the WordNet corpus as a zip and read it in that format:
wn = WordNetCorpusReader(nltk.data.find('lib/wordnet-flat.zip'))
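The streaming script loads NLTK with zipimport.zipimporter(...).load_module(...). That method is deprecated in recent Python 3 releases. A related approach that still works is to put the archive itself on sys.path. Python's zipimport machinery accepts any valid zip archive as a path entry, whatever its extension, which is why a file named .mod works. The sketch below demonstrates this with a tiny hypothetical package called demo_pkg standing in for nltk or yaml. It is an illustration of the mechanism, not the code used in the job.

```python
import importlib
import os
import sys
import zipfile


def make_demo_archive(directory):
    """Build a small zip with a non-.zip extension holding one package.
    'demo_pkg' is a hypothetical stand-in for nltk/yaml."""
    path = os.path.join(directory, "demo.mod")
    with zipfile.ZipFile(path, "w") as zf:
        zf.writestr("demo_pkg/__init__.py", "VALUE = 42\n")
    return path


def import_from_archive(archive_path, module_name):
    """Put the archive on sys.path and import module_name from it."""
    if archive_path not in sys.path:
        sys.path.insert(0, archive_path)
    importlib.invalidate_caches()  # make sure the new path entry is seen
    return importlib.import_module(module_name)
```

For example, import_from_archive(make_demo_archive(some_dir), "demo_pkg").VALUE gives 42.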
(In this job we don't use the Python reducer; the streaming job uses Hadoop's IdentityReducer instead.)
#!/usr/bin/env python
# Hadoop Streaming mapper: for each input word, emit "word<TAB>syn1,syn2,...".
# Written against the pre-3.0 NLTK API, where Synset.lemmas and Lemma.name are
# attributes; in NLTK 3 they are methods (synset.lemmas(), lemma.name()).
import re
import sys
import zipimport

# load NLTK (and the yaml module it depended on) from the archive sent with -file
importer = zipimport.zipimporter('nltkandyaml.mod')
yaml = importer.load_module('yaml')
nltk = importer.load_module('nltk')

from nltk.corpus.reader import WordNetCorpusReader

# read the WordNet corpus straight from the flat zip sent with -file
nltk.data.path += ["."]
wn = WordNetCorpusReader(nltk.data.find('lib/wordnet-flat.zip'))


def mapper(args):
    # iterating over stdin stops cleanly at end of input; no exception to catch
    for line in sys.stdin:
        word = line.strip()
        if not word:
            continue  # skip blank lines
        all_synonyms = []
        for synset in wn.synsets(word):
            # drop the first lemma of each synset (often the word itself)
            for synonym in [lemma.name for lemma in synset.lemmas][1:]:
                all_synonyms.append(re.sub("_", " ", synonym))
        sys.stdout.write("\t".join([word, ",".join(all_synonyms)]) + "\n")


# we're not using this, but we could
def reducer(args):
    for line in sys.stdin:
        sys.stdout.write(line.strip() + "\n")


if __name__ == "__main__":
    if sys.argv[1] == "mapper":
        mapper(sys.argv[2:])
    elif sys.argv[1] == "reducer":
        reducer(sys.argv[2:])
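The line format the mapper emits can be checked without NLTK or the corpus. Here is a minimal sketch that pulls the formatting logic into a hypothetical helper, format_synonym_line, which takes the lemma names per synset directly. The sample lemma lists in the test are made up for illustration and are not real WordNet output.

```python
def format_synonym_line(word, lemma_name_lists):
    """Build one mapper output line: word, a tab, then comma-separated synonyms.

    lemma_name_lists holds one list of lemma names per synset, which the real
    script gets from WordNet; passing it in lets us test the formatting alone.
    """
    synonyms = []
    for names in lemma_name_lists:
        # like the script, drop the first lemma of each synset
        for name in names[1:]:
            synonyms.append(name.replace("_", " "))
    return "\t".join([word, ",".join(synonyms)])
```

To try the real script outside Hadoop, you can pipe words through it (for example, echo car | python synsets.py mapper) from a directory containing nltkandyaml.mod and lib/wordnet-flat.zip. Streaming mappers simply read lines on stdin and write lines to stdout.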
Cascading Code
Here’s the bulk of the code that will achieve the effect we want. Like last time, we’re using two intermediate taps as the input and output of the streaming job. Also, we’re just using TextLine files for simplicity. If you don’t want the intermediate files hanging around, look at the comments towards the bottom for some example code on how to remove the files when the job is finished running.
package com.xcombinator.hadoopjobs.cascadingstreamingtest;

import cascading.cascade.*;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.MapReduceFlow;
import cascading.operation.regex.*;
import cascading.pipe.*;
import cascading.scheme.*;
import cascading.tap.*;
import cascading.tuple.Fields;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.streaming.StreamJob;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * An example of using a Hadoop Streaming job in Cascading
 */
public class Main extends Configured implements Tool {
  private static final Logger LOG = Logger.getLogger(Main.class);

  public int run(String[] args) {
    JobConf conf = new JobConf(getConf(), this.getClass());

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, this.getClass());

    CascadeConnector cascadeConnector = new CascadeConnector();
    FlowConnector flowConnector = new FlowConnector(properties);

    String inputPath = args[0];
    String outputPath = args[1];
    String intermediatePath1 = args[1] + "-mr-input";
    String intermediatePath2 = args[1] + "-mr-output";

    // plain TextLine files everywhere, for simplicity
    Scheme textLineScheme = new TextLine();
    Tap sourceTap = new Hfs(textLineScheme, inputPath);
    Tap intermediateTap1 = new Hfs(textLineScheme, intermediatePath1);
    Tap intermediateTap2 = new Hfs(textLineScheme, intermediatePath2);
    Tap sinkTap = new Hfs(textLineScheme, outputPath);

    // first flow: split each line into words, sink to intermediateTap1
    Pipe wsPipe = new Each("wordsplit", new Fields("line"),
        new RegexSplitGenerator(new Fields("word"), "\\s+"), new Fields("word"));
    Flow parsedLogFlow = flowConnector.connect(sourceTap, intermediateTap1, wsPipe);

    // second flow: build the streaming JobConf and wrap it in a MapReduceFlow
    Flow mrFlow;
    try {
      JobConf streamConf = StreamJob.createJob(new String[]{
          "-input", intermediateTap1.getPath().toString(),
          "-output", intermediateTap2.getPath().toString(),
          // straight unix
          // "-mapper", "/bin/cat",
          // "-reducer", "/usr/bin/wc"
          // ruby
          // "-mapper", "src/main/ruby/word_count_mapper.rb",
          // "-reducer", "src/main/ruby/word_count_reducer.rb",
          // "-file", "src/main/ruby/word_count_mapper.rb",
          // "-file", "src/main/ruby/word_count_reducer.rb"
          // python
          "-mapper", "python synsets.py mapper",
          "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
          "-file", "src/main/python/synsets.py",
          "-file", "resources/nltkandyaml.mod",
          "-file", "resources/lib/wordnet-flat.zip",
      });
      mrFlow = new MapReduceFlow("streaming flow", streamConf,
          intermediateTap1, intermediateTap2, false, true);
    } catch (IOException ioe) {
      LOG.error("could not create the streaming job", ioe);
      return 1; // main() passes this to System.exit
    }

    // third "regular" Cascading flow. This is a bit contrived: the idea is to
    // substitute all 'e's with 'x's. It's just here to show how to take the
    // output of a streaming job back into Cascading.
    Pipe subPipe = new Pipe("subber");
    subPipe = new Each(subPipe, new Fields("line"),
        new RegexReplace(new Fields("linx"), "e", "x", true), new Fields("linx"));
    Flow subFlow = flowConnector.connect(intermediateTap2, sinkTap, subPipe);

    // the Cascade runs the three flows in dependency order
    Cascade cascade = cascadeConnector.connect(parsedLogFlow, mrFlow, subFlow);
    cascade.complete();

    // to get rid of the intermediate files you could do this for each
    // intermediate tap (needs org.apache.hadoop.fs.Path and FileSystem):
    // Path tmp = intermediateTap1.getPath();
    // FileSystem fs = tmp.getFileSystem(conf);
    // fs.delete(tmp, true);

    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Main(), args);
    System.exit(res);
  }
}
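The following plain-Python simulation traces what happens to the data across the three flows. It is not Cascading or Hadoop code. The synonym table standing in for WordNet and all function names are hypothetical. The shuffle is modeled simply as a sort by key before the identity reduce, and dropping empty tokens after the whitespace split is an assumption.

```python
import re

# hypothetical synonym table standing in for the WordNet lookup
HYPOTHETICAL_SYNONYMS = {"see": ["watch"]}


def hypothetical_mapper(word):
    """Stand-in for synsets.py: emit word TAB comma-separated synonyms."""
    return word + "\t" + ",".join(HYPOTHETICAL_SYNONYMS.get(word, []))


def split_words(lines):
    """Flow 1: split each line on whitespace, one word per output line."""
    return [w for line in lines for w in re.split(r"\s+", line) if w]


def streaming_step(words, mapper):
    """Flow 2: map each line, sort by key (the shuffle), identity reduce."""
    pairs = [mapper(w).partition("\t")[::2] for w in words]  # (key, value)
    pairs.sort(key=lambda kv: kv[0])
    return ["%s\t%s" % kv for kv in pairs]


def substitute(lines):
    """Flow 3: replace every 'e' with 'x', like RegexReplace(..., true)."""
    return [line.replace("e", "x") for line in lines]


def run_pipeline(lines, mapper=hypothetical_mapper):
    return substitute(streaming_step(split_words(lines), mapper))
```

For instance, run_pipeline(["see the tree"]) yields the lines "sxx\twatch", "thx\t", and "trxx\t".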

Cascading, TF-IDF, and BufferedSum (Part 1)
Introduction
A common technique in MapReduce is to input a group of records, calculate a value from that group, and emit each record with the new value attached. While this is easy to do in raw MapReduce jobs, the solution in Cascading is not very obvious. This tutorial introduces a new operation to Cascading called BufferedSum. BufferedSum allows us to calculate values from a group of tuples and emit the group value to individual tuples in a scalable way.
Describing the operation of BufferedSum is clearer in concrete terms, so let's work with an example.
Example
When dealing with large numbers of documents in Hadoop, it's common for each input file to contain many documents. Our input file in this case will contain two documents:
Let's say we want to calculate tf-idf for these documents. One of the first values we need is the number of occurrences of each term within each document.
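One common formulation of tf-idf shows why both the per-document term count and the per-document total are needed. Variants differ in normalization and log base, and this is not necessarily the exact variant Part 2 will use. Here N is the number of documents and df(t) is the number of documents containing term t:

```latex
\mathrm{tf}(t,d) = \frac{\mathrm{term\_count\_in\_document}(t,d)}{\mathrm{total\_terms\_in\_document}(d)}
\qquad
\mathrm{idf}(t) = \log\frac{N}{\mathrm{df}(t)}
\qquad
\mathrm{tfidf}(t,d) = \mathrm{tf}(t,d)\cdot\mathrm{idf}(t)
```

For example, a term that occurs 2 times in a 10-term document has tf = 2/10 = 0.2.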
First, we will split each line into (document_id, body) pairs:
From there we "tokenize" the document and extract each term:
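Here is a rough plain-Python picture of these two steps, standing in for the Cascading pipes. It assumes each input line looks like "document_id<TAB>body" and that tokenizing means lowercasing and taking runs of word characters. Both are assumptions, and the function names are hypothetical.

```python
import re


def parse_documents(lines):
    """Split each 'document_id<TAB>body' line into a (document_id, body) pair."""
    pairs = []
    for line in lines:
        doc_id, _, body = line.rstrip("\n").partition("\t")
        pairs.append((doc_id, body))
    return pairs


def tokenize(pairs):
    """Emit one (document_id, term) tuple per term in each body."""
    return [(doc_id, term)
            for doc_id, body in pairs
            for term in re.findall(r"\w+", body.lower())]
```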
Now our tuple stream is the following:
Count of term in document_id
We now have (document_id, term) and we want to calculate (document_id, term, term_count_in_document). With Cascading, this is easy: simply group by document_id and term and use the Count() function:
Calculating total_terms_in_document
So far, so good. Up to this point Cascading has provided everything we need. However, next we want to get the total number of terms within each document and keep the tuples we have calculated so far. Put another way, we have an input of (document_id, term, term_count_in_document) and we want to emit (document_id, term, term_count_in_document, total_terms_in_document).
Our first instinct might be to use GroupBy() and an aggregator like before. But there is a catch: Every operations emit the operator result with the group tuple (see the Each and Every Pipes section of the Cascading User Guide). This means that if we group by document_id and Sum() term_count_in_document into total_terms_in_document, we will emit (document_id, total_terms_in_document). The number in total_terms_in_document will be accurate, but we lose our term and term_count_in_document fields.
If we try to save our other fields by grouping on all three of them, (document_id, term, term_count_in_document), then we've "over-grouped": every group is a single tuple (the input tuple), and we won't get the count of terms in the document as a whole. BufferedSum was created to solve this problem.
BufferedSum
BufferedSum takes three things as its input:
- the Field to output
- the Field to sum
- the Fields to "pull through" the operation
Here is how we can use BufferedSum to achieve the desired effect:
Memory considerations
One thing to be careful of when using BufferedSum is to try to keep your groups small enough to fit in memory. However, this is not a strict requirement: BufferedSum uses Cascading's SpillableTupleList, which will spill to disk if it grows too large. That said, spilling is an expensive operation and should be avoided if possible.
Summary
BufferedSum is a widely useful operation when dealing with sums in Cascading. In Part 2 we will use BufferedSum and Cascading to finish calculating tf-idf.
The Code
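As a language-neutral illustration of what the steps in this post compute, here is a plain-Python simulation. It is not Cascading code and not BufferedSum's actual API. The function names and the dict-per-tuple representation are hypothetical. It contrasts a plain group-and-sum, which keeps only the group field and the total, with a BufferedSum-style pass that buffers each group, sums one field, and re-emits every buffered tuple's pull-through fields together with the group total.

```python
from collections import Counter
from itertools import groupby
from operator import itemgetter


def count_terms(doc_terms):
    """GroupBy(document_id, term) + Count() -> term_count_in_document."""
    counts = Counter(doc_terms)
    return [{"document_id": d, "term": t, "term_count_in_document": c}
            for (d, t), c in sorted(counts.items())]


def naive_group_sum(tuples):
    """GroupBy(document_id) + Sum(): only the group field and the total survive."""
    key = itemgetter("document_id")
    return [{"document_id": doc,
             "total_terms_in_document": sum(t["term_count_in_document"] for t in grp)}
            for doc, grp in groupby(sorted(tuples, key=key), key=key)]


def buffered_sum(tuples, group_field, sum_field, output_field, pull_through):
    """BufferedSum-style: buffer each group, sum sum_field, then emit every
    buffered tuple's pull_through fields plus the total in output_field."""
    key = itemgetter(group_field)
    out = []
    for _, grp in groupby(sorted(tuples, key=key), key=key):
        buffer = list(grp)  # plain in-memory list; Cascading's buffer can spill to disk
        total = sum(t[sum_field] for t in buffer)
        for t in buffer:
            row = {f: t[f] for f in pull_through}
            row[output_field] = total
            out.append(row)
    return out
```

With (document_id, term) pairs from the tokenizing step, count_terms followed by buffered_sum(counts, "document_id", "term_count_in_document", "total_terms_in_document", ["document_id", "term", "term_count_in_document"]) yields one tuple per term that carries its document's total.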