Reputation: 45
I'm quite new to ZIO. I'm writing a crypto trading bot in Scala and trying to learn ZIO at the same time. I'm opening a websocket that keeps invoking a callback until it is closed, and I'm struggling to integrate that into my code. My current code:
object Main extends zio.App with Logging {

  def run(args: List[String]): URIO[Any with Console, ExitCode] =
    Configuration.getConfiguration.fold(onError, start).exitCode

  private val interval: CandlestickInterval = CandlestickInterval.ONE_MINUTE

  private def onError(exception: ConfigurationException): ZIO[Any, Throwable, Unit] = {
    logger.info("Could not initialize traderbot!")
    logger.error(exception.getMessage)
    IO.succeed()
  }

  private final def start(configuration: Configuration): ZIO[Any, Throwable, Unit] = {
    for {
      binanceClient   <- IO.succeed(BinanceApiClientFactory.newInstance(configuration.apiKey, configuration.secret))
      webSocketClient <- IO.succeed(binanceClient.newWebSocketClient())
      candlesticks    <- Task.effectAsync[CandlestickEvent] { callback =>
                           webSocketClient.onCandlestickEvent(
                             "adaeur",
                             interval,
                             d => callback(IO.succeed(d))
                           )
                         }
      // TODO Calculate RSI from candlesticks.
    } yield candlesticks
  }
}
I would like to keep receiving candlestick events and keep things functional. I saw a few things about ZIO Streams, but I can't find examples that deal with recurring callbacks and are simple to understand. Right now I can't use my candlestick code in the for comprehension.
Thanks for your time!
Upvotes: 1
Views: 607
Reputation: 18663
Unfortunately, ZIO can't handle multiple callbacks when using effectAsync, since the data type is based on a single success or failure value. You can use ZStream instead, though, which has a similarly shaped operator whose callback can be called multiple times:
private final def start(configuration: Configuration): ZStream[Any, Throwable, Unit] = {
  val candlesticks = ZStream.unwrap(
    IO.effectTotal {
      val client = BinanceApiClientFactory
        .newInstance(configuration.apiKey, configuration.secret)
        .newWebSocketClient()

      // This variant lets you return a canceler in a `Left`. It is run
      // when the stream shuts down, so the websocket gets cleaned up.
      ZStream.effectAsyncInterrupt[Any, Throwable, CandlestickEvent] { cb =>
        val closeable = client.onCandlestickEvent(
          "adaeur",
          interval,
          // In ZIO 1.x the callback expects an effect that yields a Chunk.
          d => cb(IO.succeed(Chunk.single(d)))
        )
        Left(UIO(closeable.close()))
      }
    }
  )

  for {
    candlestick <- candlesticks
    // TODO Calculate RSI from candlesticks.
  } yield ()
}
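To try the pattern without a Binance account, here is a minimal sketch that swaps the websocket for a made-up callback source. It assumes ZIO 1.x with zio-streams on the classpath; `FakeTicker`, `TickerDemo`, `ticks` and `firstThree` are hypothetical names used only for illustration and are not part of ZIO or the Binance client.

```scala
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import zio._
import zio.stream._

// Hypothetical callback-based source standing in for the websocket client.
// It calls `onEvent` repeatedly from a background thread until closed.
final class FakeTicker {
  def subscribe(onEvent: Int => Unit): Closeable = {
    val running = new AtomicBoolean(true)
    val thread = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (running.get()) { onEvent(i); i += 1; Thread.sleep(10) }
      }
    })
    thread.setDaemon(true)
    thread.start()
    new Closeable { def close(): Unit = running.set(false) }
  }
}

object TickerDemo {
  // Every callback invocation becomes one element of the stream.
  def ticks(ticker: FakeTicker): ZStream[Any, Nothing, Int] =
    ZStream.effectAsyncInterrupt[Any, Nothing, Int] { cb =>
      val handle = ticker.subscribe(i => cb(UIO.succeed(Chunk.single(i))))
      // The canceler runs when the stream ends or is interrupted.
      Left(UIO(handle.close()))
    }

  // Take the first three events, then let the canceler stop the ticker.
  def firstThree: Chunk[Int] =
    Runtime.default.unsafeRun(ticks(new FakeTicker).take(3).runCollect)
}
```

A stream is only a description, so it still has to be run to do anything. In the `Main` object from the question, that would probably mean turning the result of `start` into an effect, for example with `runDrain`, before calling `exitCode`.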
Upvotes: 2