An Introduction to Iteratees with the Play Framework

Iteratees are an important concept when writing reactive applications. Being able to deal with streams of data is fundamental to web applications.

Before I delve into the details of Iteratees (in the Play Framework) and what they are all about, let me introduce the fundamental concepts behind them.

Let’s start with the humble fold method, a staple of Scala collections. This is what the signature of the fold method for List[A] looks like:

def fold [A1 >: A] (acc: A1)(op: (A1, A1) ⇒ A1): A1

The fold method uses an accumulator acc to store intermediate state as the collection is reduced to a single value of type A1. For example, if you needed to compute the sum of all elements in a list, you could do the following:

val list = List(1, 2, 3, 4, 5)
val sum = list.fold(0)((a: Int, b: Int) => a + b)

Now, let’s go back to standard Java Iterators. The Java Iterator<E> defines the following interface (for the sake of simplicity, let’s ignore remove() for now):

interface Iterator<E> {
  boolean hasNext();
  E next();
}

An Iterator<E> is a producer. It produces elements of type E. Let’s say we wanted to make this interface reactive. We could start by making the hasNext() and next() methods return a Scala Future, so that it would look something like:

trait Iterator[E] {
  def hasNext : Future[Boolean]
  def next(): Future[E]
}

This approach gets us about halfway. The consumer now has to deal with the futures returned by the Iterator. The consumer, however, has no way to tell the producer to slow down (a mechanism known as back-pressure).

Let’s define a trait to represent the consumer.

trait Consumer[I] {
  def consume(input: I): Future[_]
}

In the above trait, the consumer consumes messages of type I (input) and returns a Future[_] (a future of something - we will come back to this later). Returning an instance of Future solves the problem of back-pressure very elegantly, as the producer only sends new messages once the consumer’s Future[_] gets resolved.
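To make the back-pressure idea concrete, here is a minimal sketch that uses only the Scala standard library. The produce function, the SlowConsumer class and the BackPressureSketch object are names made up for illustration; they are not part of Play. The only point is that the producer waits for the consumer's Future before sending the next item.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BackPressureSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Same shape as the Consumer trait above.
  trait Consumer[I] {
    def consume(input: I): Future[_]
  }

  // Hypothetical producer: it sends the next item only after the Future
  // returned for the previous item has completed.
  def produce[I](items: List[I], consumer: Consumer[I]): Future[Unit] =
    items match {
      case Nil => Future.successful(())
      case head :: tail =>
        consumer.consume(head).flatMap(_ => produce(tail, consumer))
    }

  // A deliberately slow consumer that records the order of arrival.
  class SlowConsumer extends Consumer[Int] {
    val seen = ListBuffer.empty[Int]
    def consume(input: Int): Future[_] = Future {
      Thread.sleep(10) // simulate slow processing
      seen += input
    }
  }

  // Feeds three items to the slow consumer and returns what it saw.
  def demo(): List[Int] = {
    val consumer = new SlowConsumer
    Await.result(produce(List(1, 2, 3), consumer), 5.seconds)
    consumer.seen.toList
  }
}
```

Because each call to consume is chained onto the previous Future, the slow consumer sets the pace and the items arrive in order.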

Let’s say we added an accumulator to the consume(..) method, so that it looks like:

def consume[A1 >: I](state: A1)(input: I): Future[_]

where the state is a pure accumulator. Now this is starting to look like our fold method.
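The resemblance to fold can be sketched as follows. This is only an illustration under one assumption: the Future returned by consume carries the next accumulator value (the text above leaves the Future's content open for now). The names feed and FoldingConsumerSketch are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldingConsumerSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Assumption: the Future carries the next accumulator value.
  val consume: Int => Int => Future[Int] =
    state => input => Future.successful(state + input)

  // Hypothetical driver: threads the state through every call, waiting for
  // each Future before handing over the next element.
  def feed[S, I](items: List[I], initial: S)(step: S => I => Future[S]): Future[S] =
    items.foldLeft(Future.successful(initial)) { (stateF, item) =>
      stateF.flatMap(state => step(state)(item))
    }

  // The asynchronous counterpart of list.fold(0)(_ + _).
  def demo(): Int =
    Await.result(feed(List(1, 2, 3, 4, 5), 0)(consume), 5.seconds)
}
```

Calling FoldingConsumerSketch.demo() computes the same sum as the earlier list.fold example, except that every step goes through a Future.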

This is the fundamental concept behind Iteratees. Now, let’s get some terminology in place. The Enumerator[E] in the Play Iteratees library is the producer.

Every Enumerator needs a consumer, and a consumer is called an Iteratee[E, A], where the Iteratee takes in elements of type E (as input) and returns an output of type A.

The Play Iteratees library also defines a transformer type (called an Enumeratee[From, To]). It provides a way to transform or adapt streams of data. For example, you could use an Enumeratee[String, Int] to help an Iteratee[Int, Int] consume an Enumerator[String].

Enumerator[String] --> Enumeratee[String, Int] --> Iteratee[Int, Int]
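The adapter role in this pipeline can be illustrated without Play. In the sketch below, adapt is a hypothetical stand-in for an Enumeratee[String, Int], built on the simple Consumer trait from earlier rather than on Play's real types. It wraps an Int consumer so that it can be fed Strings.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AdapterSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  trait Consumer[I] {
    def consume(input: I): Future[_]
  }

  // Hypothetical stand-in for an Enumeratee[From, To]: it converts each
  // element before passing it on to the wrapped consumer.
  def adapt[From, To](convert: From => To)(inner: Consumer[To]): Consumer[From] =
    new Consumer[From] {
      def consume(input: From): Future[_] = inner.consume(convert(input))
    }

  // An Int consumer that keeps a running total.
  class SummingConsumer extends Consumer[Int] {
    @volatile var total = 0
    def consume(input: Int): Future[_] = Future.successful { total += input }
  }

  // Feeds Strings, one at a time, to a consumer that only understands Ints.
  def demo(): Int = {
    val sum = new SummingConsumer
    val stringConsumer: Consumer[String] = adapt[String, Int](_.toInt)(sum)
    val done = List("1", "2", "3").foldLeft(Future.successful(())) {
      (previous, s) => previous.flatMap(_ => stringConsumer.consume(s).map(_ => ()))
    }
    Await.result(done, 5.seconds)
    sum.total
  }
}
```

The producer only ever sees a Consumer[String]; the conversion to Int happens inside the adapter.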

So far, we have looked at solving the problem of back-pressure by returning a Future[_] (which represents a future of an unknown type). We still have a few other use cases worth solving. For example, what if the consumer was only interested in the first couple of messages or bytes and did not care about the rest of the stream? To handle this, the Play Iteratees library introduces a Step type, which can be one of the following:

sealed trait Step[E, +A]

object Step {
  // represents the done state
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]

  // represents the continue state
  // iteratee tells the producer
  // that it is ready to accept more data
  case class Cont[E, +A](k: Input[E] => Iteratee[E, A])
    extends Step[E, A]

  // represents an error state
  // iteratee tells the producer that an error has occurred,
  // and which input resulted in the error
  case class Error[E](msg: String, input: Input[E])
    extends Step[E, Nothing]
}

Now let’s go back to our consume method, which we defined as:

def consume[A1 >: I](state: A1)(input: I): Future[_]

The Future[_] is replaced with a Future[Iteratee[I, O]]. At every step, the Iteratee returns a future version of itself. The Enumerator sends more data to the Iteratee only if the Iteratee needs data (i.e. if the Iteratee is in the Step.Cont state).
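Here is a stripped-down model of that protocol, using only the standard library. The Input, Step, Done and Cont types below are simplified, hypothetical stand-ins (there is no Error state, and Cont returns a Future of the next Step directly); they are not Play's real definitions. The sketch shows a consumer that only wants the first two elements and a producer that stops sending as soon as the consumer is done.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MiniIterateeSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Simplified, hypothetical stand-ins; these are not Play's real types.
  sealed trait Input[+E]
  object Input {
    final case class El[+E](e: E) extends Input[E]
    case object EOF extends Input[Nothing]
  }

  sealed trait Step[E, A]
  final case class Done[E, A](result: A) extends Step[E, A]
  final case class Cont[E, A](k: Input[E] => Future[Step[E, A]]) extends Step[E, A]

  // A consumer that only wants the first n elements.
  def take[E](n: Int, acc: List[E] = Nil): Step[E, List[E]] =
    if (n <= 0) Done[E, List[E]](acc.reverse)
    else Cont[E, List[E]] {
      case Input.El(e) => Future.successful(take(n - 1, e :: acc))
      case Input.EOF   => Future.successful(Done[E, List[E]](acc.reverse))
    }

  // A producer that sends an element only while the consumer is in Cont.
  // Returns the result (if any) and how many elements were actually sent.
  def feed[E, A](items: List[E], step: Step[E, A], sent: Int = 0): Future[(Option[A], Int)] =
    step match {
      case Done(result) => Future.successful((Option(result), sent))
      case Cont(k) =>
        items match {
          case head :: tail =>
            k(Input.El(head)).flatMap(next => feed(tail, next, sent + 1))
          case Nil =>
            k(Input.EOF).map {
              case Done(result) => (Option(result), sent)
              case Cont(_)      => (Option.empty[A], sent)
            }
        }
    }

  // Ten items are available, but the consumer asks for only two.
  def demo(): (Option[List[Int]], Int) =
    Await.result(feed((1 to 10).toList, take[Int](2)), 5.seconds)
}
```

In demo(), the producer has ten items available but sends only two, because the consumer switches to Done after the second one.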

The interface that represents a real Play Iteratee looks like:

trait Iteratee[I, O] {
  def fold[B](folder: Step[I, O] => Future[B])
    (implicit ec: ExecutionContext): Future[B]
}

To implement an Iteratee, you need to provide an implementation of the fold method. The fold method sets up the initial state of the Iteratee and provides a step function that takes an Input[I] and returns an Iteratee[I, O].

Let’s take a look at an example. Let’s define an Iteratee that consumes messages of type String and accumulates them to make a sentence. Once it has accumulated all the messages, it just prints them to the standard output stream, and returns nothing (Unit).

import play.api.libs.iteratee._
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}

val iteratee = new Iteratee[String, Unit] {
  override def fold[B](folder: Step[String, Unit] => Future[B])
    (implicit ec: ExecutionContext): Future[B] = {
    // accumulator
    val buffer: ListBuffer[String] = ListBuffer()

    // the step function
    def stepFn(in: Input[String]): Iteratee[String, Unit] = {
      in match {
        // nothing to consume yet: stay in the Cont state and keep the buffer
        // (returning `this` would re-run fold and start with a fresh buffer)
        case Input.Empty => Cont(stepFn)
        case Input.EOF => Done({
          println(s"Result ${buffer.mkString("--")}")
        }, Input.Empty)
        case Input.El(el) => {
          buffer += el
          Cont(stepFn)
        }
      }
    }

    // initial state -> iteratee ready to accept input
    folder(Step.Cont(stepFn))
  }
}

Looking at the implementation of the Iteratee, the first thing it does is set up its initial state. It tells the folder (i.e. the function driving the Iteratee) that it’s ready to accept messages by saying that it’s in a Step.Cont state.

The stepFn takes an Input[String]. An Input can be in one of three states. Input.Empty means that the Enumerator does not have a message at this point in time. In this case the stepFn has nothing to do, so it simply hands back an Iteratee that is still waiting for input.

If the input is Input.EOF, the Enumerator has no more messages to produce (end of stream). The Iteratee just prints the accumulated messages to the output stream and moves to the Done state.

The third case, which is more interesting, is where the Enumerator produces an element (Input.El). The Iteratee appends the message to the accumulator and returns a new Iteratee in the Step.Cont state, which means it is still accepting new messages.

We glossed over the folder, the function that drives an Iteratee. Let’s look at the implementation of a real Enumerator in Play, which should clear things up.

import play.api.libs.iteratee._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val enumerator = new Enumerator[String] {
  // some messages
  val items = (1 to 10).map(_.toString)
  var index = 0

  override def apply[A](i: Iteratee[String, A]): Future[Iteratee[String, A]] =
    // the folder
    i.fold {
      // iteratee is done, so no more messages to send
      case Step.Done(_, _) =>
        println("Step.Done")
        Future.successful(i)

      // iteratee can consume more
      case Step.Cont(k) =>
        println("Step.Cont")
        // does the enumerator have more messages?
        if (index < items.size) {
          val item = items(index)
          println(s"El($item)")
          index += 1

          // get the new state of the iteratee
          val newIteratee = k(Input.El(item))

          // recursive apply
          apply(newIteratee)
        } else {
          println("EOF")
          Future.successful(k(Input.EOF))
        }

      // iteratee is in the error state
      case Step.Error(_, _) =>
        println("Step.Error")
        Future.successful(i)
    }
}

The folder takes the current state of an Iteratee (represented by a Step) and decides whether to drive the Iteratee further.

In the above example, if the Iteratee is in the Step.Done or in the Step.Error state, then there is nothing for the Enumerator to do. It no longer drives the Iteratee.

If the Iteratee is in the Step.Cont state, then the Iteratee is ready to accept messages. We extract the k function (which translates an Input[E] into a new Iteratee) and give it a message of type Input.El. Now that we have the new state of the Iteratee (returned by the k function), we recursively call the apply method to drive that new state. Once the Enumerator runs out of messages, it sends Input.EOF instead and stops.

Finally, sending messages to the Iteratee is as simple as:

Await.result(enumerator |>>> iteratee, Duration.Inf)

That’s pretty much it. You now understand the idea behind the Play Iteratees library. All of the code can also be found in this gist.

Notes:

Although I talked about Enumeratees, I have not provided an example or explained the mechanics of an Enumeratee. I leave it as an exercise to the reader. One important thing to note is that an Enumeratee is both an Enumerator and an Iteratee at the same time. I will just leave it at that.

For the sake of keeping things simple, I have violated the principle of side-effect-free programming. Both the Enumerator and the Iteratee can carry state forward through an accumulator.

Iteratees take some getting used to. The Play Framework’s implementation makes it a little harder to understand IMHO. I wish the Iteratee was an implementation of the Step trait itself.

Some very useful resources on Play Iteratees are:

Play Iteratees for normal humans

Iteratees in Play, an Introduction by James Roper

Play Iteratees vs. Node Websockets

Composable and Streamable Web Apps

PS

Special note of thanks goes to @sgodbillon for his help with this article.

If you liked this article or have comments, drop me a note at @tikurahul

 