Flame_Phoenix

Reputation: 17574

How to throttle requests in a graph using Akka Streams?

Background

I have a project where we are using akka-streams with Java.

In this project I have a stream of strings and a graph that does some operations on them.

Objective

In my graph, I want to broadcast that stream to 2 workers. One will replace all characters 'a' with 'A' and send data as it receives it in real time.

The other one will receive the data and, every 3 strings, concatenate those 3 strings and map the result to a number.
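A plain-Java sketch of the work each worker does on its own data, without any Akka plumbing. The class and method names are hypothetical, and because the question does not say how the concatenated string becomes a number, string length is used as a stand-in for that step.

```java
import java.util.Arrays;
import java.util.List;

public class WorkerLogic {

  // Worker 1: applied to every element the moment it arrives.
  public static String worker1(String s) {
    return s.replace('a', 'A');
  }

  // Worker 2: applied once per group of 3 strings.
  // Hypothetical mapping: the length of the concatenated string is the "number".
  public static int worker2(List<String> batch) {
    String concatenated = String.join("", batch);
    return concatenated.length();
  }

  public static void main(String[] args) {
    System.out.println(worker1("banana"));                        // bAnAnA
    System.out.println(worker2(Arrays.asList("ab", "cde", "f"))); // 6
  }
}
```

The batching itself (collecting 3 elements before calling `worker2`) is the part the stream has to provide.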

It would look like the following:

[Diagram: akka-streams-buffer, showing Source -> bcast feeding worker 1 -> Sink 1 and worker 2 -> Sink 2]

Obviously Sink 2 will not receive information as fast as Sink 1, but that is expected behavior. The interesting part here is worker 2.

Problem

Worker 1 is easy. The issue is worker 2. I know Akka has buffers that can hold up to X messages, but then I am forced to choose one of the existing overflow strategies, which mostly decide which message to drop or whether to keep the stream alive.

All I want is for worker 2, when its buffer reaches the maximum size, to perform the concat and map operations on all the messages it holds, send the result along, and then reset the buffer.

But even after reading the stream-rate documentation for Akka, I couldn't find a way of doing it, at least not in Java.

Research

I also checked a similar SO question, Selective request-throttling using akka-http stream, but it has been over a year and no one has responded.

Questions

Using the graph DSL, how would I create the following path?

Source -> bcast -> worker2 -> Sink 2

Upvotes: 0

Views: 352

Answers (1)

0x26res

Reputation: 13902

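After your bcast, apply the groupedWithin operator with a very long duration and the maximum number of elements set to 3. Note that groupedWithin needs a finite duration; the grouped(3) operator groups by count alone, with no time limit. https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html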
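A sketch of the full Source -> bcast -> worker2 -> Sink 2 path (plus the worker 1 branch) with the Java graph DSL, using the built-in grouped(3) for worker 2. It assumes Akka 2.6+ (for `run(system)` and `runWith(..., system)`); the class name, the sample input and the string-length "number" mapping are hypothetical. Both sinks are `Sink.seq()` so their results can be inspected.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ClosedShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class TwoWorkerGraph {

  // Worker 1: transforms every element as soon as it arrives.
  static final Flow<String, String, NotUsed> WORKER_1 =
      Flow.of(String.class).map(s -> s.replace('a', 'A'));

  // Worker 2: waits for 3 elements, concatenates them, then maps the result to a number.
  // String length is a hypothetical stand-in for the unspecified mapping.
  // groupedWithin(3, someDuration) could replace grouped(3) if a time limit is also wanted.
  static final Flow<String, Integer, NotUsed> WORKER_2 =
      Flow.of(String.class)
          .grouped(3)
          .map(batch -> String.join("", batch))
          .map(String::length);

  static RunnableGraph<Pair<CompletionStage<List<String>>, CompletionStage<List<Integer>>>> build(
      Source<String, NotUsed> source) {
    Sink<String, CompletionStage<List<String>>> sink1 = Sink.seq();
    Sink<Integer, CompletionStage<List<Integer>>> sink2 = Sink.seq();
    return RunnableGraph.<Pair<CompletionStage<List<String>>, CompletionStage<List<Integer>>>>fromGraph(
        GraphDSL.create(
            sink1,
            sink2,
            Keep.both(), // keep both sinks' materialized results
            (builder, s1, s2) -> {
              final UniformFanOutShape<String, String> bcast = builder.add(Broadcast.create(2));
              // Source -> bcast -> worker1 -> Sink 1
              builder.from(builder.add(source)).viaFanOut(bcast).via(builder.add(WORKER_1)).to(s1);
              // bcast -> worker2 -> Sink 2
              builder.from(bcast).via(builder.add(WORKER_2)).to(s2);
              return ClosedShape.getInstance();
            }));
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("two-workers");
    try {
      Pair<CompletionStage<List<String>>, CompletionStage<List<Integer>>> results =
          build(Source.from(Arrays.asList("a", "b", "c", "aa", "bb", "cc"))).run(system);
      System.out.println("Sink 1: " + results.first().toCompletableFuture().get(3, TimeUnit.SECONDS));
      System.out.println("Sink 2: " + results.second().toCompletableFuture().get(3, TimeUnit.SECONDS));
    } finally {
      system.terminate();
    }
  }
}
```

Because grouped(3) only emits after 3 elements (or on completion), Sink 2 naturally receives data more slowly than Sink 1, while Broadcast keeps backpressure intact for both branches.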

You can also do it yourself by adding a stage that stores elements in a List and emits the list every time it reaches 3 elements.

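import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Groups incoming elements into lists of `size` elements (use 3 here), respecting backpressure. */
public class RecordGrouper<T> extends GraphStage<FlowShape<T, List<T>>> {

  private final int size;
  private final Inlet<T> inlet = Inlet.create("RecordGrouper.in");
  private final Outlet<List<T>> outlet = Outlet.create("RecordGrouper.out");
  private final FlowShape<T, List<T>> shape = new FlowShape<>(inlet, outlet);

  public RecordGrouper(int size) {
    this.size = size;
  }

  @Override
  public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    return new GraphStageLogic(shape) {
      // State lives inside the logic, so each materialization gets its own batch.
      private final List<T> batch = new ArrayList<>(size);

      {
        setHandler(
            inlet,
            new AbstractInHandler() {
              @Override
              public void onPush() {
                batch.add(grab(inlet));
                if (batch.size() == size) {
                  // Upstream is only pulled on downstream demand, so the outlet is available here.
                  push(outlet, Collections.unmodifiableList(new ArrayList<>(batch)));
                  batch.clear();
                } else {
                  // Batch not full yet: request the next element.
                  pull(inlet);
                }
              }

              @Override
              public void onUpstreamFinish() {
                // Flush a trailing partial batch instead of silently dropping it.
                if (!batch.isEmpty()) {
                  emit(outlet, Collections.unmodifiableList(new ArrayList<>(batch)));
                }
                completeStage();
              }
            });

        // Every port needs a handler; pulling upstream only on demand preserves backpressure.
        setHandler(
            outlet,
            new AbstractOutHandler() {
              @Override
              public void onPull() {
                pull(inlet);
              }
            });
      }
    };
  }

  @Override
  public FlowShape<T, List<T>> shape() {
    return shape;
  }
}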

As a side note, I don't think the buffer operator will work here: it never groups elements and only holds them while downstream is backpressuring. So if everything is quiet, elements will still be emitted one by one instead of 3 by 3. https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/buffer.html
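A small sketch contrasting the two operators on the same input, assuming Akka 2.6+ (for `runWith(sink, system)`); the class name and input are hypothetical. With `buffer(3, OverflowStrategy.backpressure())` the elements pass through individually, whereas `grouped(3)` emits lists, with the leftover element emitted as a final shorter list when the source completes.

```java
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BufferVsGrouped {

  static final List<String> INPUT = Arrays.asList("a", "b", "c", "d");

  // buffer only stores elements while downstream is slow; it still emits them one at a time.
  static List<String> viaBuffer(ActorSystem system) throws Exception {
    return Source.from(INPUT)
        .buffer(3, OverflowStrategy.backpressure())
        .runWith(Sink.<String>seq(), system)
        .toCompletableFuture()
        .get(3, TimeUnit.SECONDS);
  }

  // grouped emits lists of up to 3 elements; the partial last group is emitted on completion.
  static List<List<String>> viaGrouped(ActorSystem system) throws Exception {
    return Source.from(INPUT)
        .grouped(3)
        .runWith(Sink.<List<String>>seq(), system)
        .toCompletableFuture()
        .get(3, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("buffer-vs-grouped");
    try {
      System.out.println(viaBuffer(system));  // individual elements
      System.out.println(viaGrouped(system)); // batches of 3, then the leftover
    } finally {
      system.terminate();
    }
  }
}
```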

Upvotes: 1
