I want to stream data from a ZStream on a repeating schedule. My main function returns a ZIO:
def processData(request: MyRequest): Task[Seq[SomePayload]]
I call this method for every element in my list, requests. Now I would like to take this output and stream the JSONs contained in each Seq[SomePayload]. I created a stream with a server endpoint:
val streamingServerEndpoint: ZServerEndpoint[Any, ZioStreams] = streamingEndpoint.zServerLogic { _ =>
  val stream =
    ZStream.fromZIO(ZIO.collectAll(requests.collect(service.processData(_))).repeat(Schedule.minuteOfHour(30))).map(_.toByte)
  ZIO.succeed((100L, stream))
}
Endpoint:
val streamingEndpoint: PublicEndpoint[Unit, Unit, (Long, Stream[Throwable, Byte]), ZioStreams] =
  endpoint.get
    .in("receive")
    .out(header[Long](HeaderNames.ContentLength))
    .out(streamTextBody(ZioStreams)(CodecFormat.TextPlain(), Some(StandardCharsets.UTF_8)))
But when I run this code, I can make a request to the /receive endpoint, but nothing is streamed in the response. In the logs I can see that processData runs and returns data, yet nothing arrives on the endpoint. How should I change this code to stream the JSONs (or the JSONs as bytes) over the API?
I also want to run processData for all requests once an hour (at minute 30 of every hour).
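To make the intended behaviour concrete, here is a minimal sketch of the shape being asked for, assuming ZIO 2.x. MyRequest, SomePayload and processData below are hypothetical stand-ins for the real types and service, and the json field is an assumption about how a payload is rendered. The sketch repeats at the stream level rather than inside the effect passed to ZStream.fromZIO, since an effect repeated on an unbounded schedule never completes and so never hands a value to the stream.

```scala
import zio._
import zio.stream._
import java.nio.charset.StandardCharsets

object ScheduledStreamSketch {
  // Hypothetical stand-ins for the types in the question.
  final case class MyRequest(id: Int)
  final case class SomePayload(json: String)

  // Stub for service.processData; the real one would do actual work.
  def processData(request: MyRequest): Task[Seq[SomePayload]] =
    ZIO.succeed(Seq(SomePayload(s"""{"id":${request.id}}""")))

  // One round: run processData for every request and flatten the results.
  def round(requests: List[MyRequest]): Task[List[SomePayload]] =
    ZIO.foreach(requests)(processData).map(_.flatten)

  // Repeat the round at the stream level, so each completed round is
  // emitted downstream, then encode every payload as one UTF-8 JSON line.
  def byteStream(
      requests: List[MyRequest],
      schedule: Schedule[Any, Any, Any]
  ): ZStream[Any, Throwable, Byte] =
    ZStream
      .repeatZIOWithSchedule(round(requests), schedule)
      .flatMap(payloads => ZStream.fromIterable(payloads))
      .mapConcatChunk { p =>
        Chunk.fromArray((p.json + "\n").getBytes(StandardCharsets.UTF_8))
      }
}
```

In the endpoint logic this would be called as ScheduledStreamSketch.byteStream(requests, Schedule.minuteOfHour(30)). Note that the first round runs immediately and later rounds follow the schedule. A fixed Content-Length of 100 probably does not fit a stream that never ends, so that header may need to be dropped as well.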