Martijn
Martijn

Reputation: 2326

Akka Streams with Akka HTTP Server and Client

I'm trying to create an endpoint on my Akka Http Server which tells the users it's IP address using an external service (I know this can be performed way easier but I'm doing this as a challenge).

The code that doesn't make use of streams on the upper most layer is this:

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val requestHandler: HttpRequest => Future[HttpResponse] = {
  case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
    Http().singleRequest(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))).flatMap { response =>
      response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map { string =>
        HttpResponse(entity = HttpEntity(MediaTypes.`text/html`,
          "<html><body><h1>" + string.utf8String + "</h1></body></html>"))
      }
    }

  case _: HttpRequest =>
    Future(HttpResponse(404, entity = "Unknown resource!"))
}

Http().bindAndHandleAsync(requestHandler, "localhost", 8080)

and it is working fine. However, as a challenge, I wanted to limit myself to only using streams (no Future's).

This is the layout I thought I'd use for this kind of an approach: Source[Request] -> Flow[Request, Request] -> Flow[Request, Response] ->Flow[Response, Response] and to accommodate the 404 route, also Source[Request] -> Flow[Request, Response]. Now, if my Akka Stream knowledge serves me well, I need to use a Flow.fromGraph for such a thing, however, this is where I'm stuck.

In a Future I can do an easy map and flatMap for the various endpoints but in streams that would mean dividing up the Flow into multiple Flow's and I'm not quite sure how I'd do that. I thought about using UnzipWith and Options or a generic Broadcast.

Any help on this subject would be much appreciated.


I don't if this would be necessary? -- http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html

Upvotes: 3

Views: 2088

Answers (1)

You do not need to use Flow.fromGraph. Instead, a singular Flow that uses flatMapConcat will work:

//an outgoing connection flow
val checkIPFlow = Http().outgoingConnection("checkip.amazonaws.com")

//converts the final html String to an HttpResponse
def byteStrToResponse(byteStr : ByteString) = 
  HttpResponse(entity = new Default(MediaTypes.`text/html`,
                                    byteStr.length,
                                    Source.single(byteStr)))

val reqResponseFlow = Flow[HttpRequest].flatMapConcat[HttpResponse]( _ match {
  case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
    Source.single(HttpRequest(GET, Uri("http://checkip.amazonaws.com/")))
          .via(checkIPFlow)
          .mapAsync(1)(_.entity.dataBytes.runFold(ByteString(""))(_ ++ _))
          .map("<html><body><h1>" + _.utf8String + "</h1></body></html>")
          .map(ByteString.apply)
          .map(byteStrToResponse)

  case _ =>
    Source.single(HttpResponse(404, entity = "Unknown resource!"))    
})

This Flow can then be used to bind to incoming requests:

Http().bindAndHandle(reqResponseFlow, "localhost", 8080)

And all without Futures...

Upvotes: 5

Related Questions