Reputation: 2672
I know a similar question has already been asked:
How to attach multiple actors as sources to an Akka stream?
I would like to do the same thing, but I don't know the number of sources in advance. How can I dynamically add multiple sources to an Akka stream?
For reference, this is the accepted answer from the other question, which handles exactly two sources (src1 and src2):
Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.fail());
Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.fail());
Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));
RunnableFlow<List<ActorRef>> closed = FlowGraph.factory().closed(src1, src2, (a1, a2) -> Arrays.asList(a1, a2), (b, s1, s2) -> {
    UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
    b.from(s1).via(merge).to(sink);
    b.from(s2).to(merge);
});
List<ActorRef> stream = closed.run(mat);
ActorRef a1 = stream.get(0);
ActorRef a2 = stream.get(1);
In my case, however, sources are added as they come online and removed as they go away.
Thanks!
Upvotes: 0
Views: 303
Reputation: 4314
If you want to dynamically add sources to a running graph, I think MergeHub is designed to do just that:
https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html#using-the-mergehub
// A simple consumer that will print to the console for now
Sink<String, CompletionStage<Done>> consumer = Sink.foreach(System.out::println);
// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
RunnableGraph<Sink<String, NotUsed>> runnableGraph =
    MergeHub.of(String.class, 16).to(consumer);
// By running/materializing the consumer we get back a Sink, and hence
// now have access to feed elements into it. This Sink can be materialized
// any number of times, and every element that enters the Sink will
// be consumed by our consumer.
Sink<String, NotUsed> toConsumer = runnableGraph.run(materializer);
Source.single("Hello!").runWith(toConsumer, materializer);
Source.single("Hub!").runWith(toConsumer, materializer);
As for closing these new graphs when they're no longer needed, I think you can add a KillSwitch to each one:
https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html#uniquekillswitch
So attaching a new source dynamically would instead end up looking something like this (not tested):
UniqueKillSwitch killSwitch = Source.single("Hello!") // or whatever you want your source to be
    .viaMat(KillSwitches.single(), Keep.right())
    .toMat(toConsumer, Keep.left())
    .run(materializer);
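Since sources come and go, you will probably want to remember which kill switch belongs to which source. A minimal sketch of that bookkeeping in plain Java follows (also not tested against Akka). SourceRegistry and its method names are hypothetical, not part of Akka, and I'm assuming each source has some unique String id. The shutdown handle is kept as a plain Runnable so the class has no Akka dependency.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Akka): keeps one shutdown handle per
// attached source so the source can be detached later by its id.
final class SourceRegistry {
    private final Map<String, Runnable> shutdownHandles = new ConcurrentHashMap<>();

    // Registers the shutdown handle of a newly attached source.
    // Returns false, leaving the existing entry untouched, if the id is already in use.
    boolean add(String id, Runnable shutdown) {
        return shutdownHandles.putIfAbsent(id, shutdown) == null;
    }

    // Removes the handle for the given id and invokes it.
    // Returns false if no source is registered under that id.
    boolean remove(String id) {
        Runnable shutdown = shutdownHandles.remove(id);
        if (shutdown == null) {
            return false;
        }
        shutdown.run();
        return true;
    }

    // Number of sources currently registered.
    int size() {
        return shutdownHandles.size();
    }
}
```

With the killSwitch from the snippet above, this would be used as registry.add("sensor-1", killSwitch::shutdown) when a source comes online and registry.remove("sensor-1") when it goes away. If add returns false, the caller should shut down the stream it just started, since the registry did not store its handle.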
Upvotes: 2