Saturday, May 5, 2018

Asynchronous Programming with Futures and Promises

In multi-threaded programs synchronization is a mechanism commonly used to avoid various issues faced in concurrent programming,like race conditions, ordering.But this mechanism has its own pitfalls, like deadlocks, starving threads etc.
Asynchronous programming attempts to deal with issues. This is a programming style in which tasks are executed independently of the main thread flow. Futures and Promises are the main abstractions in asynchronous programming.

Futures

In a concurrent program, multiple computations are executed on entities called threads. At any given point of time, a thread may be stuck as it is waiting for a condition to be fulfilled. Like, a thread is executing a method which makes a call over the network to fetch some data and construct a string from this data and return it. 
def getDataFromServer(): String
But call over network may be very time consuming, and hence thread has to wait for method to return.Waiting threads are not a good idea when resources are limited.So a better way is to block the thread.
But blocking the thread has it's own side effects. Scala provides a solution to this problem.We change the return value of the method from string to something which can be returned immediately.This return value is called future.A future is a placeholder, that is a memory location for the value. It doesn't need to contain a value when it is created, and a value can be placed eventually into the future by the method. We can change method signature as:
def getDataFromServer(): Future[String]
Future[String] type means that future object can eventually contain a String object. Method can now be implemented without blocking-server call can be made asynchronously and when returned data is available, it is converted into a String and placed in the future, this completing the future. And once a future is completed, it can't be changed.

In Scala, futures has two aspects:future value and future computation.
A future value,Future[T], denotes a value of type T which might not be available currently but may be available later. In scala.concurrent package, futures are represented by trait Future[T].
A future computation represents an asynchronous computation which that generates a future value. In Scala, a future computation can be started by calling apply method on Future companion object.
def apply[T] (b: => T)(implicit e: ExecutionContext) : Future[T]
apply method takes a by-name parameter of type T. This is basically the body of the asynchronous computation which produces a value of type T.It also takes an implicit parameter of type ExecutionContext which takes care of when and where to execute a thread. apply method returns a future of type T, and is completed with the value returned by asynchronous computation b.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.globalobject FuturesDemo extends App{
  Future{println("Future is here")}
  println("Future is coming")
  Thread.sleep(500)
}

The order in which two println are called in in-deterministic.

To read the value from future, we can poll it for it's completion status:
object ReadFutureValueDemo extends App{
  val sbtFile = Future{
    val f = Source.fromFile("build.sbt")
    try f.getLines.mkString("\n") finally f.close()
  }
  println(s"started reading the build file asynchronously")
  println(s"status: ${sbtFile.isCompleted}")
  Thread.sleep(250)
  println(s"status: ${sbtFile.isCompleted}")
  println(s"value: ${sbtFile.value}")
}
Future callbacks

Polling a future for it's completion is not a good approach. A much better way is to make use of future callback. A callback is a function which gets executed when it's arguments are available. When we set a callback on a future, it gets called when future gets completed. foreach method is commonly used method to set a callback on a future. Important to note that callback is not invoked immediately after future is completed. Execution context schedules asynchronous task to execute the callback. So, future computation may run on a thread different from main thread, and callback may be executed on a totally different thread.
Moreover, we can set multiple callbacks on a future.
object FutureCallbackDemo extends App{
  def getUrlSpec(): Future[List[String]] = Future {
    val url = "http://www.w3.org/Addressing/URL" +
      "/url-spec.txt"        val f = Source.fromURL(url)
    try f.getLines.toList finally f.close()
  }
  val urlSpec: Future[List[String]] = getUrlSpec()
  def find(lines: List[String], keyword: String): String = {
    lines.zipWithIndex collect {
      case (line, n) if line.contains(keyword) => (n, line)
    } mkString ("\n")
  }
  urlSpec foreach{
    case lines => println(find(lines,"telnet"))
  }
  urlSpec foreach {
    case lines => println(find(lines, "password"))
  }
  println("Continue with other work")
  Thread.sleep(5000)
}
Futures and exceptions

If future computation throws an exception, future object cannot be completed with a value. It means future has failed.
foreach method only takes those callbacks which can handle successful future objects. failed method can work with failed objects. It returns Future[Throwable] object that contains exception with which future computation failed, and it can be used with foreach.
object FuturesFailureDemo extends App{
  val urlSpec: Future[String] = Future {
    val invalidUrl = "http://www.w3.org/non-existent-url-                    spec.txt"    Source.fromURL(invalidUrl).mkString
  }
  urlSpec.failed foreach {
    case t => println(s"exception occurred - $t")
  }
  Thread.sleep(1000)
}
We can also handle both future success and failure in same callback by using type Try[T]. The type Try[T] has two implementations:Success[T] which encodes result of successful future computation and Failure[T] which encodes the Throwable object that failed the future computation.
There are some Throwable objects which are not stored in a Future. These are miany fatal exceptions like InterruptedException,LinkageError, VirtualMachineError, ThreadDeath etc.

Promises

Promises are objects which can be assigned a value or an exception only once(hence also called single-assignment variable). It is represented with type Promise[T] in Scala. We make use of Promise.apply method in Promise companion object. This method returns an instance of Promise and is non-blocking. But unlike Future.apply, it doesn't start a asynchronous computation. It just creates a promise object. When a promise instance is created, it doesn't contain a value or an exception, just like a future object. Every promise object corresponds to exactly one future object,and to obtain associated future we call future method on promise. Same future object is returned on multiple calls of this method.
This way we can say that promise-future represent two aspects of single-assignment variable. Promise enables us to assign a value to a future object, while future allows us to read that value.
Once a promise has been successfully completed or failed, it can't be assigned another value or exception in any way.
A promise is assigned a value using success or failure.
object PromisesDemo extends App{
  val p = Promise[String]
  val q = Promise[String]
  p.future foreach { case x => println(s"p succeeded with                 '$x'") }
  Thread.sleep(1000)
  p.success("assigned")
  q.failure(new Exception("not kept"))
  q.future.failed foreach { case t => println(s"q failed     with $t") }
  Thread.sleep(1000)
}