Probabilistic Inference

This case study shows how we can perform inference over probabilistic models with the help of effects.


We require some imports:

import exception

We need some effects to perform probabilistic operations:

type Distribution {
  Gaussian(mean: Double, variance: Double)
  Uniform(lower: Double, upper: Double)
  Beta(mean: Double, sampleSize: Double)
}
type Probability = Double
effect sample(dist: Distribution): Double
effect observe(value: Double, dist: Distribution): Unit
effect random(): Double
effect weight(prob: Probability): Unit
interface Emit[A] {
  def emit(element: A): Unit
}

With the effect sample, we can sample from a given distribution, and with the effect observe we can determine how probable it is to observe a given value under a given distribution (probability density function (PDF)).

Currently, we support models with Gaussian, Uniform or Beta distributions. Thus, we need a way to draw from these distributions and to compute the probability density.

def draw(dist: Distribution): Double / random = {
  dist match {
    case Gaussian(mean, variance) =>
      val u1 = do random();
      val u2 = do random();
      val sigma = variance
      // box-muller transform
      val mag = sigma * sqrt(-2.0 * log(u1))
      val z0 = mag * cos(2.0 * PI * u2)
      return z0 + mean

    case Uniform(lower, upper) =>
      val x = do random()
      return (lower + x * upper)

    case Beta(mean, sampleSize) =>
      val alpha = mean * sampleSize
      return draw(Gaussian(0.5, 1.0 / (4.0 * (2.0 * alpha + 1.0))))
  }
}

For the Beta distribution, we need to approximate the Gamma function

def gamma(z0: Double): Double = {
  var z = z0
  // g is a small integer and p is a list of coeficients dependent on g
  val g = 7.0
  val p = [0.99999999999980993,
    676.5203681218851,
    -1259.1392167224028,
    771.32342877765313,
    -176.61502916214059,
    12.507343278686905,
    -0.13857109526572012,
    9.9843695780195716 * pow(10.0, 0.0 - 6.0),
    1.5056327351493116 * pow(10.0, 0.0 - 7.0)]
  
  // lanczos approximation
  if (z < 0.5) {
    val y = PI / (sin(PI * z) * gamma(1.0 - z))
    return y
  } else {
    with on[OutOfBounds].panic
    z = z - 1.0
    var x = p.get(0)
    var i = 1
    while (i <= 8) {
      x = x + (p.get(i) / (z + i.toDouble - 1.0))
      i = i + 1
    }
    val t = z + g + 0.5
    val y = sqrt(2.0 * PI) * pow(t, (z + 0.5)) * exp(0.0 - t) * x
    return y
  }
}

and then can compute the probability density function at a given value for the aforementioned distributions.

def density(value: Double, dist: Distribution): Double = {
  dist match {
    case Gaussian(mean, variance) =>
      val density = 1.0 / (sqrt(variance) * (sqrt(2.0 * PI))) * exp((0.0 - 1.0 / 2.0) * (square(value - mean) / variance))
      return density

    case Uniform(lower, upper) =>
      if (lower < value && value < upper) { 1.0 / (upper - lower) }
      else { 0.0 }

    case Beta(mean, sampleSize) =>
      val alpha = mean * sampleSize
      val beta = (1.0 - mean) * sampleSize
      val density = (gamma(alpha + beta) / (gamma(alpha) * gamma(beta))) * pow(value, (alpha - 1.0)) * pow((1.0 - value), (beta - 1.0))
      return density
  }
}

Next, we define default handlers for all effects:

def handleSample[R] { program: () => R / sample }: R / random = {
  try { program() }
  with sample { dist => resume(draw(dist)) }
}
def handleObserve[R] { program: () => R / observe }: R / weight = {
  try { program() }
  with observe { (value, dist) => do weight(density(value, dist)); resume(()) }
}
def handleWeight[R] { program: () => R / weight }: (R, Double)  = {
  var current = 1.0
  try { (program(), current) }
  with weight { (prob) =>
    current = current * prob;
    resume(())
  }
}

For generating random numbers, we either use the random function from stdlib

def handleRandom[R] { program: () => R / random }: R =
  try { program() }
  with random { () => resume(random()) }

or a pseudo random number generator using a linear congruential generator:

