Heiko
Heiko

Reputation: 45

ZIO and multiple callbacks

I'm quite new to using ZIO. I'm currently writing a crypto trading bot in Scala and I am trying to learn ZIO at the same time. Now I'm opening a websocket, this websocket gives multiple callbacks until it is closed which I'm struggling to integrate in my code. My current code:

object Main extends zio.App with Logging {
   def run(args: List[String]): URIO[Any with Console, ExitCode] = Configuration.getConfiguration.fold(onError, start).exitCode

   private val interval: CandlestickInterval = CandlestickInterval.ONE_MINUTE

   private def onError(exception: ConfigurationException): ZIO[Any, Throwable, Unit]  = {
     logger.info("Could not initialize traderbot!")
     logger.error(exception.getMessage)
     IO.succeed()
   }

   private final def start(configuration: Configuration): ZIO[Any, Throwable, Unit] = {
      for {
        binanceClient <- IO.succeed(BinanceApiClientFactory.newInstance(configuration.apiKey, configuration.secret))
        webSocketClient <- IO.succeed(binanceClient.newWebSocketClient())
        candlesticks <- Task.effectAsync[CandlestickEvent] {
          callback =>
            webSocketClient.onCandlestickEvent(
            "adaeur",
            interval, d => callback(IO.succeed(d))
          )
        })
        // TODO Calculate RSI from candlesticks.
   } yield candlesticks
 }
}

I would like keep receiving candlestick events and keep things functional. I saw a few things about Zio Streams, but I can't find examples that deal with recurring callbacks and are simple to understand. Now I can't use my Candlestick code to in de for comprehension.

Thanks for your time!

Upvotes: 1

Views: 607

Answers (1)

paulpdaniels
paulpdaniels

Reputation: 18663

Unfortunately, ZIO can't handle multiple callbacks when using effectAsync since the data type is based on a single success or failure value.

You can use ZStream instead though which has a similarly shaped operator which can be called multiple times:

private final def start(configuration: Configuration): ZStream[Any, Throwable, Unit] = {
  val candlesticks = ZStream.unwrap(
    IO.effectTotal {
      val client = BinanceApiClientFactory
        .newInstance(configuration.apiKey, configuration.secret)
        .newWebSocketClient()

      // This variant accepts a return value in the `Left` which 
      // is called when during shutdown to make sure that the websocket is 
      // cleaned up
      ZStream.effectAsyncInterrupt { cb => 
        val closeable = webSocketClient.onCancelstickEvent(
          "adaeur",
          interval,
          d => cb(IO.succeed(d)
        ) 

        Left(UIO(closeable.close()))
      }
  )

  for {
    candlestick <- candlesticks
    // TODO Calculate RSI from candlesticks.
  } yield ()
}

Upvotes: 2

Related Questions