Reputation: 127831
I have an external (that is, I cannot change it) Java API which looks like this:
public interface Sender {
void send(Event e);
I need to implement a Sender
which accepts each event, transforms it to a JSON object, collects some number of them into a single bundle and sends over HTTP to some endpoint. This all should be done asynchronously, without send()
blocking the calling thread, with some fixed-size buffer and dropping new events if the buffer is full.
With akka-streams this is quite simple: I create a graph of stages (which uses akka-http to send HTTP requests), materialize it and use the materialized ActorRef
to push new events to the stream:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue,
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
lazy val (eventsActor, completeFuture) =
override def send(e: Event): Unit = {
eventsActor ! e
Here CustomBuffer
is a custom GraphStage
which is very similar to the library-provided Buffer
but tailored to our specific needs; it probably does not matter for this particular question.
As you can see, interacting with the stream from non-stream code is very simple - the !
method on the ActorRef
trait is asynchronous and does not need any additional machinery to be called. Each event which is sent to the actor is then processed through the entire reactive pipeline. Moreover, because of how akka-http is implemented, I even get connection pooling for free, so no more than one connection is opened to the server.
However, I cannot find a way to do the same thing with FS2 properly. Even discarding the question of buffering (I will probably need to write a custom Pipe
implementation which does additional things that we need) and HTTP connection pooling, I'm still stuck with a more basic thing - that is, how to push the data to the reactive stream "from outside".
All tutorials and documentation that I can find assume that the entire program happens inside some effect context, usually IO
. This is not my case - the send()
method is invoked by the Java library at unspecified times. Therefore, I just cannot keep everything inside one IO
action, I necessarily have to finalize the "push" action inside the send()
method, and have the reactive stream as a separate entity, because I want to aggregate events and hopefully pool HTTP connections (which I believe is naturally tied to the reactive stream).
I assume that I need some additional data structure, like Queue
. fs2 does indeed have some kind of fs2.concurrent.Queue
, but again, all documentation shows how to use it inside a single IO
context, so I assume that doing something like
val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()
and then using queue
inside the stream definition and then separately inside the send()
method with further unsafeRun
val eventPipeline = queue.dequeue
.groupWithin(batchSize, flushDuration)
.evalTap(response => ...)
eventPipeline.unsafeRunAsync(...) // or something
override def send(e: Event) {
is not the correct way and most likely would not even work.
So, my question is, how do I properly use fs2 to solve my problem?
Upvotes: 8
Views: 1918
Reputation: 2401
We can create a bounded queue that will consume elements from sender and make them available to fs2 stream processing.
import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream
trait Sender[T]:
def send(e: T): Unit
object Sender:
def apply[T](bufferSize: Int): IO[(Sender[T], Stream[IO, T])] =
q <- Queue.bounded[IO, T](bufferSize)
val sender: Sender[T] = (e: T) => q.offer(e).unsafeRunSync()
def stm: Stream[IO, T] = Stream.eval(q.take) ++ stm
(sender, stm)
Then we'll have two ends - one for Java worlds, to send new elements to Sender
. Another one - for stream processing in fs2.
class TestSenderQueue:
@Test def testSenderQueue: Unit =
val (sender, stream) = Sender[Int](1)
.unsafeRunSync()// we have to run it preliminary to make `sender` available to external system
val processing =
.map(i => i * i)
.evalMap{ ii => IO{ println(ii)}}
processing.compile.toList.start//NB! we start processing in a separate fiber
.unsafeRunSync() // immediately right now.
(0 until 100).foreach(sender.send)
Note that we push data in the current thread and have to run fs2 in a separate thread (.start
Upvotes: 0
Reputation: 9072
Consider the following example:
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object Answer {
type Event = String
trait Sender {
def send(event: Event): Unit
def main(args: Array[String]): Unit = {
val sender: Sender = {
val ec =
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)
val events = List("a", "b", "c", "d")
events.foreach { evt => new Thread(() => sender.send(evt)).start() }
Thread sleep 3000
def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
// dummy impl
// this is where the actual logic for batching
// and shipping over the network would live
val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
for {
_ <- F.delay { println(s"consuming [$event]...") }
_ <- Timer[F].sleep(1.seconds)
_ <- F.delay { println(s"...[$event] consumed") }
} yield ()
val suspended = for {
q <- Queue.bounded[F, Event](maxBufferedSize)
_ <- q.dequeue.through(consume).compile.drain.start
sender <- F.delay[Sender] { evt =>
val enqueue = for {
wasEnqueued <- q.offer1(evt)
_ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
} yield ()
} yield sender
The main idea is to use a concurrent Queue from fs2. Note, that the above code demonstrates that neither the Sender
interface nor the logic in main
can be changed. Only an implementation of the Sender
interface can be swapped out.
Upvotes: 2
Reputation: 4017
I don't have much experience with exactly that library but it should look somehow like that:
import cats.effect.{ExitCode, IO, IOApp}
import fs2.concurrent.Queue
case class Event(id: Int)
class JavaProducer{
new Thread(new Runnable {
override def run(): Unit = {
var id = 0
id += 1
def send(event: Event): Unit ={
println(s"Original producer prints $event")
class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
override def send(event: Event): Unit = {
println(s"Hacked producer pushes $event")
println(s"Hacked producer pushes $event - Pushed")
object Test extends IOApp{
override def run(args: List[String]): IO[ExitCode] = {
val x: IO[Unit] = for {
queue <- Queue.unbounded[IO, Event]
_ = new HackedProducer(queue)
done <- => {
println(s"Got $ev")
} yield done => ExitCode.Success)
Upvotes: 1