Asynchronous programming in Scala

by Kamil Korzekwa, February 2017
kamkor.megithub.com/kamkor@kamkorz

Use Space key or ← ↑ → ↓ keys to navigate trough slides.

Agenda

  • Introduction to asynchronous processing using Scala
  • Asynchronous processing in microservices

Synchronous, Asynchronous.. ?

Synchronous processing


val egg = chicken.layEgg()
val friedEgg = fry(egg)
val tea = brew(teabag)

val breakfast: (FriedEgg, Tea) = (friedEgg, tea)

Synchronous processing


// long running operation (just waiting for chicken)
val egg = chicken.layEgg()
// long running operation
val friedEgg = fry(egg)
// long running operation
val tea = brew(teabag)

val breakfast: (FriedEgg, Tea) = (friedEgg, tea)

Introduction to scala.concurrent.Future

Express latency side effect


// Future[Egg] is returned immediately
val eggF: Future[Egg] = chicken.layEgg()
						

Future is like a box that at some point of future might contain a value or error inside.

Get Future value with a callback


val eggF: Future[Egg] = chicken.layEgg()

eggF onComplete {
  case Success(egg)       => println("pretty " + egg)
  case Failure(exception) => println("ooops! " + exception.message)
}

How to create a Future


val eggF: Future[Egg] = 
  Future {
    chickenMagic()
  }

Expression inside the Future block is immediately scheduled for execution on some thread.

Future[A] => Future[B]

with Higher Order Functions

Map Future[] With =>


val eggF: Future[Egg] = chicken.layEgg()
val crackedF: Future[CrackedEgg] = eggF.map(crack)

def crack(egg: Egg): CrackedEgg = ???

Flatmap Future[] with => Future[]


val eggF: Future[Egg] = chicken.layEgg()
val friedEggF: Future[FriedEgg] = eggF.flatMap(fry)

def fry(egg: Egg): Future[FriedEgg] = ???

Future[], Future[]
=>
Future[(, )]

Zip Future[] WITH Future[]


val friedEggF: Future[FriedEgg] = ???
val teaF: Future[Tea] = ???

val breakfastF: Future[(FriedEgg, Tea))] = friedEggF.zip(teaF)

There is more!

Scala Future API has more cool functions. There are also ways to deal with errors. See Scala API.

Synchronous vs asynchronous


val egg: Egg = chicken.layEgg()
val friedEgg: FriedEgg = fry(egg)
val tea: Tea = brew(teabag)

val breakfast: (FriedEgg, Tea) = (friedEgg, tea)

val eggF: Future[Egg] = chicken.layEgg()

val breakfastF: Future[(FriedEgg, Tea)] =
  eggF flatMap { egg =>
    val friedEggF: Future[FriedEgg] = fry(egg)
    val teaF: Future[Tea] = brew(teabag)
    friedEggF.zip(teaF) // returns Future[(FriedEgg, Tea)]
  }

Chain Futures


val softFriedEggF: Future[FriedEgg] =
  chicken
    .layEgg() // returns Future[Egg]
    .flatMap(fry) // returns Future[FriedEgg]
    .filter(isSoft)

Futures & concurrency


// ExecutionContext decides how to schedule and on what thread
val executionContext = scala.concurrent.ExecutionContext.global

val eggF: Future[Egg] = Future { chickenMagic() }(executionContext)
val crackedEggF = eggF.map(crack)(executionContext)
crackedEggF onComplete { case Success(crackedEgg) => ??? case Failure(exception) => ??? }(executionContext)

Visibility & Synchronization on JVM

In insufficiently synchronized JVM program, thread A may see different data than thread B. One of the solutions for this problem are immutable objects. To learn more, read the book Concurrency In Practice:

Immutability in Scala


case class Foobar(foo: Int, bar: Int, theta: Int)

val f1 = Foobar(1, 2, 3)
val f2 = f1.copy(bar = 10)

// And also immutable collections!

Asynchronous processing in microservices

What used to be within single server boundary..

.. is now spread out in a distributed system.

What used to be in-process communication..

.. is now network communication.

They might communicate asynchronously via messages..

..or maybe synchronously via RESTful communication.

Consuming microservices

Synchronous processing

// network call, so just waiting for product service!
val product: Product = productClient.getProduct(productId)
// network call, so just waiting for price service!
val price: Price = priceClient.getPrice(productId, curr)

val details: (Product, Price) = (product, price)
Concurrently with Future

val productF: Future[Product] = productClient.getProduct(productId) val priceF: Future[Price] = priceClient.getPrice(productId, curr)
val details: Future[(Product, Price)] = productF.zip(priceF)

Consuming microservices

Concurrently with Future


val productF: Future[Product] = productClient.getProduct(productId)
val priceF: Future[Price] = priceClient.getPrice(productId, curr)

val detailsF: Future[(Product, Price)] = productF.zip(priceF)

How many threads do you need?

Blocking IO

  1. Start IO operation.
  2. WAIT for kernel. Thread is blocked/waiting.
  3. Receive result.

You need to maintain threads for blocking IO.

Non blocking IO: NIO

  1. Start IO operation & receive callback.
  2. Thread can do other things.
  3. Result is send via callback.

CPU intensive tasks

Use separate thread pools


def encrypt(plain: Array[Byte]): Future[Array[Byte]] =
 Future { 
   longRunningEncryption(plain) 
 }(executionContext)

Who in the end gets Future result?

Library that natively supports asynchronous model

For example, Finatra expects you to return the Future


class WishListsController(repo: WishListsRepository) 
  extends Controller {

get("/wishlists/:id") { r: Request => val wishListF: Future[WishList] = repo.get(r.getParam("id")) wishListF }
}

Many ways to do async in Scala..

Summary

  • Express latency side effect in your code
  • Functional + async programming = easy concurrency
  • Async can make some programs more responsive

Thank you, questions?