In this article, I’m going to implement a basic particle filter in Hadoop MapReduce. This is really just a personal-interest project to get me started learning Hadoop, based on an algorithm that I am familiar with and that suits MapReduce (to some extent), but it might have wider applications in communities where heavyweight particle filters are in use (e.g. in data assimilation for climate and weather modelling). If you are interested in more details, please get in touch.

**Particle Filters**

The particle filter (PF) is a Bayesian sequential Monte Carlo method that takes a stream of observations (one at each time period) and uses them to generate a weighted collection of samples (particles) that approximately represent the posterior state distribution of a state space model after seeing the current observation. This approximate posterior is then used as the basis for the next step of the filter.

In the simplest ‘bootstrap’ version, the particles are updated according to the transition model of the state space model, i.e. by taking each particle x and putting it through the state transition function f(x) for the next period. This gives the state space model’s prediction of the state at the next observation time, creating an approximate prior distribution for the state at that time as a collection of particles (step 1 in the algorithm below). This is then updated into the posterior distribution by re-weighting the particles according to the next observation, using the rules of importance sampling (step 2). In this basic bootstrap form of the PF, the predictive prior is used as the importance distribution, so we hope that it is a good approximation to the posterior (if not, the variance of the weights will be too high, which in practice means that almost all the weight will concentrate on the least-bad particle). In cases where the observation is very informative and the state space model does not make good predictions, the predictive distribution might not resemble the posterior distribution very much, which will result in high weight variance. Such cases require more sophisticated proposal strategies (such as using extended or unscented Kalman filters to make proposals adapted to the observation), or better models. We don’t worry about those cases here, however.

The basic bootstrap particle filter algorithm (in pseudo-code) is therefore:

```
Set stage counter t := 1
while (observations available)
    y = next observation
    for i = 1 to N
        // (1) propose particles
        if (first observation)
            Sample X[i,t] from prior p(x0 | params)
        else
            Sample X[i,t] from transition density p(xt | X[i,t-1], params)
        end if
        // (2) weight particles (unnormalized)
        w_unnorm[i,t] = p(y | X[i,t], params)
    end for
    // (3) normalize weights
    for i = 1 to N
        W[i,t] = w_unnorm[i,t] / sum_j(w_unnorm[j,t])
    end for
    // (*) posterior approximation p(x_t | y_{1:t}, params) -- see text below
    // (4) resample
    for i = 1 to N
        Draw a[i] = j from the multinomial distribution with weights W[j,t]
        Set X_new[i,1:t] = X[a[i],1:t]
    end for
    Set particle collection to resampled collection: X = X_new
    Update stage counter t := t + 1
end while
```

The posterior approximation at the point (*) is given by

p̂(x_t | y_{1:t}, params) = Σ_{i=1}^{N} W[i,t] · δ_{X[i,t]}(x_t),

where δ_z is a Dirac point mass at z.
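Given this weighted representation, posterior expectations are estimated as weighted sums over the particles. As a minimal sketch (plain Java, outside Hadoop; the class and method names are mine, not part of the filter code below), the posterior mean estimate is just:

```java
public class WeightedMean {
    // Posterior mean estimate: E[x_t | y_{1:t}] ≈ sum_i W[i] * X[i],
    // using the normalized particle weights W and positions X
    public static double posteriorMean(double[] x, double[] w) {
        double m = 0.0;
        for (int i = 0; i < x.length; i++)
            m += w[i] * x[i];
        return m;
    }

    public static void main(String[] args) {
        double[] x = {0.0, 1.0, 2.0};
        double[] w = {0.5, 0.25, 0.25}; // normalized weights
        System.out.println(posteriorMean(x, w)); // 0*0.5 + 1*0.25 + 2*0.25 = 0.75
    }
}
```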

If you want to know more about particle filters, there are lots of good tutorials and review papers. A good place to start is probably the 2007 review, “An overview of existing methods and recent advances in sequential Monte Carlo” by Cappé, Godsill and Moulines. Or, of course, you can wait a few months and check out my forthcoming book…

**MapReduce**

MapReduce describes a two-step programming paradigm which can be used for processing data sets. Given a dataset {a,b,c,d,…}, the “map” step applies a function f to each of the data set elements, giving a new dataset {f(a), f(b), f(c), f(d),…}. Because f is applied to each element independently, with no interaction between elements, this operation lends itself to parallel processing. Furthermore, the data set can easily be broken up into subsets and mapping done on each of these separately (useful in the case of large datasets being processed on multiple computing nodes).

The “reduce” stage of a MapReduce program takes the processed data set {f(a), f(b), f(c), f(d),…} and applies some sort of aggregation operation to it, yielding some useful information. For example, the reduce operation might be to take the average of the map function values by summing up over them and dividing by the number of elements in the dataset. The reduce operation *does* require communication, because it requires the mapped data be gathered from each of the nodes on which it is processed. However, the nature of this communication is simple and only occurs at a single well-defined point in the program execution.
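As a toy illustration of the pattern (plain Java, not Hadoop; names are mine), here is a map step squaring each element, followed by a reduce step averaging the results:

```java
import java.util.Arrays;
import java.util.List;

public class MapReduceSketch {
    // "map": apply f independently to every element (trivially parallelizable)
    public static double[] map(List<Double> data, java.util.function.DoubleUnaryOperator f) {
        return data.stream().mapToDouble(Double::doubleValue).map(f).toArray();
    }

    // "reduce": aggregate the mapped values, here into their average
    public static double reduceToAverage(double[] mapped) {
        double sum = 0.0;
        for (double v : mapped)
            sum += v;
        return sum / mapped.length;
    }

    public static void main(String[] args) {
        double[] mapped = map(Arrays.asList(1.0, 2.0, 3.0, 4.0), x -> x * x);
        System.out.println(reduceToAverage(mapped)); // (1+4+9+16)/4 = 7.5
    }
}
```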

MapReduce might seem like a restrictive paradigm, and it is – in fact, that is its point. It is designed to simplify the interaction model between computing nodes in multi-node data processing systems so that the parallelization can be effectively exploited. This simple model is what allows Hadoop to automatically deal with distribution and scaling behind the scenes, as well as allowing it to be robust to node failure and many other things. Despite the restrictions, it is often still possible to achieve the required results, especially if several map-reduce operations are chained one after another. In the following sections, we will look at how the particle filter can be implemented as a MapReduce program, allowing it to be distributed using Hadoop.

**MapReduce in Hadoop**

MapReduce programs in Hadoop consist of three Java classes: a mapper class, which extends the Mapper class and implements the map part of the program; a reducer class, which extends the Reducer class and implements the reduce part of the program; and a driver class, which tells the system how everything hangs together.

Hadoop also offers the possibility of local combiners, which are like reduce steps that only act on the data held on each local node. These can be useful to reduce communication overhead because they can reduce the output from a node into a single value or small set of values. For example, if the reducer was going to calculate the sum of all elements in the mapped data set, local combiners could calculate the sum for all mapped data elements on each node, allowing only the per-node sums to be transmitted to the reducer(s).
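The combiner idea can be sketched outside Hadoop as follows (a toy illustration with hypothetical names): each “node” collapses its shard of the data to a single partial sum, so the reducer only ever sees one value per node:

```java
public class CombinerSketch {
    // Local combiner: each "node" reduces its own shard to a single partial sum
    public static double combineLocal(double[] shard) {
        double s = 0.0;
        for (double v : shard)
            s += v;
        return s;
    }

    // Global reducer: only the per-node partial sums need to be transmitted
    public static double reduceGlobal(double[] partialSums) {
        double s = 0.0;
        for (double v : partialSums)
            s += v;
        return s;
    }

    public static void main(String[] args) {
        double[][] shards = { {1.0, 2.0}, {3.0, 4.0}, {5.0} }; // data split over 3 "nodes"
        double[] partials = new double[shards.length];
        for (int n = 0; n < shards.length; n++)
            partials[n] = combineLocal(shards[n]);
        System.out.println(reduceGlobal(partials)); // 15.0, same as summing all values directly
    }
}
```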

**A MapReduce Particle Filter**

The particle filter fits relatively well into the MapReduce paradigm, because work on each particle is independent during the proposal and weighting stages, which, if a complex proposal mechanism and system model are used, can make up the bulk of the computation. These stages fit naturally into the “map” part of the program.

Normalization of the weights requires knowledge of the unnormalized weights from each particle, and so has to form part of a “reduce” step. Here, we also include resampling as part of the reduce step, since it requires knowledge of the normalized weights. However, resampling might be possible as part of a subsequent second map stage, as discussed at the end of this post. For simplicity, we deal with it in the reducer.

The following three sections look at the mapper, reducer and driver in turn for a basic particle filter for the very simple linear Gaussian noisy lag-1 autoregression (AR(1)) system of the form:

x_t = alpha * x_{t-1} + nu_t,    y_t = x_t + e_t

with nu_t ~ N(0, sigma_st^2) and e_t ~ N(0, sigma_obs^2), where N(a, b) is a Gaussian distribution of mean a and variance b. The problem that we are trying to solve is to determine the unknown (latent) state variables x_{1:T} given a series of noisy observations y_{1:T}. In this case, the system is linear Gaussian, so this problem could actually be (better) solved using a Kalman filter. That’s okay – we’re just using this as an example problem – it would be very easy to change this to something more interesting – and anyway, it’s kind of handy to have a reference method to compare the results to.
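Since the Kalman filter serves as the reference method, here is a minimal sketch of the scalar Kalman filter recursion for this model (a standard textbook recursion, written here for illustration; the class and method names are mine, not part of the post’s code):

```java
public class ScalarKalman {
    public double m; // posterior mean
    public double P; // posterior variance

    public ScalarKalman(double m0, double P0) {
        m = m0;
        P = P0;
    }

    // One filter stage for x_t = alpha*x_{t-1} + N(0, q), y_t = x_t + N(0, r),
    // where q and r are the state and observation noise *variances*
    public void step(double y, double alpha, double q, double r) {
        // Predict through the transition model
        double mPred = alpha * m;
        double PPred = alpha * alpha * P + q;
        // Update with the observation
        double K = PPred / (PPred + r); // Kalman gain
        m = mPred + K * (y - mPred);
        P = (1.0 - K) * PPred;
    }

    // Convenience: run one step from prior N(m0, P0) and return {mean, variance}
    public static double[] oneStep(double m0, double P0, double y,
                                   double alpha, double q, double r) {
        ScalarKalman kf = new ScalarKalman(m0, P0);
        kf.step(y, alpha, q, r);
        return new double[]{kf.m, kf.P};
    }

    public static void main(String[] args) {
        ScalarKalman kf = new ScalarKalman(0.0, 1.0); // prior x_0 ~ N(0, 1), as in the mapper
        kf.step(0.5, 0.9, 0.25, 0.04); // one observation; note q = sigma_st^2, r = sigma_obs^2
        System.out.println("mean=" + kf.m + " var=" + kf.P);
    }
}
```

Note that the variances passed in are the squares of the standard deviations used in the filter code (so q = sigma_st^2 and r = sigma_obs^2).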

Note that, as with all practical implementations of particle filters, all weights are stored as log-weights to avoid the numerical underflow that unnormalized weights can cause.
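The standard trick for turning log-weights back into normalized weights is to subtract the maximum log-weight before exponentiating (this is exactly what the reducer does later on). A standalone sketch, with names of my own choosing:

```java
public class LogWeights {
    // Normalize log-weights into probabilities using the max-subtraction trick:
    // exponentiating raw log-weights like -1000 directly would underflow to zero
    public static double[] normalize(double[] logw) {
        double max = Double.NEGATIVE_INFINITY;
        for (double lw : logw)
            if (lw > max)
                max = lw;
        double sum = 0.0;
        for (double lw : logw)
            sum += Math.exp(lw - max); // safe: the largest exponent is exactly 0
        double[] w = new double[logw.length];
        for (int i = 0; i < logw.length; i++)
            w[i] = Math.exp(logw[i] - max) / sum;
        return w;
    }

    public static void main(String[] args) {
        // Naive exp() of these values underflows to 0/0; the trick recovers the answer
        double[] w = normalize(new double[]{-1000.0, -1000.0, -1001.0});
        System.out.println(w[0] + " " + w[1] + " " + w[2]);
    }
}
```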

**Mapper**

The Java code for the Mapper looks like this:

```java
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper class for the particle filter
// The generic types inside the < > give, respectively: the incoming key type
// (LongWritable), the incoming value type (Text), the outgoing key type (Text),
// and the outgoing value type (Particle)
public class PropWeightMapper extends Mapper<LongWritable, Text, Text, Particle> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Read the parameters from the configuration
        Configuration conf = context.getConfiguration();
        boolean isprior = conf.getBoolean("isprior", false); // is this the first stage?
        double y = conf.getDouble("observation", 0.0);       // the current observation
        double alpha = conf.getDouble("alpha", 0.0);         // the auto-regression parameter alpha
        double sigma_obs = conf.getDouble("sigma_obs", 0.0); // observation noise std.
        double sigma_st = conf.getDouble("sigma_st", 0.0);   // state noise std.

        // The value that we get in is the previous state of this particle.
        // This could easily be made multivariate by having multiple items on a line
        // and tokenizing them.
        double xprev = Double.parseDouble(value.toString());

        // Make a random number generator
        // (interesting question about how to correctly start these in each mapper
        // to ensure the actual output is random across all mappers)
        Random r = new Random();

        Particle p = new Particle();

        // Sample x (current state sample).
        // Here we are using a bootstrap filter, so just sampling from the prior
        // (transition density). Much better sampling schemes are possible that adapt
        // the samples to the observation, e.g. using extended / unscented Kalman filters.
        // In fact the value of an implementation like this is when the propose/weight
        // stages are computationally demanding, since then they are nicely parallelized.
        if (isprior) {
            // Sample x from the stage 1 prior (here N(0,1), so pretty easy to sample)
            p.x = r.nextGaussian();
        } else {
            // Sample x from the transition density x ~ N(alpha*xprev, sigma_st^2)
            p.x = alpha * xprev + r.nextGaussian() * sigma_st;
        }

        // Weight the sample.
        // This is done in logs to avoid numerical underflow.
        p.logw = lognormpdf(y, p.x, sigma_obs);

        // Write the weight (logw) and sample position (x) out to the context for use
        // in the next part of the process.
        // All the particles are treated identically, so give them all the same key.
        // This will send them all to the same reducer to add up the total weight.
        // There is a degree of badness in this, in that it requires a single reducer.
        context.write(new Text("particle"), p);
    }

    // Returns the log-pdf of the Gaussian distribution with mean mu and
    // standard deviation sigma, evaluated at point x
    private double lognormpdf(double x, double mu, double sigma) {
        return -Math.log(sigma * Math.sqrt(2.0 * Math.PI))
                - Math.pow(x - mu, 2.0) / (2.0 * Math.pow(sigma, 2.0));
    }
}
```

Mappers like this one must extend the (abstract) base class Mapper, overriding the map(…) method.

Input to the mapper takes the form of a key-value pair. The type of the key is given by the first generic type of the class (i.e. the first class name inside the angle brackets), here LongWritable, and the type of the incoming values is given by the second generic type (here Text).

Output is similarly a key-value pair, with key type specified by the third generic type (here Text) and value type specified by the fourth generic type (here Particle, see below). As shown here, a custom type (Particle) can be used as output, as long as it implements the Writable interface.

Points to note about this mapper are:

- Its input is a set of **evenly-weighted** particles from the previous stage (we assume resampling has happened at the previous stage, yielding a collection of evenly-weighted particles as its output). These are described by their (scalar) positions.
- These scalar positions (xprev) are encoded in Text objects, which are the values of the incoming data; these Text objects are parsed to retrieve the incoming particle’s state value.
- The keys of the incoming data are ignored in this mapper, since all particles are treated the same.
- It has a special sampling procedure for the first particle filter stage, when it samples from the stage 1 prior, rather than the transition function.
- Parameters (including the “isprior” indicator, which tells it whether this is the first stage or not) are passed as part of the configuration, which can contain named properties (also see the Driver section to see how these are set). The configuration is passed to all mappers as part of the Context object they receive as an argument (called context here), so it is a good way to transmit (smallish) shared parameters to them all.
- The output is written by writing a key-value pair of the correct types to the Context object that is passed to the function.
- In this mapper, all particles are given the same key, the Text object “particle”. This key is used to determine what happens to the mapper output at the reduce stage. Values with the same key get sent to the same reducer. Here, therefore, all values will be sent to the same reducer. This is what we want, since we need to add up all the unnormalized particle weights so we can normalize them. In many applications, on the other hand, you want to find things out about several categories of outputs, so several different keys might be used. This is actually more desirable, because this allows multiple reducers to be run on different nodes, increasing parallelization benefit.
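On the random number generator question flagged in the code comment: if every mapper constructs `new Random()` at nearly the same time, their streams can overlap. One common approach (a sketch only; `seedFor` is a hypothetical helper, not part of the post’s code) is to derive each mapper’s seed from its unique task attempt ID, which a real mapper can obtain via `context.getTaskAttemptID()`:

```java
public class MapperSeeding {
    // Derive a per-mapper seed from the (unique) Hadoop task attempt ID string,
    // so that every mapper draws from a different random stream.
    // In a real mapper the ID would come from context.getTaskAttemptID().toString().
    public static long seedFor(String taskAttemptId) {
        return 0x9E3779B97F4A7C15L ^ (long) taskAttemptId.hashCode();
    }

    public static void main(String[] args) {
        long s0 = seedFor("attempt_201301010000_0001_m_000000_0");
        long s1 = seedFor("attempt_201301010000_0001_m_000001_0");
        System.out.println(s0 != s1); // different mappers get different seeds
    }
}
```

This makes runs reproducible per task attempt; for statistically stronger guarantees one would use properly partitioned generators, but that is beyond the scope of this sketch.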

The class Particle used here is a simple custom particle class that is defined as follows:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Particle implements Writable {

    public double x;    // position
    public double logw; // log weight (beware, might be normalized or unnormalized)

    public Particle() {
        // Initialize the values to some arbitrary ones
        x = 0.0;
        logw = Double.NEGATIVE_INFINITY;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readDouble();
        logw = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(x);
        out.writeDouble(logw);
    }
}
```

Note that Particle implements the Writable interface, which means that it has readFields(…) and write(…) methods. These allow Hadoop to read and write the Particle object internally, so that it can be passed around, e.g. from Mappers to Reducers. What they do not do is allow the particle to be written out to a file. This will be discussed in the next article in this series.
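The readFields(…)/write(…) pair can be exercised outside Hadoop, since Writable amounts to just these two methods over DataInput/DataOutput. A round-trip sketch (using a stand-in class of my own, as the Hadoop interface itself is not needed for the demonstration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    // A stand-in for Particle with the same readFields/write pair
    static class P {
        double x, logw;
        void write(DataOutputStream out) throws IOException {
            out.writeDouble(x);
            out.writeDouble(logw);
        }
        void readFields(DataInputStream in) throws IOException {
            x = in.readDouble();
            logw = in.readDouble();
        }
    }

    // Serialize a particle to bytes and deserialize into a fresh object -- essentially
    // what Hadoop does internally when shuffling map output to the reducers
    public static double[] roundTrip(double x, double logw) {
        try {
            P p = new P();
            p.x = x;
            p.logw = logw;
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            p.write(new DataOutputStream(buf));
            P q = new P();
            q.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            return new double[]{q.x, q.logw};
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        double[] back = roundTrip(1.5, -2.25);
        System.out.println(back[0] + " " + back[1]); // prints "1.5 -2.25"
    }
}
```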

**Reducer**

The code for the reducer looks like this:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PFReducer extends Reducer<Text, Particle, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterable<Particle> values, Context context)
            throws IOException, InterruptedException {

        // Find the maximum particle log-weight.
        // Subtracting this from all log-weights is equivalent to dividing the weights
        // by a constant, which helps with numerical stability.
        double maxlogw = Double.NEGATIVE_INFINITY;
        ArrayList<Particle> particles = new ArrayList<Particle>();
        for (Particle p : values) {
            if (p.logw > maxlogw)
                maxlogw = p.logw;
            // Copy the particle data into a new particle
            Particle px = new Particle();
            px.logw = p.logw;
            px.x = p.x;
            particles.add(px); // store the particles from the iterable values
        }

        // Calculate the unnormalized weight sum of the particles
        double wsum = 0.0;
        for (Particle p : particles) {
            wsum += Math.exp(p.logw - maxlogw); // subtract max log-weight for numerical stability
        }

        // Calculate the normalized weights and their cumulative sum
        double[] cumw = new double[particles.size()];
        for (int i = 0; i < particles.size(); i++) {
            Particle p = particles.get(i);
            // Normalize the weights (remembering that we also need to take off
            // the factor of maxlogw)
            p.logw = p.logw - Math.log(wsum) - maxlogw;
            if (i >= 1)
                cumw[i] = cumw[i - 1] + Math.exp(p.logw);
            else
                cumw[i] = Math.exp(p.logw);
        }

        // Now do resampling (stratified resampling here - faster than multinomial)
        Random r = new Random();
        int curix = 0;
        for (int i = 0; i < particles.size(); i++) {
            double ww = (i + r.nextDouble()) / (double) particles.size();
            while (cumw[curix] < ww)
                curix++;
            double ancestorx = particles.get(curix).x;
            // Write out the state value of the chosen particle, once per output particle
            // (this will be the output of the reducer)
            context.write(new Text(""), new DoubleWritable(ancestorx));
        }
    }
}
```

Reducers must extend the (abstract) base class Reducer, and override the reduce method. As with the Mapper, the generic types taken by the class (inside the angle brackets in its definition) indicate the key and value types of the reducer’s input and output. The input keys are Text objects and their values are Particles – these should match the output of the Mapper that is creating them. This reducer is going to output key-value pairs with keys of type Text and values of type DoubleWritable.

Unlike the Mapper, the Reducer’s input contains multiple values: its reduce(…) method receives all of the Mapper outputs that were emitted under the key it has been sent.

Things to note about this reducer:

- Weight normalization is performed by first subtracting the maximum log weight of any particle from all particles (equivalent to dividing by the maximum weight), and then normalizing the resulting weights. This ensures that at least one value will have a non-zero weight once the unnormalized log-weights are exponentiated, avoiding the problem of numerical underflow if all unnormalized weights are very small.
- The resampling scheme is a stratified resampling scheme, which is slightly quicker to run than a multinomial resampling scheme, because it avoids having to search through the set of particle weights each time a draw is made. This might be useful for large numbers of particles.
- The output is a series of DoubleWritables, which are output by writing them to the context (as in the Mapper). Here, these DoubleWritables indicate the state value of the particles in the particle collection *after* resampling. Because of the resampling type used here, they are all evenly weighted and so we don’t need to output their weights.
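The stratified resampling at the heart of the reducer can be isolated as a standalone sketch (helper names are mine): one uniform draw per output slot, with slot i’s draw confined to [i/N, (i+1)/N), consumed in a single pass over the cumulative weights:

```java
import java.util.Random;

public class StratifiedResample {
    // Stratified resampling: one uniform draw per output slot, with slot i's draw
    // confined to [i/N, (i+1)/N). A single forward pass over the cumulative weights
    // replaces the per-draw search that multinomial resampling needs.
    public static int[] resample(double[] w, int n, Random r) {
        double[] cumw = new double[w.length];
        cumw[0] = w[0];
        for (int i = 1; i < w.length; i++)
            cumw[i] = cumw[i - 1] + w[i];
        int[] ancestors = new int[n];
        int curix = 0;
        for (int i = 0; i < n; i++) {
            double u = (i + r.nextDouble()) / n;
            while (cumw[curix] < u)
                curix++;
            ancestors[i] = curix;
        }
        return ancestors;
    }

    // Helper: how many entries of a equal v (i.e. offspring count of particle v)
    public static int countOf(int[] a, int v) {
        int c = 0;
        for (int i : a)
            if (i == v)
                c++;
        return c;
    }

    public static void main(String[] args) {
        // A particle with weight 0.5 receives N*w = 2 of the 4 offspring slots
        // (stratification keeps each count within one of N*w)
        int[] a = resample(new double[]{0.5, 0.25, 0.25}, 4, new Random(42));
        for (int i : a)
            System.out.print(i + " ");
    }
}
```

A nice property visible here: because each stratum contributes exactly one draw, the offspring count of each particle is always within one of its expected value, which is why stratified resampling has lower variance than multinomial resampling.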

**Driver**

The final part of Hadoop Map-Reduce programs is the Driver, which tells Hadoop how everything hangs together, and provides an entrypoint for the program. The code for the particle filter driver (PFDriver) looks like this:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PFDriver {

    /***
     * @param args args[0] = filename for observations, args[1] = number of particles
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        // Read some observation data from a file.
        // Obviously in a real system we'd receive this data one observation at a time
        // in each loop, but this is just for simplicity.
        ArrayList<Double> Observations = ReadObservations(args[0]);

        // Set the number of particles
        int NumParticles = Integer.parseInt(args[1]);

        // To set the number of particles to n, we have to create an input file
        // with n lines of input. This is a bit of a hack...
        WriteStageZeroInput(NumParticles);

        // Set the configuration of the system parameters
        Configuration conf = new Configuration();
        conf.setBoolean("isprior", true);
        conf.setDouble("alpha", 0.9);
        conf.setDouble("sigma_st", 0.5);
        conf.setDouble("sigma_obs", 0.2);

        // Each stage of the particle filter (i.e. after each observation) is going
        // to be done as a separate map-reduce job, so we iterate over observations
        for (int i = 0; i < Observations.size(); i++) {

            // Configure so that the first stage samples x from the prior
            if (i == 0)
                conf.setBoolean("isprior", true);
            else
                conf.setBoolean("isprior", false);

            // Set the current observation.
            // This piece of information is required by all mappers,
            // so a configuration parameter makes sense for it.
            conf.setDouble("observation", Observations.get(i));

            // Set up a new job for the current stage
            Job job = new Job(conf, "pf");
            job.setJarByClass(PFDriver.class);
            job.setMapperClass(PropWeightMapper.class);
            job.setReducerClass(PFReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Particle.class);

            if (i == 0)
                FileInputFormat.addInputPath(job, new Path("stage0.txt"));
            else
                // The input of the next stage is the -resampled- output
                // from the previous stage
                FileInputFormat.addInputPath(job, new Path("output/stage" + i + "/part-r-00000"));
            FileOutputFormat.setOutputPath(job, new Path("output/stage" + (i + 1)));

            job.waitForCompletion(true); // block until job completes
        }
    }

    // ReadObservations(...) and WriteStageZeroInput(...) -- see appendix
}
```

Things to note about this driver are:

- The location of the observations and the number of particles to use are set by the command line arguments args[0] and args[1], respectively.
- ReadObservations(…) is a helper function, which reads the observations from the filesystem (Code is given at the end of the article for this method). For large datasets, this data would likely be read one observation at a time from the filesystem, but for simplicity here, it is all read in in one go at the start.
- WriteStageZeroInput(…) is a helper function which generates input for the first stage. Because the number of particles is set by the number of Mappers that are spun up in each stage, which is in turn determined by the number of input datapoints they get, it seemed like the easiest way to set the number of particles at the start was to generate some dummy input (which is discarded) with the correct number of datapoints. (Code is given at the end of the article for this method).
- Parameters (used by the Mapper) are set by setting named values in a configuration, which is part of the Context object that will be passed to each Mapper and Reducer.
- The driver then loops over each observation, setting up and running a Map-Reduce job at each iteration of the for loop, and making sure the input of one stage is the output of the previous stage.
- For each stage (i.e. after each observation), the driver sets up parameters specific to that stage (whether this is the first stage or not) and the current observation.
- A new job is set up for the stage, by setting the configuration, driver, mapper and reducer, along with the type of the output keys and values.
- Finally, the input and output for the job are set up. Here FileInputFormat and FileOutputFormat are used, which are a pair of standard InputFormat and OutputFormat classes. FileInputFormat will read (text) files line by line, sending each line off to a Mapper. FileOutputFormat will write text files (called “part-r-xxxxx”, where xxxxx is the number of the reducer whose output the file contains) with each new key-value pair on a new line. These are simple default Inputs and Outputs; others are available, and, for maximum control over input and output, custom record readers and writers can also be defined. The creation of multiple outputs is covered in the next article.
- The name of the file “part-r-00000” can be used here because, due to the structure of the reducer’s output (see above), we know there will only be one reducer, so this file will be the one that contains the resampled particles.
- The last line of the loop tells the system to wait for the job to complete before continuing, which is necessary because we want the data from one stage to be available before we start the next.

**Results**

The output from each stage is a folder containing a part-r-00000 file. This file contains the state value of each post-resampling particle, one per line. We therefore expect it to contain duplicate entries corresponding to more heavily-weighted particles.

The results of running the Hadoop Map-Reduce particle filter over 100 stages, with 1000 particles, are shown in the figure. There is a fair bit going on in this figure, but the output of the particle filter is shown as grey shading with a white line running through the middle of it. This reflects the mean of the particle collection (and thus the PF’s estimate of E[x], the expected state value at each stage) plus/minus two standard deviations, which gives an idea of the uncertainty in the estimate. Since this is a linear Gaussian problem, we can also use a Kalman filter to do this estimation (exactly), and the red lines show its value of E[x] (solid red line) plus/minus two standard deviations (dashed red lines). As you can see, the particle filter’s estimates match up closely with the corresponding Kalman filter estimates. The blue line (and stars) shows the true value of the hidden state x at each stage. As you can see, this is quite closely approximated by the estimate of E[x] and, in all cases, falls within the two-standard-deviation band. Finally, the black dust at each stage shows the actual locations of the (evenly weighted) particles, with each being represented by a single dot. The parameters of the process here are alpha = 0.9, sigma_st = 0.5 and sigma_obs = 0.2, as set in the driver code above; we give the filter the correct values of these parameters.

So, there you have it. A functional Hadoop Map-Reduce particle filter. In subsequent articles we will look at how to extend it to output multiple files and how to get it to do online parameter estimation for the autoregression parameter, using online EM.

One gotcha when running this program: it won’t run if the output subdirectory already exists, so you have to delete that each time you want to run it. Not too much of a problem but the error message is slightly cryptic.

**Refinements**

In the algorithm outlined here, the reducer is doing a lot of work. Furthermore, there can be only one reducer, so we are really forcing all this computation to take place on a single node. If we have a situation where we want to deal with a million or a billion particles, this might be overwhelming.

There’s no getting around the fact that at some point, a sum has to be calculated over all particles in order to normalize the particle weights. So that is always going to be a potential bottleneck if you have a lot of particles. However, only the weights need to be transmitted for this, and local combiners could sum up the weights of all the particles being dealt with on a local node, meaning only one value per node ever needs to be transmitted. So, even for the large-scale case with, say, a million particles on 1000 nodes, this shouldn’t be too onerous.

Resampling is an interesting question. If a single reducer is used as in the above model, this could create a bottleneck when moving to, say, a million particles, since one node would have to do all the sampling for a million particles. It would be much nicer to split this up amongst the nodes. But is splitting up the resampling task possible? Well, I think so.

The purpose of resampling is to create an equivalent delta-mass based posterior representation but with lower weight variance than the one you start with. This avoids the (inevitable) problem of all the mass concentrating on a single particle after a few stages if resampling is not used. What we would like to do is to break up this resampling task into a set of smaller tasks that can be distributed across the nodes.

One possible scheme (I think, although I have not verified this) is to break up the particles into M subsets, and to locally resample each subset. Each subset i will produce N_i offspring particles, with (equal) new weights set so as to give a total new weight for the subset equal to its incoming weight. The split into subsets can be arbitrary, but should take into account the number of computational nodes available, allowing resampling to occupy all the nodes. The number of output particles for subset i should be based on a target number of overall particles N, and then set as N_i = ceil(N * W_i), where W_i is the total normalized weight of subset i. Because of the ceiling, each subset can generate up to one extra particle, resulting in up to N+M-1 output particles being generated. This resampling method will not give quite such low weight variance as standard resampling, but can be parallelized into an arbitrary number of subsets M <= N.
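The offspring-count arithmetic of this scheme can be sketched as follows (a hypothetical helper of my own, implementing N_i = ceil(N * W_i) per subset):

```java
public class SubsetResampleCounts {
    // Offspring counts for the distributed resampling sketch: subset i, with total
    // normalized weight W[i], produces ceil(N * W[i]) offspring particles
    public static int[] offspringCounts(double[] subsetWeights, int n) {
        int[] counts = new int[subsetWeights.length];
        for (int i = 0; i < subsetWeights.length; i++)
            counts[i] = (int) Math.ceil(n * subsetWeights[i]);
        return counts;
    }

    public static void main(String[] args) {
        // 3 subsets with normalized weights summing to 1, target N = 10 particles
        int[] c = offspringCounts(new double[]{0.41, 0.33, 0.26}, 10);
        int total = 0;
        for (int v : c)
            total += v;
        // The ceilings give between N and N+M-1 particles in total
        System.out.println(c[0] + " " + c[1] + " " + c[2] + " total=" + total);
    }
}
```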

Within a Map-Reduce context, implementing this scheme will require a second Map-Reduce stage. The first Mapper will be as here, but now the first Reducer will simply calculate the weight sum. The second Mapper stage will then normalize the particle weights and output particles with normalized weights that are randomly assigned to different groups (by e.g. setting their keys to one of M values). The second Reducers (of which M will be created) will then perform resampling for each of the M subsets and output their (locally) resampled particles.

**Conclusion**

This has been a bit of a mammoth article, but we now have a working, if basic, Hadoop Map-Reduce particle filter! In the next article we will explore how this can be extended by allowing it to create multiple output files.

**Appendix – Ancillary Code**

The PFDriver.ReadObservations(…) and PFDriver.WriteStageZeroInput(…) ancillary functions are given below. They are static methods of the PFDriver class. Note that these work with the local filesystem, not the HDFS and so will only work with Hadoop in its LocalJobRunner mode (see previous article). They are intended merely as ancillary functions, so don’t make any claim to robustness or quality.

First, ReadObservations(…), which opens a text file and reads each line as an observation:

```java
// Reads data from a file with one entry per line
// filename is the filename of the file
private static ArrayList<Double> ReadObservations(String filename) {
    ArrayList<Double> list = new ArrayList<Double>();

    // Open the data file for reading
    File file = new File(filename);
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(file));
        String text = null;
        while ((text = reader.readLine()) != null) {
            list.add(Double.parseDouble(text));
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (reader != null)
                reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return list;
}
```

And then PFDriver.WriteStageZeroInput(…), which opens a (specific) file and writes n lines of data to start off the particle filter having n particles:

```java
// Writes a text file with n lines
private static void WriteStageZeroInput(int n) {
    BufferedWriter writer = null;
    try {
        writer = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream("stage0.txt"), "utf-8"));
        for (int i = 0; i < n; i++) {
            writer.write("1.0");
            writer.newLine();
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (writer != null)
                writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```