grednoud
grednoud

Reputation: 19

How to listen redis list event with redisson and ZIO

With Radisson, simply receiving events is enough to add new items to the list. To do this, you need to do the following:

object Test extends App {
  val redisson = Redisson.create()
  val events = redisson.getQueue[String]("minio_events", new StringCodec())
  events.addListener(new ListAddListener() {
    override def onListAdd(o: String): Unit = println(o)
  })
}

Difficulties begin when it needs to be wrapped in ZIO. How can I wrap this event in ZIO or ZStream to start the chain of event processing?

Upvotes: 0

Views: 1049

Answers (2)

paulpdaniels
paulpdaniels

Reputation: 18663

It looks like Redisson supports converting the RedissonClient into a reactive streams client which there is a zio-interop for. But if you just want to work directly with the java interface I think you can do something like this (note I haven't actually tested this):

object Test extends zio.App {
   def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = 
       (for {
         // Construct the client in the scope of the stream so it shuts down when done
         c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))

         // Variant of effectAsync* that lets you specify an interrupter
         s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
           val queue = c.getQueue[String]("", new StringCodec())
           val listenerId = queue.addListener(new ListAddListener {

             // Invoke the callback by passing in a ZIO with a single chunk
             def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
           })

           // Return a cancellation handler.
           Left(UIO(queue.removeListener(listenerId)))
        }
      } { zio.console.putStrLn(s) }).exitCode
}

Upvotes: 2

Rodrigo Vedovato
Rodrigo Vedovato

Reputation: 1008

ZStream is pull based, that means you'll have to pull the data from minio_events in some way

val redisson = Redisson.create()    
val bqueue : RQueue[String] = redisson.getQueue("minio_events", new StringCodec())
    
val pollQueue = 
  ZIO
    .effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
    .someOrFail(NoElementsOnStream)

This creates a ZIO[Any, Throwable, String] representing your polling operation which can now be turned into a ZStream by calling the ZStream.fromEffect method

ZStream
  .fromEffect(pollQueue)
  .foreach(s => putStrLn(s))
  .exitCode

If you place this code inside a zio.App main function, you'll see that it runs only once. So we need to make it run forever and retry it until if finds another element

ZStream
  .fromEffect(pollQueue)
  .retry(Schedule.spaced(1.second))
  .forever
  .foreach(s => putStrLn(s))  
  .exitCode

Upvotes: 0

Related Questions