d-n-ust
d-n-ust

Reputation: 122

Akka-http merge incoming connections

I'm trying to use akka-streams and akka-http in order to solve the following problem:

I come up with the following code in order to get incoming connections merged to one output:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
    import FlowGraph.Implicits._

    val merge = b.add(MergePreferred[IncomingConnection](1))

    val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
    val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)

    inA ~> merge.preferred
    inB ~> merge.in(0)
           merge.out ~> Sink.foreach(println)

    ClosedShape
}).run()

So, I have a Source with IncomingConnection inctances from A and B.

Now I want to process them somehow, produce responses and send responses to corresponding connections.

Maybe there are better ways to archive all these things but I could not find any example solving such problem in docs or questions from other people.

Also I guess the problem is quite common.

Thanks in advance for your help.

Upvotes: 0

Views: 520

Answers (1)

The way IncomingConnection objects are used to process requests and return responses is with the handleXXX methods. Examples based on the documentation and the api:

Synchronous Function

//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???

val connectionSink = Sink.foreach[IncomingConnection] { conn =>
  conn.handleWithSyncHandler(requestProcessor)
}

This connectionSink can then be used in your graph construct:

inA ~> merge.preferred
inB ~> merge.in(0)
       merge.out ~> connectionSink

Asynchronous Function

Similarly, if your processor is asynchronous:

val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)

Flow

And finally, you can also use a Flow (my preferred method):

val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )

One of my favorite features about Flows is that you can re-use them over & over in different Streams. Therefore, flowReqProcessor can be a val instead of a def.

Upvotes: 1

Related Questions