def linearCongruentialGenerator[R](seed: Int) { prog: => R / random } : R = {
  // parameters from Numerical Recipes (https://en.wikipedia.org/wiki/Linear_congruential_generator)
  val a = 1664525
  val c = 1013904223
  val m = 1073741824
  var x: Int = seed;
  def next() = { x = mod((a * x + c), m); x }
  try { prog() }
  with random {
    resume(next().toDouble / m.toDouble)
  }
}

With the effect emit it is also possible to limit infinite loops. This allows for flexible numbers of steps to be performed with any algorithm that uses the effect emit.

def onEmit[A] { handler: A => Unit } { program: => Any / Emit[A] }: Unit =
  try { program(); () }
  with Emit[A] {
    def emit(element) = { handler(element); resume(()) }
  }
def limit[A](n: Int) { program: => Any / Emit[A] }: Unit / Emit[A] = {
  var steps = n
  try { program(); () }
  with Emit[A] {
    def emit(element) = {
      do emit(element);
      steps = steps - 1;
      if (steps > 0) { resume(()) }
    }
  }
}

Rejection Sampling

The effect weight is the basis of the rejection sampling algorithm is given by the following handler:

def handleRejection[R] { program: () => R / weight }: R / random = {
  try { program() }
  with weight { (prob) =>
    if (do random() < prob) { resume(()) }
    else { handleRejection {program} }
  }
}

Intuitively, invoking do weight(p) in some program prog assigns this path the probability of p such that prog is non-deterministically repeated until it is accepted by the underlying proposal distribution.

region _ {
  with linearCongruentialGenerator(1)
  with handleSample
  with handleRejection
  val N = Gaussian(0.0, 1.0)
  val sample = do sample(N)
  println(show(round(sample, 2)))
  do weight(0.01)
  println("accepted")
}

For easier usage, we define a wrapper to be used for rejection sampling:

def rejectionSampling[R] { program: () => R / { sample, observe } }: Unit / { random, Emit[R] } = {
  def loop(): Unit / Emit[R] = {
    with handleSample
    with handleRejection
    with handleObserve
    do emit(program())
    loop()
  }
  loop()
}

Slice Sampling

The following function is implementing a form of slice sampling — a Markov Chain Monte Carlo (MCMC) algorithm often used in probabilistic programming for sampling from a distribution. This algorithm iteratively samples from a distribution and adjusts weights or probabilities to ensure that the samples align with the target distribution.

def sliceSamplingAlgo[R]() { program: () => R / weight } = {
  val (result, prob) = handleSample { handleWeight { program() }}

  def step(result0: R, prob0: Probability) = handleRejection {
    val (result1, prob1) = handleWeight { program() }
    if (prob1 < prob0) { do weight(prob1 / prob0) }
    (result1, prob1)
  }
  def loop(result: R, prob: Probability): Unit / Emit[R] = {
    do emit(result)
    val (result1, prob1) = step(result, prob)
    loop(result1, prob1)
  }
  loop(result, prob)
}

First, we get an initial sample result with its associated probability prob. step is then used to generate new samples based on the previous result and probability. If the new sample has a lower probability (prob1 < prob0), the sample is down-weighted by adjusting the weight using do weight(prob1 / prob0). This ensures that samples with lower probabilities are less likely to be accepted. Finally, using loop and emit, we emit the sampling results.

Metropolis-Hastings

Tracing

A trace records past samples used by an algorithm. The trace also controls where the algorithm draws samples from in the next iterations. Constructing traces is possible by handling the effect sample, not with the default handler, but with one that draws new samples and records them in a trace.

type Trace = List[Probability]

def handleTracing[R] { program: () => R / sample }: (R, Trace) / sample = {
  var trace = []
  val result = try { program() }
  with sample { (dist) =>
    val d = do sample(dist);
    trace = Cons(d, trace)
    resume(d)
  }
  (result, trace.reverse)
}
def handleReusingTrace[R](trace0: Trace) { program: () => R / sample } = handleObserve {
  var trace = trace0
  try { program() }
  with sample { (dist) =>
    trace match {
      case Nil() => panic("empty trace")
      case Cons(t, ts) =>
        do observe(t, dist)
        trace = ts; resume(t)
    }
  }
}

