Connecting a push stream and a pull stream

In Effects, Capabilities, and Boxes we make it possible (among other things) to store computations in mutable references. Here we store a function in the mutable reference coroutine that when called will run iterator until the next yield and then update itself with the resumption. The innocent-looking line resume(coroutine()) reads the current function from coroutine, unboxes it, calls it, and resumes with the result. If you hover over coroutine you will see that the value stored there is of type () => Option[A] at {iterator, this}. The function closes over iterator and the mutable reference itself, which lives in this.

type Option[A] {
  Present(value: A);
  Absent()
}

effect Iterate[A] {
  def yield(element: A): Unit
}

effect Reduce[A] {
  def receive(): Option[A]
}

def inRange(start: Int, end: Int): Unit / Iterate[Int] = {
  do yield(start);
  if (start != end) inRange(start + 1, end);
}

def intoFold[A, S](initState: S) { f: (S, A) => S } : S / Reduce[A] =
  do receive[A]() match {
    case Present(a) => intoFold(f(initState, a)) { (s, a) => f(s, a) }
    case Absent() => initState
  }

def intoSum(): Int / Reduce[Int] = intoFold(0) { (s, a) => s + a }

def reduce[A, B]() { iterator: () => Unit / Iterate[A] } { reducer: () => B / Reduce[A] } = {
  var coroutine = fun() { Absent() };
  coroutine = fun() {
    try {
      iterator();
      Absent()
    } with Iterate[A] {
      def yield(e) = {
        coroutine = fun() { resume(()) };
        Present(e)
      }
    }
  };
  try {
    reducer()
  } with Reduce[A] {
    def receive() = {
      resume(coroutine())
    }
  }
}

def main() = {
  reduce[Int,Int]() { inRange(0, 5) } { intoSum() }
}
main()