expert
expert

Reputation: 30095

Nothing comes to Sink defined in alsoTo

alsoTo doesn't seem to be working for me. Items don't come to sink defined in it. Here is what I have.

val merged: Source[ArticleWithKeywords, _] =  ...
val (ks, fut) = merged
  .alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink))
  .map(_.id)
  .groupedWithin(100, 5 seconds)
  .mapAsync(4) { ids => runReferenceFetching(ids) }
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.both)
  .run()

But I see items reaching runReferenceFetching. What am I missing ?

Upvotes: 1

Views: 552

Answers (1)

expert
expert

Reputation: 30095

Turned out problem has nothing to do with alsoTo. Problem was with sink which was created using Source.fromPublisher. I erroneously thought I can create multiple sinks using same Publisher[T]. Since there was already another sink second one didn't work.

Upvotes: 1

Related Questions