Proposing samples

How the candidates of this algorithm are proposed can vary between different implementations of the algorithm and thus creating inference of the Metropolis-Hastings algorithm. The Metropolis-Hastings algorithm proposes a new trace based on the old trace, by recursively adding noise to the samples in the old trace and thus creating a new trace.

def propose(trace: Trace): Trace / sample = {
  trace match {
    case Nil() => []
    case Cons(t, ts) =>
      val noise = do sample(Gaussian(0.0, 1.0))
      val proposal = t + noise
      Cons(proposal, propose(ts))
  }
}

Metropolis-Hastings algorithm

The algorithm is implemented with a helper function

def metropolisStep[A](prob0: Probability, trace0: Trace) { program: Trace => (A, Probability) } = {
  val trace1 = propose(trace0)
  val (result1, prob1) = program(trace1)
  if (prob1 < prob0) {
    do weight(prob1 / prob0)
  }
  ((result1, trace1), prob1)
}

metropolisStep is then called in the algorithm implementation

def metropolisHastingsAlgo[A] { program: () => A / { sample, weight } } = {
  val ((result0, trace0), prob0) = handleWeight {
    with handleSample
    with handleTracing
    program()  
  }
  def loop(result: A, trace: Trace, prob: Probability): Unit / Emit[A] = {
    do emit(result)
    val ((result1, trace1), prob1) =
    handleSample {
      with handleRejection
      metropolisStep(prob, trace) { trace =>
        with handleWeight
        with handleReusingTrace(trace)
        program()
      }
    }
    loop(result1, trace1, prob1)
  }
  loop(result0, trace0, prob0)
}

Single-Site Metropolis-Hastings

To perform inference over the Metropolis-Hastings algorithm, and for creating the single-site Metropolis-Hastings algorithm, we just need to alter the implementation of the propose function. In this implementation, the noise is added to only one random sample in the trace and the other samples are reused.

def proposeSingleSite(trace: Trace): Trace / sample = {
  val tsize = toDouble(size(trace))
  val rand = do sample(Uniform(0.0, tsize))
  def putAti(i: Int, p0: List[Double]): List[Double] = {
    p0 match {
      case Nil() => []
      case Cons(p, ps) =>
        if (i == 0) {
          val noise = do sample(Gaussian(0.0, 1.0))
          Cons(p + noise, ps)
        }
        else {
          Cons(p, putAti(i - 1, ps))
        }
    }
  }
  putAti(floor(rand), trace)
}

The single-site Metropolis-Hastings algorithm is implemented like the Metropolis-Hastings algorithm above.

def metropolisStepSingleSite[A](prob0: Probability, trace0: Trace) { program: Trace => (A, Probability) } = {
  val trace1 = proposeSingleSite(trace0)
  val (result1, prob1) = program(trace1)
  if (prob1 < prob0) {
    do weight(prob1 / prob0)
  }
  ((result1, trace1), prob1)
}
def metropolisHastingsSingleSiteAlgo[A] {program: () => A / { sample, weight } } = {
  val ((result0, trace0), prob0) = handleWeight {
    with handleSample
    with handleTracing
    program()
  }
  def loop(result: A, trace: Trace, prob: Probability): Unit / Emit[A] = {
    do emit(result)
    val ((result1, trace1), prob1) = handleSample {
      with handleRejection
      metropolisStepSingleSite(prob, trace) { trace => 
        with handleWeight
        with handleReusingTrace(trace)
        program()
      }
    }
    loop(result1, trace1, prob1)
  }
  loop(result0, trace0, prob0)
}

Wrappers

In order to make it easier to use these algorithms without having to call the various effect handlers, we constructed wrappers for the algorithms implemented previously.

def sliceSampling[R](n: Int) { program: () => R / { sample, observe } }: Unit / { random, Emit[R] } = {
  with handleSample
  with sliceSamplingAlgo[R]
  with handleObserve
  program()
}
def metropolisHastings[R](n: Int) { program: () => R / { sample, observe } }: Unit / { random, Emit[R] } = {
  with metropolisHastingsAlgo[R]
  with handleObserve
  program()
}
def metropolisHastingsSingleSite[R](n: Int) { program: () => R / { sample, observe } }: Unit / { random, Emit[R] } = {
  with metropolisHastingsSingleSiteAlgo[R]
  with handleObserve
  program()
}

