Reputation: 853
I am new to ZHub and ZStream and wanted to familiarize myself with their APIs.
Unfortnuately, I could not make this simple example work:
for
hub <- Hub.bounded[String](4)
stream = ZStream.fromHub(hub)
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- stream.runCollect
_ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
()
This program does not terminate, I suspect, because I am trying to collect an infinite stream. I have also tried to print the messages using stream.tap(...)
or to shut down the hub. Nothing has helped.
What am I missing here? Any help is appreciated, thanks.
Upvotes: 0
Views: 329
Reputation: 853
@adamgfraser kindly provided a working example on GitHub:
import zio._
import zio.stream._
object Example extends App {
def run(args: List[String]): URIO[ZEnv, ExitCode] =
for {
promise <- Promise.make[Nothing, Unit]
hub <- Hub.bounded[String](2)
stream = ZStream.managed(hub.subscribe).flatMap { queue =>
ZStream.fromEffect(promise.succeed(())) *>
ZStream.fromQueue(queue)
}
fiber <- stream.take(2).runCollect.fork
_ <- promise.await
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- fiber.join
_ <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
} yield ExitCode.success
}
My mistake was to publish values to the hub before waiting for the subscription to complete.
Upvotes: 0