Cascading, TF-IDF, and BufferedSum (Part 1)

Introduction

A common technique in MapReduce is to read a group of records, calculate a value from that group, and emit each record with the new value attached. This is easy to do in a raw MapReduce job, but the solution in Cascading is not obvious. This tutorial introduces a new Cascading operation called BufferedSum. BufferedSum sums a field across a group of tuples and emits every tuple in the group with that sum attached. When a group is too large to hold in memory, it spills to disk.

BufferedSum is easiest to explain in concrete terms, so let’s work through an example.

Example

When dealing with large numbers of documents in Hadoop, it’s common for each input file to contain many documents. Our input file contains two documents, one per line. A tab separates the document ID from the body:

a.txt\thello world world
b.txt\tgoodbye goodbye world

Let’s say we want to calculate tf-idf for these documents. One of the first values we need is the number of times each term occurs within each document.
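For reference, here is one common formulation of tf-idf. It is the standard textbook definition, and Part 2 may use a variant. Each term count is divided by the document length, then weighted by how rare the term is across the N documents:

```latex
\mathrm{tf}(t,d) = \frac{n_{t,d}}{\sum_{k} n_{k,d}}, \qquad
\mathrm{idf}(t) = \log \frac{N}{\lvert \{ d : t \in d \} \rvert}, \qquad
\mathrm{tfidf}(t,d) = \mathrm{tf}(t,d) \cdot \mathrm{idf}(t)
```

Here n_{t,d} corresponds to term_count_in_document, and the denominator corresponds to total_terms_in_document. With the two example documents, tf(world, a.txt) = 2/3. Because world appears in both of the N = 2 documents, idf(world) = log(2/2) = 0.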

First, we will split each line into (document_id, body) pairs:

  pipe = new Each(pipe, 
      new Fields("line"), 
      new RegexSplitter(new Fields("document_id", "body"), "\t"));

From there we “tokenize” the document and extract each term:

  pipe = new Each(pipe, // tokenize words by space
      new Fields("body"),
      new RegexSplitGenerator(new Fields("term"), "\\s+"), 
      new Fields("document_id", "term"));

Now our tuple stream is the following:

a.txt hello
a.txt world
a.txt world
b.txt goodbye
b.txt goodbye
b.txt world
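Here is a plain-Java sketch that shows this stream outside of Hadoop. It does not use Cascading, and the TokenizeSketch class is hypothetical. It mimics the two Each steps: first it splits each line at the tab into (document_id, body), then it splits the body on whitespace into one (document_id, term) pair per term.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TokenizeSketch {
  // Mimics RegexSplitter("\t") followed by RegexSplitGenerator("\\s+"):
  // every input line yields one (document_id, term) pair per term.
  static List<String[]> tokenize(List<String> lines) {
    List<String[]> pairs = new ArrayList<>();
    for (String line : lines) {
      String[] fields = line.split("\t", 2);   // (document_id, body)
      String documentId = fields[0];
      String body = fields.length > 1 ? fields[1] : "";
      for (String term : body.trim().split("\\s+")) {
        if (!term.isEmpty()) {                 // an empty body yields no terms
          pairs.add(new String[] { documentId, term });
        }
      }
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "a.txt\thello world world",
        "b.txt\tgoodbye goodbye world");
    for (String[] pair : tokenize(lines)) {
      System.out.println(pair[0] + " " + pair[1]);
    }
  }
}
```

Running main prints the same six pairs as the tuple stream above.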

Count of term in document_id

We now have (document_id, term), and we want to calculate (document_id, term, term_count_in_document). With Cascading this is easy: group by document_id and term, then apply the Count() aggregator:

  // count how many times `term` appears in `document_id`
  pipe = new GroupBy(pipe, new Fields("document_id", "term"));
  pipe = new Every(pipe, 
      new Fields("term"), 
      new Count(new Fields("term_count_in_document")), 
      new Fields("document_id", "term", "term_count_in_document"));
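Conceptually, GroupBy plus Count() reduces each distinct (document_id, term) key to a single row that holds its number of occurrences. The plain-Java sketch below shows that reduction. The TermCountSketch class is hypothetical. A TreeMap stands in for the sorted grouping, and the map key is the two fields joined by a tab.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TermCountSketch {
  // GroupBy(document_id, term) + Count(): one entry per distinct key.
  // The key joins document_id and term with a tab; the TreeMap keeps the keys
  // sorted, loosely mirroring the sorted groups a GroupBy produces.
  static Map<String, Integer> countTerms(List<String[]> pairs) {
    Map<String, Integer> counts = new TreeMap<>();
    for (String[] pair : pairs) {
      counts.merge(pair[0] + "\t" + pair[1], 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String[]> pairs = Arrays.asList(
        new String[] {"a.txt", "hello"}, new String[] {"a.txt", "world"},
        new String[] {"a.txt", "world"}, new String[] {"b.txt", "goodbye"},
        new String[] {"b.txt", "goodbye"}, new String[] {"b.txt", "world"});
    for (Map.Entry<String, Integer> e : countTerms(pairs).entrySet()) {
      // prints (document_id, term, term_count_in_document)
      System.out.println(e.getKey().replace('\t', ' ') + " " + e.getValue());
    }
  }
}
```

main prints a.txt hello 1, a.txt world 2, b.txt goodbye 2 and b.txt world 1, one row per line.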

Calculating total_terms_in_document

So far, so good: up to this point Cascading has provided everything we need. Next, however, we want the total number of terms in each document, and we want to keep the tuples we have calculated so far. Put another way, we have an input of (document_id, term, term_count_in_document) and we want to emit (document_id, term, term_count_in_document, total_terms_in_document).

Our first instinct might be to use GroupBy() and Sum() as before. But there is a catch: an Every operation emits the aggregator’s result together with the grouping fields only (see the Each and Every Pipes section of the Cascading User Guide).

This means that if we group by document_id and Sum() term_count_in_document into total_terms_in_document, we emit only (document_id, total_terms_in_document). The value of total_terms_in_document is accurate, but we lose term and term_count_in_document.

Suppose instead we try to keep the other fields by grouping on all three of them (document_id, term, term_count_in_document). Then we have “over-grouped”: every group is a single tuple (the input tuple itself), so we never get the count of terms in the document as a whole. BufferedSum was created to solve this problem.

BufferedSum

BufferedSum takes three arguments, each a Fields instance:

  • The name of the field to output (holding the sum)
  • The name of the field to sum
  • The other fields to “pull through” the operation

Here is how we can use BufferedSum to achieve the desired effect:

  // input: (document_id, term, term_count_in_document)
  // emits: (document_id, term, term_count_in_document, total_terms_in_document)
  pipe = new GroupBy(pipe, new Fields("document_id"));
  pipe = new Every(pipe,
      new BufferedSum(new Fields("total_terms_in_document"),
                      new Fields("term_count_in_document"),
                      new Fields("document_id", "term", "term_count_in_document")),
      Fields.SWAP);

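Note: the output selector Fields.SWAP is critical here. BufferedSum receives every incoming field as its arguments. Fields.SWAP replaces those argument fields with the operation’s declared output, which is the pulled-through fields followed by the new sum field.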
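To make the semantics concrete without a cluster, here is a plain-Java sketch of what this GroupBy plus BufferedSum pipeline computes. The BufferedSumSketch class and its Row type are hypothetical. The sketch uses long counts rather than the Double the real operation emits. It buffers each group, sums the group in one pass, and re-emits every buffered row with the total in a second pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BufferedSumSketch {
  // One (document_id, term, term_count_in_document) tuple.
  static final class Row {
    final String documentId;
    final String term;
    final long termCount;

    Row(String documentId, String term, long termCount) {
      this.documentId = documentId;
      this.term = term;
      this.termCount = termCount;
    }
  }

  // Emits "document_id term term_count_in_document total_terms_in_document" lines.
  static List<String> bufferedSum(List<Row> rows) {
    // GroupBy(document_id): collect the rows of each group, keeping input order.
    Map<String, List<Row>> groups = new LinkedHashMap<>();
    for (Row row : rows) {
      groups.computeIfAbsent(row.documentId, k -> new ArrayList<>()).add(row);
    }

    List<String> out = new ArrayList<>();
    for (List<Row> group : groups.values()) {
      long total = 0;                 // first pass: sum the group
      for (Row row : group) {
        total += row.termCount;
      }
      for (Row row : group) {         // second pass: re-emit each row plus the sum
        out.add(row.documentId + " " + row.term + " " + row.termCount + " " + total);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row("a.txt", "hello", 1), new Row("a.txt", "world", 2),
        new Row("b.txt", "goodbye", 2), new Row("b.txt", "world", 1));
    bufferedSum(rows).forEach(System.out::println);
  }
}
```

Both example documents contain three terms, so every output row ends in 3. The per-term rows survive, which a plain GroupBy plus Sum() could not give us.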

Memory considerations

When using BufferedSum, try to keep your groups small enough to fit in memory. This is not a hard requirement. BufferedSum buffers each group in Cascading’s SpillableTupleList, which spills to local disk once it holds more tuples than its threshold (10000 in the code below). Spilling is expensive, though, and should be avoided if possible.
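The following minimal Java sketch illustrates what “spilling” means. SpillingListSketch is a hypothetical class, not Cascading’s SpillableTupleList, whose internals differ. It keeps the first threshold items in memory and appends the rest to a temporary file, then reads both back in insertion order. It stores strings rather than tuples to stay self-contained.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the spill-to-disk idea; not Cascading's SpillableTupleList.
public class SpillingListSketch implements AutoCloseable {
  private final int threshold;                  // max items kept in memory
  private final List<String> inMemory = new ArrayList<>();
  private Path spillFile;                       // created on the first spill
  private BufferedWriter spillWriter;
  private int spilled;

  public SpillingListSketch(int threshold) { this.threshold = threshold; }

  // Items are written one per line, so this sketch assumes they contain no newlines.
  public void add(String item) {
    try {
      if (inMemory.size() < threshold) {
        inMemory.add(item);
        return;
      }
      if (spillWriter == null) {                // first overflow: open a temp file
        spillFile = Files.createTempFile("spill-sketch", ".txt");
        spillWriter = Files.newBufferedWriter(spillFile, StandardCharsets.UTF_8);
      }
      spillWriter.write(item);
      spillWriter.newLine();
      spilled++;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public int spilledCount() { return spilled; }

  // Returns all items in insertion order: in-memory items first, then spilled ones.
  public List<String> readAll() {
    List<String> all = new ArrayList<>(inMemory);
    try {
      if (spillWriter != null) {
        spillWriter.flush();                    // make buffered writes visible
        all.addAll(Files.readAllLines(spillFile, StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return all;
  }

  @Override
  public void close() {
    try {
      if (spillWriter != null) {
        spillWriter.close();
        Files.deleteIfExists(spillFile);        // remove the temporary spill file
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The cost the text warns about is visible here. Every spilled item is written to disk and read back again, while items under the threshold never leave memory.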

Summary

BufferedSum is useful whenever you need to attach a group-level sum to every tuple in the group. In Part 2 we will use BufferedSum and Cascading to finish calculating tf-idf.

The Code

package com.xcombinator.cascading.operations.buffers;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.SpillableTupleList;
 
import java.util.Iterator;
 
/**
 * BufferedSum sums a value for every Tuple in a Group and emits every input
 * Tuple with the sum appended.
 * <p/>
 * 
 * 
 * EXAMPLE:
 *
 * {@code 
 *
 * // input: (document_id, term, term_count_in_document)
 * // emits: (document_id, term, term_count_in_document, total_terms_in_document) 
 *
 *     pipe = new GroupBy(pipe, new Fields("document_id"));
 *     pipe = new Every(pipe, 
 *         new BufferedSum(new Fields("total_terms_in_document"), 
 *                        new Fields("term_count_in_document"),
 *                        new Fields("document_id", "term", "term_count_in_document")), 
 *         Fields.SWAP);
 * }
 *
 * 
 */
 
public class BufferedSum extends BaseOperation implements Buffer
  {
  private final Fields extrasSelector;
  private final Fields fieldToSum;
 
  /**
   * Returns a BufferedSum Buffer Operation. 
   *
   * @param emittedSumFieldName a {@link Fields} naming the field to emit the sum value
   * @param fieldToSum          a {@link Fields} naming the field to sum
   * @param extrasSelector      a {@link Fields} naming the other fields to "pull through". These fields *must* be of the same order and size as the input Tuple
   */
  public BufferedSum( Fields emittedSumFieldName, Fields fieldToSum, Fields extrasSelector )
    {
    super( extrasSelector.append( emittedSumFieldName ) );
    this.extrasSelector = extrasSelector;
    this.fieldToSum = fieldToSum;
    }
 
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();

    // per-group state is local, so nothing is carried over between groups
    double sum = 0.0D;
    // keeps up to 10000 tuples in memory and spills the rest to disk
    SpillableTupleList list = new SpillableTupleList( 10000 );

    // first pass: add up the field and buffer every tuple of the group
    while( iterator.hasNext() )
      {
      TupleEntry arguments = iterator.next(); // must be called
      sum += arguments.getDouble( fieldToSum.get( 0 ) );
      // copy the tuple, since Cascading may reuse the TupleEntry it hands us
      list.add( new Tuple( arguments.getTuple() ) );
      }

    // second pass: emit each buffered tuple with the group sum appended
    for( Tuple tuple : list )
      {
      bufferCall.getOutputCollector().add( tuple.append( new Tuple( sum ) ) );
      }
    }
  }
Posted in cascading, hadoop, programming

How to use Cascading with Hadoop Streaming

Last time we talked about how to use a raw MapReduce job in Cascading. Now we are going to up the ante by using Hadoop Streaming as a Flow in Cascading. In this example, we hook a Python streaming job into a Cascade.

It’s pretty easy once you know how to do it:

  • Create a JobConf that defines the parameters for the streaming job
  • Send up the hadoop-*-streaming.jar with your Cascading job by packaging it inside your job jar
  • Send up the scripts (python, in this case) by using the -file option
  • Send up any other dependencies, corpora, etc. by using the -file, -cacheFile, or -cacheArchive options (See the Hadoop Streaming page for more details)

Resources

NLTK

To generate the nltkandyaml.mod zip file, do the following:

# download nltk and unzip
cd nltk
zip -r nltkandyaml.zip nltk yaml
mv nltkandyaml.zip nltkandyaml.mod

Note that this technique comes from Cloudera.

WordNet

The WordNet zip file needs to be flat: the corpus files must sit at the top level of the archive, not inside a subdirectory. You could create this file like so:

# download and unzip the wordnet corpus
cd wordnet
zip -r ../wordnet-flat.zip *
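A quick way to confirm that an archive is flat is to look for entry names containing '/', because zip entry names use '/' as the path separator. The following is a minimal Java sketch built on java.util.zip.ZipFile. The FlatZipCheck class is hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class FlatZipCheck {
  // Zip entry names always use '/' as the separator, so any name containing
  // '/' lives inside a directory and the archive is not flat.
  static List<String> nestedEntries(List<String> entryNames) {
    List<String> nested = new ArrayList<>();
    for (String name : entryNames) {
      if (name.indexOf('/') >= 0) {
        nested.add(name);
      }
    }
    return nested;
  }

  // Reads the entry names of the zip file at zipPath.
  static List<String> entryNames(String zipPath) throws IOException {
    try (ZipFile zip = new ZipFile(zipPath)) {
      List<String> names = new ArrayList<>();
      for (ZipEntry entry : Collections.list(zip.entries())) {
        names.add(entry.getName());
      }
      return names;
    }
  }

  public static void main(String[] args) throws IOException {
    List<String> nested = nestedEntries(entryNames(args[0]));
    if (nested.isEmpty()) {
      System.out.println(args[0] + " is flat");
    } else {
      System.out.println(args[0] + " is not flat; nested entries: " + nested);
    }
  }
}
```

Run it as `java FlatZipCheck wordnet-flat.zip`. Any nested entries it lists mean the archive was built from the parent directory instead of from inside wordnet/.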

Streaming Script

In Python, we use zipimport.zipimporter to import the nltk libraries from a zip file. In Hadoop 0.20.0, Hadoop did not decompress our wordnet-flat.zip file automatically. We’ve heard reports that some versions do, but we’re not sure which. For us, the .zip file was placed in lib/ relative to the script’s working directory. This let us keep the WordNet corpus as a zip and read it in that format:

wn = WordNetCorpusReader(nltk.data.find('lib/wordnet-flat.zip'))

(In this code we’re not using the Python reducer. The streaming job uses Hadoop’s IdentityReducer instead.)

#!/usr/bin/env python
# Hadoop Streaming mapper: reads one word per line on stdin and prints
# "word<TAB>synonym1,synonym2,..." using the WordNet corpus.
# Written for Python 2 and the older NLTK API of the time.
import sys
import zipimport

# load yaml and nltk from the zip shipped with "-file resources/nltkandyaml.mod"
importer = zipimport.zipimporter('nltkandyaml.mod')
yaml = importer.load_module('yaml')
nltk = importer.load_module('nltk')

from nltk.corpus.reader import WordNetCorpusReader

# read WordNet directly from the flat zip shipped with the job
nltk.data.path += ["."]
wn = WordNetCorpusReader(nltk.data.find('lib/wordnet-flat.zip'))


def mapper(args):
  # iterating over stdin stops cleanly at end of input
  for line in sys.stdin:
    word = line.strip()
    all_synonyms = []

    for synset in wn.synsets(word):
      # older NLTK exposes lemmas and name as attributes;
      # NLTK 3 turned them into methods (lemmas(), name())
      synonyms = [lemma.name for lemma in synset.lemmas]
      synonyms.pop(0)  # drop the first lemma, usually the word itself
      for synonym in synonyms:
        all_synonyms.append(synonym.replace("_", " "))

    print("\t".join([word, ','.join(all_synonyms)]))


# we're not using this, but we could
def reducer(args):
  for line in sys.stdin:
    print(line.strip())


if __name__ == "__main__":
  if len(sys.argv) < 2:
    sys.exit("usage: synsets.py mapper|reducer")
  if sys.argv[1] == "mapper":
    mapper(sys.argv[2:])
  elif sys.argv[1] == "reducer":
    reducer(sys.argv[2:])
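The mapper prints one word<TAB>synonyms line per input word. By default, Hadoop Streaming treats everything before the first tab of an output line as the key and the rest as the value. If a line has no tab, the whole line becomes the key. The small Java sketch below illustrates that default convention. StreamingLineSketch is hypothetical and is not Hadoop’s code, and the sample line is illustrative only.

```java
public class StreamingLineSketch {
  // Hadoop Streaming's default convention: the key is everything up to the
  // first tab and the value is the rest of the line (tab excluded). With no
  // tab the whole line is the key; the empty value is modeled here as "".
  static String[] splitKeyValue(String line) {
    int tab = line.indexOf('\t');
    if (tab < 0) {
      return new String[] { line, "" };
    }
    return new String[] { line.substring(0, tab), line.substring(tab + 1) };
  }

  public static void main(String[] args) {
    String[] kv = splitKeyValue("car\tauto,automobile");  // illustrative line
    System.out.println("key=" + kv[0] + " value=" + kv[1]);
  }
}
```

Because only the first tab separates the key, the comma-separated synonym list stays intact as the value.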

Cascading Code

Here’s the bulk of the code that will achieve the effect we want. Like last time, we’re using two intermediate taps as the input and output of the streaming job. Also, we’re just using TextLine files for simplicity. If you don’t want the intermediate files hanging around, look at the comments towards the bottom for some example code on how to remove the files when the job is finished running.

package com.xcombinator.hadoopjobs.cascadingstreamingtest;
 
import cascading.cascade.*;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.MapReduceFlow;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.*;
import cascading.pipe.*;
import cascading.scheme.*;
import cascading.tap.*;
import cascading.tuple.Fields;
import cascading.operation.Identity;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import cascading.operation.Debug;
import org.apache.hadoop.streaming.StreamJob;
import java.io.IOException;
 
/**
 * An example file to use a Hadoop Streaming job in cascading
 */
public class Main extends Configured implements Tool
  {
  private static final Logger LOG = Logger.getLogger( Main.class );
 
  public int run(String[] args)
  {
    JobConf conf = new JobConf(getConf(), this.getClass());
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, this.getClass());
 
    CascadeConnector cascadeConnector = new CascadeConnector();
    FlowConnector flowConnector = new FlowConnector(properties);
 
    String inputPath  = args[0];
    String outputPath = args[1];
    String intermediatePath1 = args[1] + "-mr-input";
    String intermediatePath2 = args[1] + "-mr-output";
 
    Scheme textLineScheme = new TextLine();
 
    Tap sourceTap = new Hfs(textLineScheme, inputPath);
    Tap intermediateTap1 = new Hfs(textLineScheme, intermediatePath1);
    Tap intermediateTap2 = new Hfs(textLineScheme, intermediatePath2);
    Tap sinkTap   = new Hfs(textLineScheme, outputPath);
 
    // create our first flow, sink to the intermediateTap
    Pipe wsPipe = new Each("wordsplit", 
        new Fields("line"), 
        new RegexSplitGenerator(new Fields("word"), "\\s+"), 
        new Fields("word"));
 
    Flow parsedLogFlow = flowConnector.connect(sourceTap, intermediateTap1, wsPipe);
 
    // wrap the Hadoop Streaming job in a MapReduceFlow so the Cascade can run it
    Flow mrFlow;
 
    try {
      JobConf streamConf = StreamJob.createJob( new String[]{
          "-input", intermediateTap1.getPath().toString(), 
          "-output", intermediateTap2.getPath().toString(),
 
          // straight unix
          // "-mapper", "/bin/cat",
          // "-reducer", "/usr/bin/wc"
 
          // ruby
          // "-mapper", "src/main/ruby/word_count_mapper.rb",
          // "-reducer", "src/main/ruby/word_count_reducer.rb",
          // "-file", "src/main/ruby/word_count_mapper.rb",
          // "-file", "src/main/ruby/word_count_reducer.rb"
 
          // python
          "-mapper", "python synsets.py mapper",
          "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
          "-file", "src/main/python/synsets.py",
          "-file", "resources/nltkandyaml.mod",
          "-file", "resources/lib/wordnet-flat.zip",
          });
      mrFlow = new MapReduceFlow("streaming flow", streamConf, intermediateTap1,
        intermediateTap2, false, true);
    } catch(IOException ioe) {
      // report the failure and let main() exit with a non-zero status
      LOG.error("could not create the streaming job", ioe);
      return 1;
    }
 
    // create our third "regular" cascading pipe. this is a bit contrived, but
    // the idea is to substitute every 'e' with an 'x'. it's just here to show
    // how to take the output of a streaming job back into cascading
    Pipe subPipe = new Pipe("subber");
    subPipe = new Each(subPipe,
        new Fields("line"),
        new RegexReplace(new Fields("linx"), "e", "x", true),
        new Fields("linx"));
    Flow subFlow = flowConnector.connect(intermediateTap2, sinkTap, subPipe);
 
    Cascade cascade = cascadeConnector.connect(parsedLogFlow, mrFlow, subFlow);
    cascade.complete();
 
    // to get rid of the intermediate files you could do this for each
    // intermediate tap (requires org.apache.hadoop.fs.Path and FileSystem):
    // Path tmp = intermediateTap1.getPath();
    // FileSystem fs = tmp.getFileSystem(conf);
    // fs.delete(tmp, true);
 
    return 0;
  }
 
  public static void main(String[] args) throws Exception 
  {
    int res = ToolRunner.run(new Configuration(), new Main(), args);
    System.exit(res);
  }
 
  }
Posted in programming

Interval – a Ruby library for musical interval arithmetic

Interval

interval is a tiny library that provides simple arithmetic on musical pitches and intervals. It is intended to do one thing: given a pitch, add (or subtract) an interval and return the resulting pitch.

Observe:

p = Interval::Pitch.from_string("c")
i = Interval::Interval.from_string("M3")
p2 = p + i
p2.to_short_name # => "e"
 
i.to_s # => "Major Third"
 
i2 = Interval::Interval.from_string("p5")
i2.to_s # => "Perfect Fifth"
 
(p2 - i2).to_s # => "a"
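The minimal Java sketch below shows the kind of arithmetic involved. It is not interval’s actual Ruby implementation, and the IntervalSketch class and its methods are hypothetical. The sketch moves the letter name by the interval’s number, then picks the accidental needed to match the interval’s size in semitones. It handles the qualities p, M, m, d and a for the numbers 1 through 8.

```java
public class IntervalSketch {
  private static final String LETTERS = "cdefgab";
  // Semitones above c for each natural letter c, d, e, f, g, a, b.
  private static final int[] NATURAL = {0, 2, 4, 5, 7, 9, 11};
  // Semitones in the perfect or major interval of each number 1..8 (index 0 unused).
  private static final int[] BASE = {0, 0, 2, 4, 5, 7, 9, 11, 12};

  // Size in semitones of an interval written like "M3", "p5", "m6", "d5", "a4".
  static int semitones(String interval) {
    char quality = interval.charAt(0);
    int number = Integer.parseInt(interval.substring(1));
    if (number < 1 || number > 8) {
      throw new IllegalArgumentException("unsupported interval: " + interval);
    }
    boolean perfectKind = number == 1 || number == 4 || number == 5 || number == 8;
    if (perfectKind ? (quality == 'M' || quality == 'm') : quality == 'p') {
      throw new IllegalArgumentException("invalid quality for this number: " + interval);
    }
    int size = BASE[number];
    switch (quality) {
      case 'p':
      case 'M': return size;
      case 'm': return size - 1;
      case 'a': return size + 1;
      case 'd': return perfectKind ? size - 1 : size - 2;
      default: throw new IllegalArgumentException("unknown quality: " + interval);
    }
  }

  // Moves a pitch such as "c", "f#" or "eb" up (direction = 1) or down (direction = -1).
  static String transpose(String pitch, String interval, int direction) {
    int letter = LETTERS.indexOf(pitch.charAt(0));
    if (letter < 0) {
      throw new IllegalArgumentException("unknown pitch: " + pitch);
    }
    int accidental = 0;
    for (char c : pitch.substring(1).toCharArray()) {
      if (c == '#') accidental++;             // sharp raises by a semitone
      else if (c == 'b') accidental--;        // flat lowers by a semitone
      else throw new IllegalArgumentException("unknown accidental: " + pitch);
    }
    int number = Integer.parseInt(interval.substring(1));

    // 1. Step the letter name by (number - 1), tracking octave crossings.
    int position = letter + direction * (number - 1);
    int newLetter = Math.floorMod(position, 7);
    int naturalTarget = NATURAL[newLetter] + 12 * Math.floorDiv(position, 7);

    // 2. Choose the accidental that makes the distance equal the interval size.
    int target = NATURAL[letter] + accidental + direction * semitones(interval);
    int newAccidental = target - naturalTarget;

    StringBuilder name = new StringBuilder().append(LETTERS.charAt(newLetter));
    for (int i = 0; i < Math.abs(newAccidental); i++) {
      name.append(newAccidental > 0 ? '#' : 'b');
    }
    return name.toString();
  }

  public static void main(String[] args) {
    System.out.println(transpose("c", "M3", 1));    // e
    System.out.println(transpose("e", "p5", -1));   // a
    System.out.println(transpose("eb", "p5", -1));  // ab
  }
}
```

The sketch reproduces the answers in the quiz session: a major third below f# is d, a major third above g# is b#, and a perfect fifth below c# is f#.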

Interval Quiz

interval was written primarily for learning intervals. interval-quiz is a separate gem, built on interval, that provides a command-line quiz. Here’s the output of an interval-quiz session:

$ interval-quiz
Here are the intervals:
unison  p1        a1
second  m2 M2  d2 a2
third   m3 M3  d3 a3
fourth  p4     d4 a4
fifth   d5 p5  d5 a5
sixth   m6 M6  d6 a6
seventh m7 M7  d7 a7
octave  p8     d8
enter the intervals you want (or a blank line to quit):
M3
p5

["M3", "p5"]
1. above
2. below
3. both
do you want to be quizzed on intervals above, below, or both?  3
what is a major third below f# ? d
correct!
what is a major third above g# 1/1 (100%)? b#
correct!
what is a major third below b 2/2 (100%)? g
correct!
what is a perfect fifth below eb 3/3 (100%)? a
wrong. the answer is ab
what is a perfect fifth below c# 3/4 (75%)? d
wrong. the answer is f#

Installing

gem install interval interval-quiz

Source

http://github.com/jashmenn/interval
http://github.com/jashmenn/interval-quiz

Posted in music, ruby