Connecting a push stream and a pull stream

In Effects, Capabilities, and Boxes we make it possible (among other things) to store computations in mutable references. Here we store a function in the mutable reference coroutine that when called will run iterator until the next yield and then update itself with the resumption. The innocent-looking line resume(coroutine()) reads the current function from coroutine, unboxes it, calls it, and resumes with the result. If you hover over coroutine you will see that the value stored there is of type () => Option[A] at {iterator, this}. The function closes over iterator and the mutable reference itself, which lives in this.

type Option[A] {
  Present(value: A);
  Absent()
}

interface Iterate[A] {
  def yield(element: A): Unit
}

interface Reduce[A] {
  def receive(): Option[A]
}

def inRange(start: Int, end: Int): Unit / Iterate[Int] = {
  do yield(start);
  if (start != end) inRange(start + 1, end);
}

def intoFold[A, S](initState: S) { f: (S, A) => S } : S / Reduce[A] =
  do receive[A]() match {
    case Present(a) => intoFold(f(initState, a)) { (s, a) => f(s, a) }
    case Absent() => initState
  }

def intoSum(): Int / Reduce[Int] = intoFold(0) { (s, a) => s + a }

def reduce[A, B]() { iterator: () => Unit / Iterate[A] } { reducer: () => B / Reduce[A] } = {
  var coroutine = fun() { Absent() };
  coroutine = fun() {
    try {
      iterator();
      Absent()
    } with Iterate[A] {
      def yield(e) = {
        coroutine = fun() { resume(()) };
        Present(e)
      }
    }
  };
  try {
    reducer()
  } with Reduce[A] {
    def receive() = {
      resume(coroutine())
    }
  }
}

def run() = {
  reduce[Int,Int]() { inRange(0, 5) } { intoSum() }
}
run()