jackweirdy

Reputation: 5850

Akka Streams: How do I model capacity/rate limiting within a system of 2 related streams?

Let's say I have a pizza oven and a line of pizzas I need to bake. My oven can only bake 4 pizzas at a time, and it's reasonable to expect that over the course of a day there are always at least 4 in the queue, so the oven should run at full capacity as often as possible.

Every time I put a pizza in the oven I set a timer on my phone. Once that goes off, I take the pizza out of the oven, give it to whoever wants it, and capacity becomes available.

I have 2 Sources here: one being the queue of pizzas to be cooked, and the other the egg timer that goes off when a pizza has been cooked. There are also 2 Sinks in the system: one being the destination for the cooked pizza, the other being a place to send confirmation that a pizza has been put into the oven.

I'm currently representing these very naively, as follows:

Source.fromIterator(() => pizzas)
    .map(putInOven) // puts in oven and sets a timer
    .runWith(Sink.actorRef(confirmationDest, EndSignal))

Source.fromIterator(() => timerAlerts)
    .map(removePizza)
    .runWith(Sink.actorRef(pizzaDest, EndSignal))

However, these two streams are currently completely independent of each other. The egg timer stream functions correctly, removing a pizza whenever its timer goes off. But it can't signal to the first flow that capacity has become available. In fact, the first flow has no concept of capacity at all, and will just try to cram pizzas into the oven as soon as they join the line.

What Akka concepts can be used to compose these flows so that the first only takes pizzas from the queue when there's capacity, and so that the second flow can "alert" the first to a change in capacity when a pizza is removed from the oven?

My initial impression is to implement a flow graph like this:

   ┌─────────────┐                                                          
┌─>│CapacityAvail│>──┐                                                      
│  └─────────────┘   │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│  ┌─────────────┐   ├──>│     Zip     │>─>│  PutInOven  │>─>│   Confirm   │
│  │    Queue    │>──┘   └─────────────┘   └─────────────┘   └─────────────┘
│  └─────────────┘                                                          
│  ┌─────────────┐       ┌─────────────┐                                    
│  │    Done     │>─────>│  SendPizza  │                                    
│  └─────────────┘       └─────────────┘                                    
│         v                                                                 
│         │                                                                 
└─────────┘                    

The principle that underpins this is that there is a fixed number of CapacityAvailable objects populating the CapacityAvail Source. They're zipped with events arriving on the pizza queue, meaning that if none are available, no pizza processing starts, as the zip operation waits for them.

Then, once a pizza is done, a CapacityAvailable object is pushed back into the pool.

The main barrier I'm seeing to this implementation is that I'm not sure how to create and populate a pool for the CapacityAvail source, and I'm also not sure whether a Source can also be a Sink. Are there any Source/Sink/Flow types that would be a suitable implementation for this?
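One way to sketch the refillable-pool idea from the diagram is `Source.queue`, which materializes to a handle that tokens can be offered back into. This is a hedged sketch, not the asker's actual code: `CapacityToken`, `ovenSlots`, and the immediate re-offer in the sink are all illustrative, and `preMaterialize()` needs a reasonably recent Akka (older versions can capture the materialized value with `mapMaterializedValue` and a `Promise`).

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

object TokenPool extends App {
  implicit val system: ActorSystem = ActorSystem("pizzeria")

  case object CapacityToken
  val ovenSlots = 4

  // A refillable pool: Source.queue materializes to a handle (`pool`)
  // that tokens can be offered back into.
  val (pool, tokens) =
    Source.queue[CapacityToken.type](ovenSlots, OverflowStrategy.dropNew)
      .preMaterialize()

  // Seed the pool with one token per oven slot.
  (1 to ovenSlots).foreach(_ => pool.offer(CapacityToken))

  // Zip stops pulling pizzas as soon as the token pool is empty.
  Source(1 to 10)
    .zip(tokens)
    .map { case (pizza, _) => pizza }
    .runWith(Sink.foreach { pizza =>
      println(s"pizza $pizza goes into the oven")
      // In the full design the token is offered back by the "Done"
      // stream once the egg timer fires, not immediately as here.
      pool.offer(CapacityToken)
    })
}
```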

Upvotes: 4

Views: 637

Answers (3)

jackweirdy

Reputation: 5850

This is what I ended up using. It's pretty much an exact implementation of the faux-state machine in the question. The mechanics of Source.queue are much clumsier than I would have hoped, but it's otherwise pretty clean. The real sinks and sources are provided as parameters and are constructed elsewhere, so the actual implementation has a little less boilerplate than this.

RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Our capacity bucket. Can be refilled by offering CapacityAvailable
    // objects to capacitySrc, and consumed by using capacity as a Source.
    val (capacity, capacitySrc) =
      peekMatValue(Source.queue[CapacityAvailable.type](CONCURRENT_CAPACITY,
                                                        OverflowStrategy.fail))

    // Set initial capacity
    capacitySrc.foreach(c =>
      Seq.fill(CONCURRENT_CAPACITY)(CapacityAvailable).foreach(c.offer))


    // Pull pizzas from the RabbitMQ queue
    val cookQ = RabbitSource(rabbitControl, channel(qos = CONCURRENT_CAPACITY),
                             consume(queue("pizzas-to-cook")), body(as[TaskRun]))

    // Take the blocking events stream and turn into a source
    // (Blocking in a separate dispatcher)
    val cookEventsQ = Source.fromIterator(() => oven.events().asScala)
        .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))

    // Split the events stream into two sources so 2 flows can be attached
    val bc = builder.add(Broadcast[PizzaEvent](2))

    // Zip pizzas with the capacity pool. Stops cooking pizzas when oven full.
    // When cooking starts, send the confirmation back to rabbitMQ
    cookQ.zip(AckedSource(capacity)).map(_._1)
      .mapAsync(CONCURRENT_CAPACITY)(pizzaOven.cook)
      .map(Message.queue(_, "pizzas-started-cooking"))
      .acked ~> Sink.actorRef(rabbitControl, HostDied)

    // Send the cook events stream into two flows
    cookEventsQ ~> bc.in

    // The first tops up the capacity pool
    bc.out(0)
      .mapAsync(CONCURRENT_CAPACITY)(e =>
         capacitySrc.flatMap(cs => cs.offer(CapacityAvailable))
      ) ~> Sink.ignore

    // The second sends out cooked events
    bc.out(1)
      .map(p => Message.queue(Cooked(p.id()), "pizzas-cooked")) ~>
        Sink.actorRef(rabbitControl, HostDied)

    ClosedShape
}).run()
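The `peekMatValue` helper used above isn't shown. Assuming it follows the well-known promise-based pattern, a minimal sketch looks like this: it exposes a Source's materialized value (here, the `SourceQueue`) as a `Future`, so other parts of the graph can get hold of the queue handle once the graph runs.

```scala
import scala.concurrent.{Future, Promise}
import akka.stream.scaladsl.Source

// Expose a Source's materialized value as a Future, so the SourceQueue
// can be obtained and used once the graph embedding the Source is run.
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
  val p = Promise[M]()
  val s = src.mapMaterializedValue { m =>
    p.trySuccess(m)
    m
  }
  (s, p.future)
}
```

Newer Akka versions ship `Source.preMaterialize()`, which serves the same purpose without a hand-rolled helper.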

Upvotes: 1

Patrik Nordwall

Reputation: 2426

You can represent the oven with a mapAsyncUnordered stage with parallelism = 4. Completion of the Future can come from a timer (http://doc.akka.io/docs/akka/2.4/scala/futures.html#After) or from a decision to take the pizza out of the oven for some other reason.
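A minimal sketch of that suggestion, with assumed names (`Pizza`, `bake`, the 2-second baking time): `mapAsyncUnordered(4)` means at most 4 Futures are in flight at once, so backpressure stops pulling from the queue while all oven slots are busy, and `akka.pattern.after` completes the Future from the scheduler without blocking a thread.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

object Oven extends App {
  implicit val system: ActorSystem = ActorSystem("oven")
  import system.dispatcher

  final case class Pizza(id: Int)

  // The egg timer: a Future that completes after the baking time,
  // without blocking any thread (akka.pattern.after uses the scheduler).
  def bake(p: Pizza): Future[Pizza] =
    after(2.seconds, system.scheduler)(Future.successful(p))

  // parallelism = 4 models the oven: at most 4 pizzas bake at once, and
  // a new one is only pulled from the queue when a slot frees up.
  Source(1 to 10).map(Pizza)
    .mapAsyncUnordered(parallelism = 4)(bake)
    .runWith(Sink.foreach(p => println(s"pizza ${p.id} is done")))
}
```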

Upvotes: 1

This use case does not generally map well to Akka Streams. Under the hood an Akka Stream is a reactive stream; from the documentation:

Akka Streams implementation uses the Reactive Streams interfaces internally to pass data between the different processing stages.

Your pizza example doesn't map to streams because you have an external event that is just as much a signaler of demand as the sink of your stream. The fact that you openly state "the first flow has no concept of capacity at all" suggests that you aren't using streams for their intended purpose.

It is always possible to use some weird coding ju-jitsu to awkwardly bend streams to solve a concurrency problem, but you'll likely have difficulty maintaining that code down the line. I recommend you consider using Futures, Actors, or plain old Threads as your concurrency mechanism. If your oven has infinite capacity to hold cooking pizzas then there's no need for streams to begin with.

I would also re-examine your entire design since you are using the passage of clock time as the signaler of demand (i.e. your "egg timer"). This usually indicates a flaw in the process design. If you can't get around this requirement then you should evaluate other design patterns:

  1. Periodic message scheduling
  2. Non-thread-blocking timeouts
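Both patterns boil down to scheduling a message instead of waiting on a clock. A hedged sketch with assumed names (`OvenActor`, `Done`, the 10-second baking time): the actor schedules itself a `Done` message when a pizza goes in, so nothing blocks while the pizza bakes.

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}

// The "egg timer" as a scheduled message: no thread sleeps while the
// pizza bakes; the actor simply receives a Done message later.
class OvenActor extends Actor {
  import context.dispatcher
  private case class Done(pizzaId: Int)

  def receive: Receive = {
    case pizzaId: Int =>
      // Non-blocking timeout: deliver Done to ourselves after the bake time.
      context.system.scheduler.scheduleOnce(10.seconds, self, Done(pizzaId))
    case Done(pizzaId) =>
      println(s"pizza $pizzaId is cooked; capacity freed")
  }
}

object Pizzeria extends App {
  val system = ActorSystem("pizzeria")
  val oven = system.actorOf(Props[OvenActor](), "oven")
  oven ! 1
}
```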

Upvotes: 2