Examples

Linear Regression

As a short example of how the effects sample and observe can be used in a model, we construct a linear regression model.

record Point(x: Double, y: Double)

def show(p: Point): String = {
  def rounded(d: Double) = round(d, 3).show
  "Point(" ++ p.x.rounded ++ ", " ++ p.y.rounded ++ ")"
}

def linearRegression(observations: List[Point]) = {
  val m = do sample(Gaussian(0.0, 3.0))
  val c = do sample(Gaussian(0.0, 2.0))
  observations.foreach {
    case Point(x, y) => do observe(y, Gaussian(m * x + c, 1.0))
  }
  return Point(m, c)
}

Remember from earlier that handleObserve uses the weight effect operation. Therefore, the call do observe(y, Gaussian(m * x + c, 1.0)) can be understood as P(Y | C) where Y are the observations and C are the parameters sampled from a Gaussian given by m ~ N(0, 3) and c ~ N(0, 2). If the chosen parameters do not explain the observations well, it is likely it will be rejected and new parameters will be sampled. Conversely, if the model fits well, it is unlikely it will be rejected.

with linearCongruentialGenerator(1)
with onEmit[Point] { s => println(s.show) };

limit[Point](5) {
  with rejectionSampling[Point]

  linearRegression([
    Point(5.0, 5.0),
    Point(1.0, 1.0),
    Point(-2.0, -2.0),
    Point(3.0, 3.0),
    Point(20.0, 20.0),
    Point(5.0, 5.0)
  ])
}

Robot Movements

It is also possible to construct bigger examples on which we can apply these algorithms. In this example, the movements of a robot on a 2D-plane are observed. In the center of this plane, at the coordinates (0, 0), there is a radar station that can measure the distance to the robot at any point in time. We try to predict the state of the robot while taking into account the measurements. The current state of the robot consists of the position and velocity.

record State(x: Double, y: Double, vx: Double, vy: Double)

def show(s: State): String = {
  val State(x, y, vx, vy) = s
  def rounded(d: Double) = round(d, 3).show
  "State(" ++ x.rounded ++ ", " ++ y.rounded ++ ", " ++ vx.rounded ++ ", " ++ vy.rounded ++ ")"
}

In this example, we also define a type alias for Measurement:

type Measurement = Double
type Path = List[State]

For simulating the movement of the robot at a given state, we sample accelerations xa, ya along the x and y axis from a Gaussian distribution. By applying the laws of motion and assuming that the difference between each time step is one, we can derive the update equations given the sampled acceleration for the velocity and the position.

def move(s: State): State / sample =
  s match {
    case State(x0, y0, vx0, vy0) =>
      val xa1 = do sample(Gaussian(0.0, 1.0))
      val ya1 = do sample(Gaussian(0.0, 1.0))
      val (x1, y1) = (x0 + vx0 + xa1, y0 + vy0 + ya1)
      val (vx1, vy1) = (vx0 + xa1, vy0 + ya1)
      return State(x1, y1, vx1, vy1)
  }

After predicting the next state of the robot, we get a distance measurement from the radar station. We then compute the probabilistic distance given the predicted state using the Euclidean distance. Next, we try to reconcile our model with the given measurement by applying the obeserve effect operation. Similar to linear regression example, we reject our prediction if the observation is unlikely given our previously predicted acceleration.

def measure(s: State, m: Measurement): Unit / observe = {
  s match {
    case State(x, y, vx, vy) =>
      val dist = sqrt(square(x) + square(y))
      do observe(m, Gaussian(dist, 0.4))
  }
}

Putting it all together, we first call move for predicting the acceleration and then reconcile it with a given measurement. If the predicted state is unlikely to have produced the measurement, we predict another state. Finally, we return the first state that is not rejected.

def step(s0: State, m1: Measurement): State / { sample, observe } = {
  val s1 = move(s0)
  measure(s1, m1)
  return s1
}

We can now put our model to the test by running the following simple example:

with linearCongruentialGenerator(1)
with onEmit[State] { s => println(s.show) };

