James Murphy

Hadoop Part III: Multiple Output in Hadoop

The output of the Hadoop MapReduce particle filter from the previous post is simply a list of doubles giving the state for each particle after resampling.  This is not ideal because this post-resampling particle collection is a cruder representation of the post-observation state posterior than the pre-resampling, weighted collection.  Obviously we need to output the particle collection after resampling as input for the next stage, but it would be nice to also output the pre-resampling collection.  For this, we need to investigate Hadoop’s logically named MultipleOutputs class.

Furthermore, the system of outputting the resampled particle collection as a list of doubles is fine in this case, where the system state for each particle is one-dimensional, but what if the system state is of higher dimensionality, or we want to output additional properties for each particle (which will be important when we come to parameter estimation)?  In that case we need to output multiple numbers for each particle, so the DoubleWritable class we were using for output before just isn’t going to cut it.  Instead, we need to develop our custom Particle class so that it can output a more complete set of information about each particle.

Multiple Outputs

Getting the Hadoop reducer to produce multiple outputs is pretty straightforward.  All we need to do is to add a MultipleOutputs object to the reducer class PFReducer.  The new Reducer looks like this (where the highlighted lines are those that are new since the version in the previous post).

Here, we declare the myoutput object, and then override the base Reducer methods setup() and cleanup() to initialize and dispose of this object correctly. These methods are called, respectively, before and after the reducer runs and allow things to be set up and cleared up in preparation for running the reducer.
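As a rough sketch of the pattern just described (not the original PFReducer listing), the reducer below assumes Text keys and DoubleWritable values on both input and output.  The class name PFReducerSketch, the pass-through reduce() body and the “resampled” output name are illustrative assumptions; the real reducer would do the particle filter's weighting and resampling inside reduce().

```java
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Hypothetical reducer showing only the MultipleOutputs life cycle.
class PFReducerSketch extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    // Assumed output name; it must match a name declared in the driver.
    static final String RESAMPLED = "resampled";

    // The multiple outputter, created in setup() and closed in cleanup().
    private MultipleOutputs<Text, DoubleWritable> myoutput;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Called once before any reduce() call.
        myoutput = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Placeholder logic: pass each value straight through to the named output.
        for (DoubleWritable value : values) {
            myoutput.write(RESAMPLED, key, value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once after the last reduce() call; closing flushes the named outputs.
        myoutput.close();
    }
}
```

Forgetting the close() call in cleanup() is a common way to end up with empty or truncated named output files.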

Now, to output values from the reducer we replace the context.write(key, value) statement we used before (where key was just an empty Text, and value was a DoubleWritable) with statements that write to the myoutput multiple outputter.  These take the very similar form myoutput.write(OUTPUT_NAME, key, value),

where key and value are the same as before, but the additional argument OUTPUT_NAME is a string specifying the name of the output file.  Note, however, that these output files must be declared in advance in the driver, so in the main body of the driver we need to add a line of code for each output file of the form MultipleOutputs.addNamedOutput(job, OUTPUT_NAME, OutputFormatterClass, KeyClass, ValueClass),

where job is the current job and OUTPUT_NAME is the filename.  The KeyClass and ValueClass are simply the classes of the keys and values that are going to be output in this file (e.g. Text.class and DoubleWritable.class).  OutputFormatterClass is the class of an object that can turn objects of the key and value classes into output of the desired form.  Hadoop provides a few basic ones, like TextOutputFormat, which outputs objects as text using their toString() method, and SequenceFileOutputFormat, which writes binary objects for quicker reading.  For an output where we only want one text key / double value pair per line, the call to addNamedOutput would therefore pass TextOutputFormat.class, Text.class and DoubleWritable.class.
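A hedged sketch of how such a declaration might sit in the driver, assuming the new-API Job class and an output called “resampled” (the name that appears later in this post).  The helper class NamedOutputSetup and its method declareOutputs are hypothetical names used only for illustration.

```java
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical helper that would be called from the driver before the job is submitted.
class NamedOutputSetup {

    // Assumed output name; the reducer must write to exactly this name.
    static final String RESAMPLED = "resampled";

    static void declareOutputs(Job job) {
        // Arguments: job, output name, output format class, key class, value class.
        MultipleOutputs.addNamedOutput(job, RESAMPLED,
                TextOutputFormat.class, Text.class, DoubleWritable.class);
    }
}
```

Named outputs may contain only letters and digits, so a name like “resampled” is fine but one containing an underscore or hyphen would be rejected when it is declared.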

Therefore, the code for the driver is now given as follows (with changes from the previous post version highlighted):

You can see in the highlighted lines that we are setting up outputs with filenames “resampled” and “posterior”.  Oh, and the name of the file where each stage looks for the previous stage’s output has changed to “…/resampled-r-00000”.  This “<FILENAME>-r-xxxxx” is the standard name format of files from reducer number xxxxx (which again is always going to be 00000 here because we force all particles into a single reducer).
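To make the naming pattern concrete, here is a small plain-Java sketch that builds the file name for a given named output and reducer number.  It only mimics the pattern described above; the class ReducerFileName and its method forPartition are hypothetical helpers, not part of Hadoop.

```java
import java.util.Locale;

// Hypothetical helper that reproduces the "<FILENAME>-r-xxxxx" pattern.
class ReducerFileName {

    // Builds the file name for a named output written by the given reducer number.
    static String forPartition(String namedOutput, int partition) {
        // %05d pads the reducer number with zeros to five digits.
        return String.format(Locale.ROOT, "%s-r-%05d", namedOutput, partition);
    }
}
```

With a single reducer, ReducerFileName.forPartition("resampled", 0) gives resampled-r-00000, which is the file the next stage reads.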

Custom(ish) Output

Now that we can create multiple output files, we want to output something more interesting than just single key/value pairs for each particle.  In fact, this is necessary for outputting the non-resampled posterior, because we also need to output the weight of each particle alongside its state value.  The same would also be true if the state held by each particle was multi-dimensional, rather than scalar as it is here; in that case we’d like to be able to output the complete state.

In fact, the simplest way to do that is to exploit the way that the TextOutputFormat class converts objects to strings in order to write them.  As you might expect, it uses the object’s toString() method.  So, all we have to do to create custom text output for an object is to override the object’s toString() method.  To do this, we add the following code to the Particle object:
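The following is a minimal sketch of such an override, assuming a particle that holds a scalar state and a log-weight.  The class name ParticleSketch, the field names and the tab separator are assumptions for illustration; the real Particle class from the previous post has more to it.

```java
// Hypothetical stand-in for the Particle class, showing only the toString() override.
class ParticleSketch {

    private final double state;      // scalar system state held by the particle
    private final double logWeight;  // log of the particle's importance weight

    ParticleSketch(double state, double logWeight) {
        this.state = state;
        this.logWeight = logWeight;
    }

    // TextOutputFormat writes whatever this returns, so it defines the output line.
    @Override
    public String toString() {
        return state + "\t" + logWeight;
    }
}
```

With this in place, new ParticleSketch(1.5, -0.25).toString() gives the state and log-weight separated by a tab.  A higher-dimensional state could be handled the same way by joining its components with the same separator.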

Now, it might be argued that this is a bit of a hacky approach and that an object’s toString() method should never be relied upon for anything other than display of the object to humans.  And there are some valid arguments there, but unless you do this, the TextOutputFormat class is limited to a very few types with already-sensible toString() methods.  Still, beware, and use at your own risk.

It is also possible to make your own OutputFormat classes, which can turn custom objects into output of the required form.  This is a somewhat more involved approach that isn’t covered here, but it ultimately gives you greater control of your custom output.  In that case, you need to make a custom RecordWriter for the objects of interest (and a custom InputFormat with its RecordReader if the next stage is to read them back in).

Conclusion

We’ve now seen how to make a Hadoop Reducer output multiple files, and how we can write more custom information into those files.  This article hasn’t explored the use of completely custom output, but it has provided a solution that is adequate for the particle filter we are developing.  This is essential for the final part of the series, in which we adapt this particle filter to do online parameter estimation.  The output from the particle filter at each stage now goes into two files: “resampled”, which is just a list of numbers giving the state of one resampled particle on each line, and “posterior”, which gives the state and log-weight of each particle in the pre-resampled collection.