Connecting a push stream and a pull stream
In
Effects, Capabilities, and Boxes
we make it possible (among other things) to store computations in mutable
references. Here we store a function in the mutable reference coroutine that
when called will run iterator until the next yield and then update itself with
the resumption. The innocent-looking line resume(coroutine()) reads the
current function from coroutine, unboxes it, calls it, and resumes with the
result. If you hover over coroutine you will see that the value stored there
is of type () => Option[A] at {iterator, this}. The function closes over
iterator and the mutable reference itself, which lives in this.
type Option[A] {
Present(value: A);
Absent()
}
interface Iterate[A] {
def yield(element: A): Unit
}
interface Reduce[A] {
def receive(): Option[A]
}
def inRange(start: Int, end: Int): Unit / Iterate[Int] = {
do yield(start);
if (start != end) inRange(start + 1, end);
}
def intoFold[A, S](initState: S) { f: (S, A) => S } : S / Reduce[A] =
do receive[A]() match {
case Present(a) => intoFold(f(initState, a)) { (s, a) => f(s, a) }
case Absent() => initState
}
def intoSum(): Int / Reduce[Int] = intoFold(0) { (s, a) => s + a }
def reduce[A, B]() { iterator: () => Unit / Iterate[A] } { reducer: () => B / Reduce[A] } = {
var coroutine = fun() { Absent() };
coroutine = fun() {
try {
iterator();
Absent()
} with Iterate[A] {
def yield(e) = {
coroutine = fun() { resume(()) };
Present(e)
}
}
};
try {
reducer()
} with Reduce[A] {
def receive() = {
resume(coroutine())
}
}
}
def run() = {
reduce[Int,Int]() { inRange(0, 5) } { intoSum() }
}
run()