Reputation: 30095
alsoTo doesn't seem to be working for me: items never arrive at the sink defined in it. Here is what I have.
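import akka.stream.KillSwitches
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.duration._

val merged: Source[ArticleWithKeywords, _] = ...
val (ks, fut) = merged
  // side branch: send (id, ids) pairs to the queue manager's sink
  .alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink))
  .map(_.id)
  .groupedWithin(100, 5.seconds)
  .mapAsync(4) { ids => runReferenceFetching(ids) }
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.both)
  .run()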
But I do see items reaching runReferenceFetching. What am I missing?
Upvotes: 1
Views: 552
Reputation: 30095
It turned out the problem had nothing to do with alsoTo. The problem was with the sink, which was created using Source.fromPublisher. I erroneously thought I could create multiple sinks using the same Publisher[T]. Since there was already another sink, the second one didn't work.
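For anyone who wants to rule out alsoTo itself, here is a minimal sketch (not my actual code) in which the side sink is an ordinary, freshly created sink. It assumes Akka 2.6+, where an implicit ActorSystem also supplies the materializer. The names AlsoToSketch and sideSink are made up for illustration, and Sink.seq stands in for queueManager.getIdsForAnsSink.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AlsoToSketch {
  // Runs a small stream and returns what the side sink and the main sink each received.
  def run(): (Seq[String], Seq[Int]) = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("also-to-sketch")
    try {
      // Hypothetical side sink: a fresh Sink.seq, not one tied to a shared Publisher.
      val sideSink: Sink[Int, Future[Seq[String]]] =
        Flow[Int].map(i => s"side-$i").toMat(Sink.seq[String])(Keep.right)

      val (sideF, mainF) = Source(1 to 5)
        .alsoToMat(sideSink)(Keep.right) // every element is also sent to sideSink
        .map(_ * 10)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()

      // Block only for the purpose of this demo.
      (Await.result(sideF, 5.seconds), Await.result(mainF, 5.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (sideItems, mainItems) = run()
    println(s"side sink received: $sideItems")
    println(s"main sink received: $mainItems")
  }
}
```

Both sinks should receive all five elements here. If the same pipeline delivers nothing to the side sink once the real sink is swapped in, the sink (or the Publisher behind it) is the likely culprit rather than alsoTo.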
Upvotes: 1