Bill'o

Reputation: 514

Akka stream to actor issue

I am trying to stream an HTTP response to an actor using Sink.actorRefWithAck, but the sink actor receives only the init and complete messages and none of the data. I am probably missing something obvious. The sink actor itself is created correctly. Here is the code for the streaming actor:

override def receive: Receive = {
    case GetTestData(p, id) =>
      // Fetch the data and pipe the response back to this actor as a message, as recommended
      http.singleRequest(HttpRequest(uri = uri.format(p, id)))
        .pipeTo(self)

    case HttpResponse(StatusCodes.OK, _, entity, _) =>
      // Sink as actorRef
      val sink = Sink.actorRefWithAck(context.actorOf(TestJob2.props(), "testJob2Actor"), Init, Ack, Complete)
      // Pipe the response body to the actor sink
      entity.dataBytes.runWith(sink)

    case resp @ HttpResponse(code, _, _, _) =>
      log.error("Request to test job failed, response code: " + code)
      // Discard the entity bytes so the unread response does not back-pressure the connection
      resp.discardEntityBytes()

    case _ => log.warning("Unexpected message in TestJobActor")
}

And here is the code for the sink actor:

override def receive: Receive = {
  case Init =>
    log.info("TestJob2Actor got init sink message")
    sender() ! Ack

  case Complete => log.info("TestJob2Actor got complete sink message")

  case b: ByteString =>
    log.debug(b.utf8String)
    sender() ! Ack

  case _ => log.warning("Unexpected message in TestJob2Actor")
}

Output:

2018-01-25 17:26:58,530 INFO com.mcma.actors.Supervisor akka.tcp://[email protected]:8000/user/supervisorActor - SupervisorActor forwarded GetTestData message to TestJobActor

2018-01-25 17:26:59,243 INFO com.mcma.actors.jobs.TestJob akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor - TestJob actor started

2018-01-25 17:27:00,052 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2 actor started

2018-01-25 17:27:00,067 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got init sink message

2018-01-25 17:27:00,083 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://[email protected]:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got complete sink message

Upvotes: 2

Views: 625

Answers (1)

Jeffrey Chung

Reputation: 19527

Two possibilities: (1) the Source returned by entity.dataBytes is empty, or (2) the sink actor is receiving the ByteString chunks, but the configured log level hides DEBUG messages. Try setting your log level to DEBUG, or change log.debug(b.utf8String) to log.info(b.utf8String).
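As a minimal sketch of the first suggestion, assuming the application reads its settings from a standard application.conf (the question does not show its configuration), the Akka log level could be raised like this:

```hocon
# application.conf (assumed location; not shown in the question)
akka {
  # Let DEBUG events from actors reach the configured loggers
  loglevel = "DEBUG"
}
```

If the application logs through an SLF4J backend such as Logback, which the output format suggests but does not confirm, the backend's own level for the relevant loggers probably also needs to allow DEBUG. Otherwise the messages may still be filtered out there.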

Upvotes: 4
