Reputation: 5023
I have two input streams and would like to merge their elements by matching ID. Here is the code:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.util.Random

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
case class Foo(id: Int, value: String)
case class Bar(id: Int, value: String)
case class MergeResult(id: Int, fooValue: String, barValue: String)
// Two sources of 100 elements each, with random ids in [0, 100)
val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))
The result I would like is a MergeResult, built from a Foo and a Bar that share the same id.
Also, any Foo or Bar whose id has no match yet should be kept in memory. I wonder whether there is a clean way to do this, since it is stateful.
More importantly, the source elements are ordered. If duplicate IDs are found, the strategy should be first matched, first served. That is, given Foo(1, "foo-1"), Foo(1, "foo-2") and Bar(1, "Bar-1"), the match should be MergeResult(1, "foo-1", "Bar-1").
I am currently looking at Akka Streams for a solution. If there are good solutions in other frameworks such as Spark or Flink, those would be helpful as well.
Thanks in advance.
Upvotes: 0
Views: 862
Reputation: 1678
You are precisely describing a join operation.
Akka Streams does not support join operations. You may find a way to do it using windowing on each stream and some actor or stateful transformation to do the lookup between them, but when I last searched for this (not so long ago) I found nothing, so you are probably in uncharted waters.
You will only find stream joins in more heavyweight frameworks: Flink, Spark Streaming, Kafka Streams. The reason is that a join is fundamentally a lookup of one stream against another, which requires more complex machinery (state management) than the designers of Akka Streams wanted to deal with.
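To make the "stateful transformation" idea more concrete, here is a minimal sketch of the lookup state in plain Scala, with no Akka dependency. The JoinState class and its offer method are hypothetical names introduced for illustration, and the sketch assumes both inputs have already been combined into a single sequence of Either[Foo, Bar]. It keeps unmatched elements in per-id FIFO queues, so the oldest waiting element is matched first.

```scala
import scala.collection.mutable

case class Foo(id: Int, value: String)
case class Bar(id: Int, value: String)
case class MergeResult(id: Int, fooValue: String, barValue: String)

// Hypothetical helper (not part of Akka): buffers unmatched elements per id.
final class JoinState {
  // Invariant: a queue stored in either map is never empty.
  private val pendingFoos = mutable.Map.empty[Int, mutable.Queue[Foo]]
  private val pendingBars = mutable.Map.empty[Int, mutable.Queue[Bar]]

  // Feed one element. Returns a match if the other side already has a
  // waiting element with the same id, otherwise buffers the element.
  def offer(elem: Either[Foo, Bar]): Option[MergeResult] = elem match {
    case Left(foo) =>
      takeOldest(pendingBars, foo.id) match {
        case Some(bar) => Some(MergeResult(foo.id, foo.value, bar.value))
        case None =>
          pendingFoos.getOrElseUpdate(foo.id, mutable.Queue.empty[Foo]).enqueue(foo)
          None
      }
    case Right(bar) =>
      takeOldest(pendingFoos, bar.id) match {
        case Some(foo) => Some(MergeResult(bar.id, foo.value, bar.value))
        case None =>
          pendingBars.getOrElseUpdate(bar.id, mutable.Queue.empty[Bar]).enqueue(bar)
          None
      }
  }

  // Removes and returns the oldest waiting element for this id, if any.
  private def takeOldest[A](pending: mutable.Map[Int, mutable.Queue[A]], id: Int): Option[A] =
    pending.get(id).map { queue =>
      val oldest = queue.dequeue()
      if (queue.isEmpty) pending.remove(id)
      oldest
    }
}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val state = new JoinState
    val inputs: List[Either[Foo, Bar]] =
      List(Left(Foo(1, "foo-1")), Left(Foo(1, "foo-2")), Right(Bar(1, "Bar-1")))
    // Foo(1, "foo-2") stays buffered; only the oldest Foo is matched.
    val results = inputs.flatMap(e => state.offer(e))
    println(results) // List(MergeResult(1,foo-1,Bar-1))
  }
}
```

In Akka Streams, one possible wiring (an assumption, not tested here) is to map both sources to Either[Foo, Bar], combine them with merge, and call offer inside statefulMapConcat so that each materialization gets its own JoinState. Note that the pending maps grow without bound when ids never find a partner, so a real pipeline would need some eviction policy.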
Upvotes: 3