limit[State](5) {
  with sliceSampling[State](5)

  val init = State(0.0, 3.0, 2.0, 0.0)
  step(init, 5.0)
}

As a more complex example, we now also probabilisticly augment the distance measurements by applying Gaussian noise to it instead of keeping constant. More importantly, we now predict five different possible paths consisting of five states each.

def show(path: Path): String = {
  "[" ++ path.foldLeft("") { (acc, s) =>
    val comma = acc match {
      case "" => acc
      case _ => acc ++ ", "
    }
    comma ++ s.show
  } ++ "]"
}
with linearCongruentialGenerator(1)
with onEmit[Path] { path => println(path.show) };

limit[Path](5) {
  with sliceSampling[Path](1);

  var nextState = State(0.0, 3.0, 2.0, 0.0)
  var nextdis = 5.0
  var m = 5
  var path = [nextState]
  while (m > 0) {
    nextState = step(nextState, nextdis)
    val noise = do sample(Gaussian(0.0, 1.0))
    nextdis = nextdis + noise
    path = append(path, [nextState])
    m = m - 1
  }
  path
}

Epidemic Spreading

This example simulates a population experiencing an epidemic outbreak with the SIR model. The SIR model divides the population into susceptible S, infected I and recovered R.

record Population(susceptible: Double, infected: Double, recovered: Double)

def show(s: Population): String = {
  val Population(susceptible, infected, recovered) = s
  def rounded(d: Double) = round(d, 3).show
  "Population(" ++ susceptible.rounded ++ ", " ++ infected.rounded ++ ", " ++ recovered.rounded ++ ")"
}

Having defined our population state, we now try to predict the population state at the next time step:

def progression(p: Population): Population / sample = {
  p match {
    case Population(s, i, r) =>
      val noise = do sample(Gaussian(0.0, 0.01))
      val s1 = 0.5 * s - noise
      val i1 = 0.3 * i + 0.5 * s + 0.1 * r + noise
      val r1 =  0.9 * r + 0.7 * i
      return Population(s1, i1, r1)
  }
}

Next, given a population and a positive-rate of COVID-like test, we gauge how likely it is that the positive-rate was observed assuming the test result is distributed according to a Beta distribution. Formally, we compute $\text{Beta}(pr \mid I, 100)$ where $I$ is the number of infected individuals in the predicted population. If it is unlikely, the predicted population will be rejected and another one will be proposed.

def test(p: Population, pr: Double): Unit / observe = {
  p match {
    case Population(s, i, r) =>
      val mean = i
      val sampleSize = 100.0
      do observe(pr, Beta(mean, sampleSize))
  }
}

Like in the previous examples, we combine these two functions into a step function:

// approximate next state of population based on current state
def step(p0: Population, pr1: Double): Population / { sample, observe } = {
  val p1 = progression(p0)
  test(p1, pr1)
  return p1
}

Testing it on a simple example, is similar as we have seen previously with the robot movement predictions:

with linearCongruentialGenerator(1)
with onEmit[Population] { p => println(p.show) };

limit[Population](5) {
  with metropolisHastings[Population](5)

  val init = Population(0.7, 0.2, 0.1)
  step(init, 0.8)
}
def show(path: List[Population]): String = {
  "[" ++ path.foldLeft("") { (acc, s) =>
    val comma = acc match {
      case "" => acc
      case _ => acc ++ ", "
    }
    comma ++ s.show
  } ++ "]"
}

We again can also predict multiple different scenarios where the disease spreads differently throughout a population:

with linearCongruentialGenerator(1)
with onEmit[List[Population]] { ps => println(ps.show) }

limit[List[Population]](1) {
  with metropolisHastings[List[Population]](1)

  var nextPop = Population(0.7, 0.2, 0.1)
  var nextpr = 0.8
  var m = 5
  var path : List[Population] = [nextPop]
  while (m > 0) {
    nextPop = step(nextPop, nextpr)
    val noise = do sample(Gaussian(0.0, 0.01))
    var nextpr1 = nextpr + noise
    if (not(nextpr1 < 0.0 || nextpr1 > 1.0)) { nextpr = nextpr1 }
    path = append(path, [nextPop])
    m = m - 1
  }
  path
}

The other algorithms of this library can be called used similar as shown in the previous examples.