I have a project where we are using akka-streams with Java.
In this project I have a stream of strings and a graph that does some operations on them.
In my graph, I want to broadcast that stream to 2 workers. One will replace all characters 'a' with 'A' and send the data on as it receives it, in real time.
The other one will receive the data and, every 3 strings, concatenate those 3 strings and map them to numbers.
It would look like the following:
Source -> bcast -> worker1 -> Sink 1
Source -> bcast -> worker2 -> Sink 2
Obviously Sink 2 will not receive information as fast as Sink 1, but that is expected behavior. The interesting part here is worker 2.
Worker 1 is easy; the issue is worker 2. I know Akka has buffers that can hold up to X messages, but then it looks like I am forced to choose one of the existing overflow strategies, which often come down to choosing which message to drop or whether to keep the stream alive at all.
All I want is this: when the buffer in worker 2 reaches its maximum size, perform the concat and map operations on all the messages it holds, send the result along, and then reset the buffer.
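Setting Akka aside for a moment, the behaviour described above is a fixed-size batch that is processed and reset as soon as it fills. Below is a minimal plain-Java sketch of just that logic. The class name `FlushingBatcher` is hypothetical, and using the length of the concatenated string as "the number" is a placeholder assumption, since the question does not say how the concatenation is mapped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical illustration of "fill a buffer, process it when full, then reset it".
public class FlushingBatcher<T, R> {
  private final int capacity;
  private final Function<List<T>, R> onFull; // e.g. concat + map to a number (must not return null)
  private final List<T> buffer = new ArrayList<>();

  public FlushingBatcher(int capacity, Function<List<T>, R> onFull) {
    this.capacity = capacity;
    this.onFull = onFull;
  }

  // Adds one element; returns the processed batch once the buffer is full, else empty.
  public Optional<R> offer(T element) {
    buffer.add(element);
    if (buffer.size() < capacity) {
      return Optional.empty();
    }
    R result = onFull.apply(new ArrayList<>(buffer));
    buffer.clear(); // reset for the next batch
    return Optional.of(result);
  }

  public static void main(String[] args) {
    // Placeholder mapping (assumption): concatenate the strings, use the length as the number.
    FlushingBatcher<String, Integer> worker2 =
        new FlushingBatcher<>(3, group -> String.join("", group).length());
    for (String s : new String[] {"a", "bb", "ccc", "dddd"}) {
      worker2.offer(s).ifPresent(System.out::println); // prints 6 after "ccc" arrives
    }
  }
}
```

In a stream this logic has to live inside an operator rather than in a callback, which is what the answer below addresses.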
But even after reading the stream-rate documentation for Akka, I couldn't find a way of doing it, at least not in Java.
I also checked a similar SO question, Selective request-throttling using akka-http stream; however, it has been over a year and no one has responded.
Using the graph DSL, how would I create the following path?
Source -> bcast -> worker2 -> Sink 2
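After your bcast, apply the groupedWithin operator with a very long duration and the number of elements set to 3. groupedWithin only accepts a finite duration, so if you don't need the time limit at all, the plain grouped(3) operator does the same thing without a timer.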
https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
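A sketch of the whole graph from the question, built with the Java GraphDSL, assuming Akka 2.5-style APIs (`ActorMaterializer`). It swaps in grouped(3) for groupedWithin so that no duration is needed. The class name `TwoWorkersGraph` is hypothetical, both sinks are `Sink.seq()` only so the results can be inspected, and mapping the concatenated string to its length is a placeholder for the real string-to-number conversion.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.ClosedShape;
import akka.stream.Materializer;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

public class TwoWorkersGraph {

  // Worker 1: replace every 'a' with 'A' and pass each element on immediately.
  static final Flow<String, String, NotUsed> WORKER_1 =
      Flow.of(String.class).map(s -> s.replace('a', 'A'));

  // Worker 2: collect 3 strings, concatenate them, map the result to a number.
  // The length is a placeholder (assumption) for the real mapping.
  static final Flow<String, Integer, NotUsed> WORKER_2 =
      Flow.of(String.class)
          .grouped(3)
          .map(group -> String.join("", group).length());

  // Source -> bcast -> worker1 -> Sink 1
  //               \-> worker2 -> Sink 2
  static RunnableGraph<Pair<CompletionStage<List<String>>, CompletionStage<List<Integer>>>>
      build(Source<String, NotUsed> source) {
    Sink<String, CompletionStage<List<String>>> sink1 = Sink.seq();
    Sink<Integer, CompletionStage<List<Integer>>> sink2 = Sink.seq();
    return RunnableGraph.<Pair<CompletionStage<List<String>>, CompletionStage<List<Integer>>>>fromGraph(
        GraphDSL.create(sink1, sink2, Keep.both(), (b, s1, s2) -> {
          UniformFanOutShape<String, String> bcast = b.add(Broadcast.create(2));
          b.from(b.add(source)).viaFanOut(bcast).via(b.add(WORKER_1)).to(s1);
          b.from(bcast).via(b.add(WORKER_2)).to(s2);
          return ClosedShape.getInstance();
        }));
  }

  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("two-workers");
    Materializer mat = ActorMaterializer.create(system);
    Pair<CompletionStage<List<String>>, CompletionStage<List<Integer>>> results =
        build(Source.from(Arrays.asList("a", "bb", "ccc", "dddd", "e"))).run(mat);
    System.out.println(results.first().toCompletableFuture().join());  // [A, bb, ccc, dddd, e]
    System.out.println(results.second().toCompletableFuture().join()); // [6, 5]
    system.terminate();
  }
}
```

Note that grouped emits the trailing partial group ("dddd", "e") when the source completes, which is why Sink 2 sees 5 at the end.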
You can also do it yourself by adding a stage that stores elements in a List and emits the list every time it reaches 3 elements:
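import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Collects incoming elements and emits them downstream as lists of 3.
public class RecordGrouper<T> extends GraphStage<FlowShape<T, List<T>>> {

  private static final int GROUP_SIZE = 3;

  private final Inlet<T> inlet = Inlet.create("RecordGrouper.in");
  private final Outlet<List<T>> outlet = Outlet.create("RecordGrouper.out");
  private final FlowShape<T, List<T>> shape = FlowShape.of(inlet, outlet);

  @Override
  public FlowShape<T, List<T>> shape() {
    return shape;
  }

  @Override
  public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    return new GraphStageLogic(shape) {
      // Mutable state lives in the logic, so every materialization gets its own batch.
      private List<T> batch = new ArrayList<>(GROUP_SIZE);

      {
        setHandler(inlet, new AbstractInHandler() {
          @Override
          public void onPush() {
            batch.add(grab(inlet));
            if (batch.size() == GROUP_SIZE) {
              // Hand the full batch downstream and start a fresh one.
              List<T> full = Collections.unmodifiableList(batch);
              batch = new ArrayList<>(GROUP_SIZE);
              push(outlet, full);
            } else {
              // Batch not full yet: request the next element from upstream.
              pull(inlet);
            }
          }

          @Override
          public void onUpstreamFinish() {
            // Flush a trailing partial batch instead of silently dropping it.
            if (!batch.isEmpty()) {
              emit(outlet, Collections.unmodifiableList(batch));
            }
            complete(outlet);
          }
        });

        // Every port needs a handler. Upstream is only pulled when downstream
        // asks for the next batch, so back-pressure is preserved.
        setHandler(outlet, new AbstractOutHandler() {
          @Override
          public void onPull() {
            pull(inlet);
          }
        });
      }
    };
  }
}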
As a side note, I don't think the buffer operator will work here, as it only kicks in when there is back-pressure. So if everything is quiet, elements will still be emitted one by one instead of 3 by 3. https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/buffer.html
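To make the difference concrete, here is a small sketch (hypothetical class name `BufferVsGrouped`, Akka 2.5-style `ActorMaterializer` assumed) that runs the same input through buffer(3, backpressure) and through grouped(3). The buffer leaves the element type unchanged and passes the strings through individually, while grouped(3) always produces lists.

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

public class BufferVsGrouped {

  static final List<String> INPUT = Arrays.asList("a", "b", "c", "d");

  // buffer(3, ...) only holds elements while downstream is slower; it never merges them.
  static List<String> viaBuffer(Materializer mat) {
    CompletionStage<List<String>> done =
        Source.from(INPUT)
            .buffer(3, OverflowStrategy.backpressure())
            .runWith(Sink.seq(), mat);
    return done.toCompletableFuture().join();
  }

  // grouped(3) always emits lists of 3 (the last one may be shorter at end of stream).
  static List<List<String>> viaGrouped(Materializer mat) {
    CompletionStage<List<List<String>>> done =
        Source.from(INPUT)
            .grouped(3)
            .runWith(Sink.seq(), mat);
    return done.toCompletableFuture().join();
  }

  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("buffer-vs-grouped");
    Materializer mat = ActorMaterializer.create(system);
    System.out.println(viaBuffer(mat));  // [a, b, c, d]
    System.out.println(viaGrouped(mat)); // [[a, b, c], [d]]
    system.terminate();
  }
}
```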