Cascading, TF-IDF, and BufferedSum (Part 1)
Introduction
A common technique in MapReduce is to read a group of records, calculate a value from that group, and emit each record with the new value attached. While this is easy to do in a raw MapReduce job, the equivalent in Cascading is not obvious. This tutorial introduces a custom Cascading operation called BufferedSum. BufferedSum lets us calculate a value from a group of tuples and attach that group value to each individual tuple in a scalable way.
BufferedSum is easier to explain in concrete terms, so let’s work through an example.
Example
When dealing with large numbers of documents in Hadoop, it's common for each input file to contain many documents. Our input file in this case contains two documents, one per line, with the document id and the body separated by a tab:
a.txt\thello world world
b.txt\tgoodbye goodbye world
Let's say we want to calculate tf-idf for these documents. One of the first values we need is the number of occurrences of each term within each document.
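For orientation, here is one common formulation of tf-idf. This is a sketch: several variants exist and this article does not say which one Part 2 uses. The symbols are assumptions introduced here: f_{t,d} is the number of times term t occurs in document d (the term_count_in_document field computed below), N is the number of documents, and n_t is the number of documents that contain t.

```latex
\[
\mathrm{tf}(t,d) = \frac{f_{t,d}}{\sum_{t'} f_{t',d}},
\qquad
\mathrm{idf}(t) = \log\frac{N}{n_t},
\qquad
\text{tf-idf}(t,d) = \mathrm{tf}(t,d)\cdot\mathrm{idf}(t)
\]
```

Under this definition, the denominator of tf is the total number of terms in the document, which is why both a per-term count and a per-document total are needed on the same tuple. For "world" in a.txt, tf would be 2/3.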
First, we will split each line into (document_id, body) pairs:
pipe = new Each(pipe,
    new Fields("line"),
    new RegexSplitter(new Fields("document_id", "body"), "\t"));
From there we “tokenize” the document and extract each term:
pipe = new Each(pipe, // tokenize words by space
    new Fields("body"),
    new RegexSplitGenerator(new Fields("term"), "\\s+"),
    new Fields("document_id", "term"));
Now our tuple stream is the following:
a.txt hello
a.txt world
a.txt world
b.txt goodbye
b.txt goodbye
b.txt world
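To make the two steps above concrete without a Hadoop cluster, here is a minimal plain-Java sketch of the same transformation. It does not use Cascading at all; the class and method names (TokenizeSketch, tokenize) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; not part of Cascading or of BufferedSum.
final class TokenizeSketch {
    // Turns "document_id<TAB>body" lines into (document_id, term) pairs.
    // Assumes every line contains a tab, as in the sample input.
    static List<String[]> tokenize(List<String> lines) {
        List<String[]> pairs = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split("\t", 2);        // (document_id, body)
            for (String term : parts[1].split("\\s+")) { // one pair per term
                pairs.add(new String[] {parts[0], term});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "a.txt\thello world world",
            "b.txt\tgoodbye goodbye world");
        for (String[] pair : tokenize(lines)) {
            System.out.println(pair[0] + " " + pair[1]);
        }
    }
}
```

Running main on the two sample lines prints the same six (document_id, term) rows listed above.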
Count of term in document_id
We now have (document_id, term) and we want to calculate (document_id, term, term_count_in_document). With Cascading this is easy: group by document_id and term, then apply the Count aggregator:
// count how many times `term` appears in `document_id`
pipe = new GroupBy(pipe, new Fields("document_id", "term"));
pipe = new Every(pipe,
    new Fields("term"),
    new Count(new Fields("term_count_in_document")),
    new Fields("document_id", "term", "term_count_in_document"));
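As a rough picture of what this group-and-count step computes, here is a plain-Java sketch over the six example pairs. It is an illustration under assumptions, not Cascading code: TermCountSketch and countTerms are hypothetical names, and the composite key is simply the two fields joined by a tab.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; not part of Cascading or of BufferedSum.
final class TermCountSketch {
    // Groups by (document_id, term) and counts the rows in each group.
    // The map key is "document_id<TAB>term".
    static Map<String, Long> countTerms(String[][] pairs) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            counts.merge(pair[0] + "\t" + pair[1], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[][] pairs = {
            {"a.txt", "hello"}, {"a.txt", "world"}, {"a.txt", "world"},
            {"b.txt", "goodbye"}, {"b.txt", "goodbye"}, {"b.txt", "world"},
        };
        for (Map.Entry<String, Long> e : countTerms(pairs).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

For the example data this yields four rows: (a.txt, hello, 1), (a.txt, world, 2), (b.txt, goodbye, 2), and (b.txt, world, 1).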
Calculating total_terms_in_document
So far, so good. Up to this point Cascading has provided everything we need. Next, however, we want the total number of terms within each document while keeping the tuples we have calculated so far. Put another way, we have an input of (document_id, term, term_count_in_document) and we want to emit (document_id, term, term_count_in_document, total_terms_in_document).
Our first instinct might be to use GroupBy() and an aggregator like before. But there is a catch: an Every pipe that runs an aggregator emits only the grouping fields plus the aggregator's result (see the Each and Every Pipes section in the Cascading User Guide).
This means that if we group by document_id and Sum() the term_count_in_document field, we will emit (document_id, total_terms_in_document). The number in total_terms_in_document will be accurate, but we lose our term and term_count_in_document fields.
If we try to save our other fields by grouping on all three of them (document_id, term, term_count_in_document), then we’ve “over-grouped”: every “group” is a single tuple (the input tuple), so we never see the count of terms in the document as a whole. BufferedSum was created to solve this problem.
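The two dead ends described above can be seen in a small plain-Java sketch. It only models the "group, then keep key plus sum" behavior and makes no claim about how Cascading implements it; GroupingSketch, ROWS, and sumByGroup are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; not part of Cascading or of BufferedSum.
final class GroupingSketch {
    // Rows of (document_id, term, term_count_in_document) from the example.
    static final String[][] ROWS = {
        {"a.txt", "hello", "1"},
        {"a.txt", "world", "2"},
        {"b.txt", "goodbye", "2"},
        {"b.txt", "world", "1"},
    };

    // Sums term_count_in_document per group. The group key is built from the
    // first keyWidth columns, and only (key, sum) survives in the result.
    static Map<String, Integer> sumByGroup(String[][] rows, int keyWidth) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (String[] row : rows) {
            String key = String.join(",", Arrays.copyOf(row, keyWidth));
            sums.merge(key, Integer.parseInt(row[2]), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Group on document_id: correct totals, but the terms are gone.
        System.out.println(sumByGroup(ROWS, 1));
        // Group on all three fields: every group is one row, so no total.
        System.out.println(sumByGroup(ROWS, 3));
    }
}
```

Grouping on one column gives a total of 3 for each document but no term rows; grouping on all three columns keeps the four rows, but each "sum" is just that row's own count.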
BufferedSum
BufferedSum takes three things as input:
- The name of the Field to output
- The name of the Field to sum
- The other Fields to “pull through” the operation
Here is how we can use BufferedSum to achieve the desired effect:
// input: (document_id, term, term_count_in_document)
// emits: (document_id, term, term_count_in_document, total_terms_in_document)
pipe = new GroupBy(pipe, new Fields("document_id"));
pipe = new Every(pipe,
    new BufferedSum(new Fields("total_terms_in_document"),
                    new Fields("term_count_in_document"),
                    new Fields("document_id", "term", "term_count_in_document")),
    Fields.SWAP);
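Note: the output selector Fields.SWAP is critical. BufferedSum re-emits the incoming fields itself, so its result must replace the argument fields rather than be appended to them; appending would duplicate document_id, term, and term_count_in_document.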
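The effect of the pipe assembly above can be modeled in plain Java as "buffer each group, sum it, then re-emit every row with the sum attached". The sketch below is only a model of that behavior under simplifying assumptions (everything held in memory, rows as string arrays); BufferedSumSketch and appendGroupSum are hypothetical names, and the real operation is listed under The Code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; the real BufferedSum is a Cascading Buffer.
final class BufferedSumSketch {
    // Input rows are (document_id, term, term_count_in_document). Output rows
    // gain a fourth column holding the per-document total.
    static List<String[]> appendGroupSum(String[][] rows) {
        // Stand-in for "GroupBy document_id": collect each group's rows.
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        List<String[]> out = new ArrayList<>();
        for (List<String[]> group : groups.values()) {
            int sum = 0;
            for (String[] row : group) {   // first pass: sum the group
                sum += Integer.parseInt(row[2]);
            }
            for (String[] row : group) {   // second pass: re-emit with the sum
                out.add(new String[] {row[0], row[1], row[2], String.valueOf(sum)});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"a.txt", "hello", "1"}, {"a.txt", "world", "2"},
            {"b.txt", "goodbye", "2"}, {"b.txt", "world", "1"},
        };
        for (String[] row : appendGroupSum(rows)) {
            System.out.println(String.join(" ", row));
        }
    }
}
```

For the example data, every row keeps its three original fields and gains a total of 3, since both documents contain three terms. The two passes over each group are the reason the rows have to be buffered at all.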
Memory considerations
When using BufferedSum, try to keep your groups small enough to fit in memory. This is not a hard requirement: BufferedSum uses Cascading’s SpillableTupleList, which spills to disk once it grows past its threshold (10,000 tuples in the code below). That said, spilling is an expensive operation and should be avoided if possible.
Summary
BufferedSum is useful whenever you need a per-group sum attached to every tuple of the group in Cascading. In Part 2 we will use BufferedSum and Cascading to finish calculating tf-idf.
The Code
package com.xcombinator.cascading.operations.buffers;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.SpillableTupleList;
import java.util.Iterator;
/**
 * BufferedSum sums a value for every Tuple in a group and emits every input
 * Tuple with the sum appended.
 * <p/>
 * EXAMPLE:
 * <pre>{@code
 * // input: (document_id, term, term_count_in_document)
 * // emits: (document_id, term, term_count_in_document, total_terms_in_document)
 *
 * pipe = new GroupBy(pipe, new Fields("document_id"));
 * pipe = new Every(pipe,
 *     new BufferedSum(new Fields("total_terms_in_document"),
 *                     new Fields("term_count_in_document"),
 *                     new Fields("document_id", "term", "term_count_in_document")),
 *     Fields.SWAP);
 * }</pre>
 */
public class BufferedSum extends BaseOperation implements Buffer
{
  private Fields extrasSelector;
  private Fields fieldToSum;
  /**
   * Returns a BufferedSum Buffer Operation.
   *
   * @param emittedSumFieldName a {@link Fields} naming the field to emit the sum value
   * @param fieldToSum a {@link Fields} naming the field to sum
   * @param extrasSelector a {@link Fields} naming the other fields to "pull through". These fields *must* be of the same order and size as the input Tuple
   */
  public BufferedSum( Fields emittedSumFieldName, Fields fieldToSum, Fields extrasSelector )
  {
    // declared output fields: the pulled-through fields followed by the sum field
    super( extrasSelector.append( emittedSumFieldName ) );
    this.extrasSelector = extrasSelector;
    this.fieldToSum = fieldToSum;
  }
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
  {
    // per-group state is kept local so nothing carries over between groups
    double sum = 0.0D;
    SpillableTupleList list = new SpillableTupleList( 10000 ); // in-memory threshold before spilling
    Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();
    // first pass: sum the field and buffer every tuple in the group
    while( iterator.hasNext() )
    {
      TupleEntry arguments = iterator.next(); // must be called
      sum += arguments.getDouble( this.fieldToSum.get( 0 ) );
      // defensive copy, in case the underlying Tuple is reused between iterations
      list.add( new Tuple( arguments.getTuple() ) );
    }
    // second pass: re-emit each buffered tuple with the group sum appended
    for( Tuple tuple : list )
    {
      bufferCall.getOutputCollector().add( tuple.append( new Tuple( sum ) ) );
    }
  }
}