Async

This example was proposed and implemented by Jack Firth. It is a basic implementation of an asynchronous IO system that uses a Suspend effect for cooperative multitasking.

type Condition = Int

effect Suspend {
  def newCondition(): Condition
  def wait(condition: Condition): Unit
  def signal(condition: Condition): Unit
}

type ThreadStatus[T] {
  Uninitialized();
  Finished(result: T);
  Waiting(condition: Condition);
  Signalling(condition: Condition)
}

def combineAsync[A, B, C] {f: => A / Suspend} {g: => B / Suspend} {combiner: (A, B) => C / Suspend}
    : C / Suspend = {
 var firstComputation = fun() { Uninitialized() }
  firstComputation = fun() {
    try {
      val result = f()
      firstComputation = fun() { Finished(result) }
      Finished(result)
    } with Suspend {
      def newCondition() = resume(do newCondition())
      def wait(condition) = {
        firstComputation = fun() { resume(()) }
        Waiting(condition)
      }
      def signal(condition) = {
        firstComputation = fun() { resume(()) }
        Signalling(condition)
      }
    }
  }
  var secondComputation = fun() { Uninitialized() }
  secondComputation = fun() {
    try {
      val result = g()
      secondComputation = fun() { Finished(result) }
      Finished(result)
    } with Suspend {
      def newCondition() = resume(do newCondition())
      def wait(condition) = {
        secondComputation = fun() { resume(()) }
        Waiting(condition)
      }
      def signal(condition) = {
        secondComputation = fun() { resume(()) }
        Signalling(condition)
      }
    }
  }
  def loop(firstStatus: ThreadStatus[A], secondStatus: ThreadStatus[B]): C / Suspend = {
    (firstStatus, secondStatus) match {
      // These two cases should be impossible because we already assigned these variables.
      case (Uninitialized(), _) => loop(firstComputation(), secondStatus)
      case (_, Uninitialized()) => loop(firstStatus, secondComputation())

      case (Finished(firstResult), Finished(secondResult)) => combiner(firstResult, secondResult)
      case (Finished(_), Waiting(condition)) =>
        do wait(condition)
        loop(firstStatus, secondComputation())
      case (Finished(_), Signalling(condition)) =>
        do signal(condition)
        loop(firstStatus, secondComputation())
      case (Waiting(condition), Finished(_)) =>
        do wait(condition)
        loop(firstComputation(), secondStatus)
      case (Signalling(condition), Finished(_)) =>
        do signal(condition)
        loop(firstComputation(), secondStatus)
      case (Waiting(firstCondition), Waiting(secondCondition)) =>
        do wait(firstCondition)
        do wait(secondCondition)
        loop(firstComputation(), secondComputation())
      case (Signalling(firstCondition), Signalling(secondCondition)) =>
        do signal(firstCondition)
        do signal(secondCondition)
        loop(firstComputation(), secondComputation())
      case (Waiting(waitCondition), Signalling(signalCondition)) =>
        do signal(signalCondition)
        if (waitCondition != signalCondition) {
          do wait(waitCondition)
        }
        loop(firstComputation(), secondComputation())
      case (Signalling(signalCondition), Waiting(waitCondition)) =>
        do signal(signalCondition)
        if (waitCondition != signalCondition) {
          do wait(waitCondition)
        }
        loop(firstComputation(), secondComputation())
    }
  }
  loop(firstComputation(), secondComputation())
}

type ExecutionResult[A] {
  Success(value: A);
  Deadlock(condition: Condition)
}

def execute[A] { f: => A / Suspend }: ExecutionResult[A] = {
  var nextCondition = 0
  try {
    Success(f())
  } with Suspend {
    def newCondition() = {
      val c = nextCondition
      nextCondition = c + 1
      resume(c)
    }
    def wait(condition) = {
      Deadlock(condition)
    }
    def signal(_) = resume(())
  }
}

def main(): ExecutionResult[Unit] = {
  execute {
    val condition = do newCondition()
    def firstThread(): Unit / Suspend = {
      println("First thread sending signal")
      do signal(condition)
      do wait(condition)
      println("First thread received signal")
    }
    def secondThread(): Unit / Suspend = {
      do wait(condition)
      println("Second thread received signal")
      println("Second thread sending signal")
      do signal(condition)
    }
    // These both print out messages in the same order:
    // - First thread sending signal
    // - Second thread received signal
    // - Second thread sending signal
    // - First thread received signal
    println("Starting first thread first")
    combineAsync { firstThread } { secondThread } { (a, b) => () }
    println("Starting second thread first")
    combineAsync { secondThread } { firstThread } { (a, b) => () }
  }
}
main()