James Murphy

Hadoop Part IV: Online Parameter Estimation


For the final (for now) part of this series, I am going to extend the particle filter to do online parameter estimation using online Expectation-Maximization (EM) to calculate an estimate of the autoregression parameter at each stage of the particle filter.

There are many options for online parameter estimation, including Bayesian methods that attempt to calculate a posterior distribution of the parameter estimate.  Here, we will look at calculating a single maximum likelihood (ML) estimate using all the particles in the current particle collection.  Two main techniques are used to calculate such online ML estimates: (stochastic) gradient ascent, which requires an estimate of the gradient of the log-likelihood to be calculated from the particles, and online Expectation-Maximization (EM), which adapts the offline EM algorithm to sequential use.  Here we will apply the latter method.

Online Expectation Maximization (EM)

Expectation-Maximization (EM), although originally designed for offline use (and described in that context in detail elsewhere), can be adapted for online ML parameter estimation.  Although EM can be applied to fewer problems than (stochastic) gradient ascent, usually only those involving distributions in exponential families, it has the advantage that the size of the parameter step at each stage is calculated automatically.  This makes the algorithm easier to configure and less vulnerable to a poor choice of step-size sequence.

Standard offline EM is a two-step process.  The first step (the E-step) involves calculating a lower-bound function \mathcal{L}_t(\theta) on the log-likelihood \log p(y_{1:t}\mid\theta), given by

\mathcal{L}_t(\theta) = \int \log p(x_{1:t},y_{1:t}\mid\theta)p(x_{1:t}\mid y_{1:t}, \hat\theta_k) dx_{1:t}


where \hat\theta_k is the current estimate of \theta after the kth iteration of the EM algorithm.

The second step of the EM algorithm (the M-step) is to maximize this lower bound with respect to \theta, giving a new estimate \hat\theta_{k+1}.  The process is then iterated until convergence to a (local) maximum of the log-likelihood.

For the AR(1) system with autoregression parameter \alpha, the so-called “full-data” log-likelihood \log p(x_{1:t},y_{1:t}\mid\alpha) is given by

\log p(x_{1:t},y_{1:t}\mid\alpha) = \frac{\alpha}{\sigma_s^2}\sum_{i=2}^t x_i x_{i-1}-\frac{\alpha^2}{2\sigma_s^2}\sum_{i=2}^t x_{i-1}^2+C


where C is a constant with respect to \alpha.  This expression can be written in terms of two sufficient statistics of the state sequence x_{1:t}, namely S_t^{(1)}=\sum_{i=2}^t x_i x_{i-1} and S_t^{(2)}=\sum_{i=2}^tx_{i-1}^2.  Because these are sums over stage indices i, these statistics can be calculated sequentially, with the latest value of the term within each sum being added at each stage.

In terms of these sufficient statistics (and ignoring constants with respect to \alpha), the lower bound function is given by

\mathcal{L}_t(\alpha)=\int\bigg[\frac{\alpha}{\sigma_s^2}S_t^{(1)}-\frac{\alpha^2}{2\sigma_s^2}S_t^{(2)}\bigg]p(x_{1:t}\mid y_{1:t},\alpha_t)dx_{1:t}


which can be expressed in terms of expectations (over the state sequence x_{1:t}) as

\mathcal{L}_t(\alpha)=\frac{\alpha}{\sigma_s^2}E\big[S_t^{(1)}\mid y_{1:t},\alpha_t\big]-\frac{\alpha^2}{2\sigma_s^2}E\big[S_t^{(2)}\mid y_{1:t},\alpha_t\big].


This lower bound is maximized when \frac{\partial \mathcal{L}}{\partial \alpha}=0, which occurs when

\alpha_*=\frac{E\big[S_t^{(1)}\mid y_{1:t},\alpha_t\big]}{E\big[S_t^{(2)}\mid y_{1:t},\alpha_t\big]}.


In a sequential Monte Carlo setting, the expectations of these sufficient statistics, and thus the value of \alpha_*, can be estimated by taking a weighted average of the sufficient statistics held by each particle, i.e. E\big[S_t^{(1)}\mid y_{1:t},\alpha_t\big] \approx\sum_{j=1}^N w_{t}^j S_{j,t}^{(1)}, where S_{j,t}^{(1)} is the value of the sufficient statistic for particle j and w_{t}^j is the weight of particle j.  At each stage, the optimal value \alpha_* calculated above gives the new parameter value, i.e. \alpha_{t+1}=\alpha_*, so one iteration of the EM algorithm is completed per stage.  Unlike with gradient ascent methods, the update for the parameter \alpha is automatically scaled and does not depend on any externally chosen step size.
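To make the update concrete, here is a minimal sketch of it in Python (the actual implementation in this series is Java/Hadoop; the function and variable names here are purely illustrative).  It normalizes the particle log-weights, forms the two weighted expectations, and returns the new estimate \alpha_{t+1}:

```python
import math

def em_update(particles):
    """Compute the EM update alpha_* from a weighted particle set.

    Each particle is a (log_weight, s1, s2) triple, where s1 and s2 are
    the particle's sufficient statistics S^(1) and S^(2).
    """
    # Normalize the log-weights (subtracting the max for numerical safety)
    max_lw = max(lw for lw, _, _ in particles)
    ws = [math.exp(lw - max_lw) for lw, _, _ in particles]
    total = sum(ws)
    ws = [w / total for w in ws]
    # Weighted expectations of the two sufficient statistics
    e_s1 = sum(w * s1 for w, (_, s1, _) in zip(ws, particles))
    e_s2 = sum(w * s2 for w, (_, _, s2) in zip(ws, particles))
    # alpha_* = E[S^(1)] / E[S^(2)]
    return e_s1 / e_s2
```

Note that the division by the total weight makes the update invariant to any common scaling of the weights, which is why unnormalized weights can be used directly.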

Implementing Online EM for the MapReduce Particle Filter

In order to implement the online EM parameter estimation procedure outlined in the previous section, we need to make the following changes to the particle filter code from the previous post.

  1. Add variables representing the sufficient statistics S_t^{(1)}=\sum_{i=2}^t x_i x_{i-1} and S_t^{(2)}=\sum_{i=2}^tx_{i-1}^2 to the particle class
  2. Add code to allow the Mapper to read in particles including their sufficient statistics
  3. Add code to the Mapper to update these sufficient statistics for each particle
  4. Add code to the Reducer to calculate the expectation of the sufficient statistics and update the parameter estimate
  5. Output the parameter estimate and pass it on to the next stage

Tackling these roughly in order, the following changes must be made to the code from the previous article in this series.


The changes to the Particle class are straightforward.

  • An array (suffstats) is added to the Particle object to hold the two sufficient statistics.
  • The readFields(…) and write(…) methods are updated to account for the sufficient statistics.
  • An additional method fromString(…) is added, which parses a Particle object from the kind of string written out by its toString() method; this will be used to read in Particles in the Mapper.
  • A duplicate() method is added, which creates a copy of the particle.
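As a language-agnostic sketch of these changes (the real class is a Java Hadoop Writable; all names here are illustrative), the particle record with its sufficient statistics, string round trip and copy method might look like this:

```python
class Particle:
    """Sketch of the particle record: state, log-weight and the
    two sufficient statistics S^(1) and S^(2)."""

    def __init__(self, state=0.0, log_weight=0.0, suffstats=(0.0, 0.0)):
        self.state = state
        self.log_weight = log_weight
        self.suffstats = list(suffstats)

    def to_string(self):
        # Mirrors toString(): tab-separated state, log-weight, S1, S2
        return "\t".join(str(v) for v in
                         [self.state, self.log_weight] + self.suffstats)

    @classmethod
    def from_string(cls, s):
        # Mirrors fromString(): parse a particle written by to_string()
        vals = [float(v) for v in s.split("\t")]
        return cls(vals[0], vals[1], vals[2:4])

    def duplicate(self):
        # Deep copy, so resampled offspring do not share suffstats storage
        return Particle(self.state, self.log_weight, list(self.suffstats))
```

The copy in duplicate() matters: after resampling, several offspring may descend from the same parent, and each must accumulate its own sufficient statistics from then on.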


There are only two changes in the Mapper:

  • The Particle’s fromString(…) method is used to parse particles from the input file.  As we shall see in the Reducer, the input will now be a set of strings representing Particle objects, rather than a single value giving the particle state x; this is necessary because the sufficient statistics must also be transmitted from one stage to the next.
  • The sufficient statistics are updated for each particle.  This is not necessary at the first stage: both sufficient statistics involve the previous particle value, so they start at zero.
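The per-particle statistic update in the Mapper is just the recursive form of the two sums, adding the newest term once the new state has been proposed.  A sketch (Python for brevity; names illustrative):

```python
def update_suffstats(suffstats, x_prev, x_new):
    """Recursive update of the sufficient statistics:
       S_t^(1) = S_{t-1}^(1) + x_t * x_{t-1}
       S_t^(2) = S_{t-1}^(2) + x_{t-1}^2
    where x_new is the newly proposed state x_t and x_prev is x_{t-1}."""
    s1, s2 = suffstats
    return (s1 + x_new * x_prev, s2 + x_prev * x_prev)
```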


There is only one significant change to the reducer:

  • Calculation of the expectation of the sufficient statistics is performed, followed by calculation of a revised parameter estimate.  This parameter value is output to the main output (i.e. the one named “part-r-xxxxx”) by writing to the context, for use in the next stage, with a key “alpha”.  Further parameter estimates with other names could also be written here.
  • Note that the “resampled” output is now a Particle object, rather than a DoubleWritable.


There are a couple of minor changes to the driver:

  • Code is added at each stage to read in the parameter(s) estimated at the previous stage and add them to the job’s configuration.  This uses another helper function (given in the Appendix at the end), which would have to be replaced if not running in LocalJobRunner mode (since it assumes it can get to the local filesystem) – see the first post in this series.
  • The type of the “resampled” output is changed to Particle (from DoubleWritable in the previous version).



The state estimation results of running the parameter-estimating Hadoop MapReduce particle filter over 100 stages, with 1000 particles, are shown in the figure.  As before, the output of the particle filter is summarized as grey shading with a white line running through the middle of it.  This reflects the mean of the particle collection (and thus the PF’s estimate of E[x], the expected state value at each stage) plus/minus two standard deviations, which gives an idea of the uncertainty in the estimate.  To avoid clutter, the positions of the individual particles are not shown here (although note that, since it is available, we are now generating this output using the weighted, pre-resampling particle collection at each stage).  Since this is a linear Gaussian problem, we can also use a Kalman filter to do this estimation (exactly), and the red lines show the value of E[x] (solid red line) plus/minus two standard deviations (dashed red lines).  The blue line (and stars) shows the true value of the hidden state x at each stage.

The parameters of the process here are: \alpha = 0.9, \sigma_s = 1, \sigma_{obs} = 1.  The filter is given the latter two of these, but must estimate the \alpha parameter on its own, starting from an initial estimate of 0.5.  This explains the poor early performance of the particle filter compared to the Kalman filter (which is given the correct parameter value and so gives the true answer), since it takes some time for the online EM to correctly estimate the parameter.  The following figure shows the parameter estimate at each stage.


You can see that the parameter estimate rapidly approaches the true value.  An interesting feature of this estimate is the little bump at around t=60.  Examining the process around that time, it is clear that it is undergoing a bigger-than-usual deviation from its mean.  This leads the estimator to increase its estimate of the autoregression parameter, since a larger value would explain such behaviour.  In periods of smaller deviations, the estimate declines slightly.

The graphs were created in Matlab.

Each line of the “posterior” and “resampled” output files now consists of four numbers, giving the particle’s state estimate, its log-weight and the two sufficient statistics.  Of course, in the “resampled” file the log-weights are all the same.

Degeneracy of (Simple) Online Parameter Estimates

For the example system that we’ve been looking at in this series (namely the linear Gaussian AR(1) model with noisy observations), the simple version of online EM presented here works quite well, reaching a good estimate of the parameter in relatively few stages.  However, there are some problems with this method, which stem from the nature of the particle filter.  The problem is that the particle filter is good at estimating the current state of the process, but it is quite bad at estimating the state path of the process.  This is because the resampling step means that only ‘good’ particles survive, so, over time, only the offspring of the very best particles from each stage survive.  Taking this to its logical conclusion, a large part of the state path is represented by a single ancestral trajectory.  This leads to degenerate estimates of the sufficient statistics, which can lead methods such as the simple online EM given here to arrive at incorrect parameter estimates.  The problem is more severe as the series becomes longer.  Methods do exist to get around this (see, for example, the 2009 paper “An overview of sequential Monte Carlo methods for parameter estimation in general state-space models” by Kantas, Doucet and Singh, or even… my forthcoming book), although they add a bit more complexity to the algorithm and its exposition.  They also come at the cost of O(N^2) time complexity in the number of particles.  Still, it is worth being aware that the simple EM here is not perfect.


So, that’s it!  An online parameter estimating particle filter running in Hadoop MapReduce.  Obviously the model here is rather trivial, but I hope that this (fairly) clearly shows how such a scheme could be implemented for a more complicated system.  I hope you found some part of this series interesting.  Feel free to get in touch with comments and questions.

Appendix – Ancillary Code

Here is the trivial PFDriver.ReadParameters(…) method, which reads named parameters from a file.  It will only work with Hadoop in LocalJobRunner mode (see Part I), since it assumes that it can access the local filesystem rather than HDFS.
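The logic is simple: open the reducer output file on the local filesystem and read tab-separated name/value pairs (such as the “alpha” line written by the Reducer) into a parameter map.  A Python sketch of the equivalent logic (the file path and format are as described in the Reducer section above; the function name is illustrative):

```python
def read_parameters(path):
    """Read named parameters (e.g. a line "alpha\t0.87") from a reducer
    output file on the local filesystem, returning them as a dict."""
    params = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            name, value = line.split("\t")
            params[name] = float(value)
    return params
```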