Reputation: 514
I am trying to stream an http response to an actor using Sink.actorRefWithAck but somehow nothing is sent to this sink actor but the init and complete msgs. I am probably missing something really obvious though. The sink actor is well being initiated. Here is the code for the streaming actor:
override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Sink as actorRef
val sink = Sink.actorRefWithAck(context.actorOf(TestJob2.props(), "testJob2Actor"), Init, Ack, Complete)
// Pipe the response body to the actor sink
entity.dataBytes.runWith(sink)
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
and here the code for the sink actor:
override def receive: Receive = {
case Init =>
log.info("TestJob2Actor got init sink message")
sender ! Ack
case Complete => log.info("TestJob2Actor got complete sink message")
case b: ByteString =>
log.debug(b.utf8String)
sender ! Ack
case _ => log.warning("Unexpected message in TestJob2Actor")
}
output:
2018-01-25 17:26:58,530 INFO com.mcma.actors.Supervisor akka.tcp://[email protected]:8000/user/supervisorActor - SupervisorActor forwarded GetTestData message to TestJobActor
2018-01-25 17:26:59,243 INFO com.mcma.actors.jobs.TestJob akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor - TestJob actor started
2018-01-25 17:27:00,052 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2 actor started
2018-01-25 17:27:00,067 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got init sink message
2018-01-25 17:27:00,083 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got complete sink message
Upvotes: 2
Views: 625
Reputation: 19527
Ideas: (1) the Source
from entity.dataBytes
could be empty, or (2) the sink actor is actually processing the ByteString
chunks, but the logging level is such that debug messages are not visible. Try setting your logging level to DEBUG, or change log.debug(b.utf8String)
to log.info(b.utf8String)
.
Upvotes: 4