Reputation: 122
I'm trying to use akka-streams and akka-http in order to solve the following problem:
I came up with the following code to merge the incoming connections into one output:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val merge = b.add(MergePreferred[IncomingConnection](1))
val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> Sink.foreach(println)
ClosedShape
}).run()
So, I have a Source with IncomingConnection instances from A and B.
Now I want to process them somehow, produce responses and send responses to corresponding connections.
Maybe there are better ways to achieve this, but I could not find any example that solves this kind of problem in the docs or in questions from other people.
Also I guess the problem is quite common.
Thanks in advance for your help.
Upvotes: 0
Views: 520
Reputation: 17973
The way IncomingConnection objects are used to process requests and return responses is with the handleXXX methods. Examples based on the documentation and the API:
Synchronous Function
//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???
val connectionSink = Sink.foreach[IncomingConnection] { conn =>
conn.handleWithSyncHandler(requestProcessor)
}
This connectionSink can then be used in your graph construct:
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> connectionSink
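Putting the pieces together, a complete program could look roughly like the sketch below. It assumes a later Akka release than the one in the question, where FlowGraph has been renamed to GraphDSL and an implicit ActorSystem is enough to provide the materializer. The object name MergedServers and the "/ping" logic inside requestProcessor are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.IncomingConnection
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, MergePreferred, RunnableGraph, Sink}

object MergedServers {
  // Hypothetical request logic: answer /ping, reject everything else.
  def requestProcessor(req: HttpRequest): HttpResponse =
    req.uri.path.toString match {
      case "/ping" => HttpResponse(entity = "pong")
      case _       => HttpResponse(StatusCodes.NotFound)
    }

  def main(args: Array[String]): Unit = {
    // Assumption: the implicit ActorSystem also supplies the Materializer.
    implicit val system: ActorSystem = ActorSystem("merged-servers")

    // Every merged connection is handled by the same synchronous function.
    val connectionSink = Sink.foreach[IncomingConnection] { conn =>
      conn.handleWithSyncHandler(requestProcessor)
    }

    val inA = Http().bind(interface = "localhost", port = 8200)
    val inB = Http().bind(interface = "localhost", port = 8201)

    RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val merge = b.add(MergePreferred[IncomingConnection](1))
      inA ~> merge.preferred
      inB ~> merge.in(0)
      merge.out ~> connectionSink
      ClosedShape
    }).run()
  }
}
```

The response never has to be routed back by hand: each IncomingConnection carries its own request/response channel, so calling a handleXXX method on it is what ties the response to the right client.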
Asynchronous Function
Similarly, if your processor is asynchronous:
def asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncReqProcessor)
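As a rough illustration of what an asynchronous processor might contain, the sketch below builds the response from a Future. The AsyncHandler object and the lookup helper are hypothetical stand-ins for whatever non-blocking call (database, remote service) your logic needs.

```scala
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import scala.concurrent.{ExecutionContext, Future}

object AsyncHandler {
  // Hypothetical lookup standing in for a database or remote call.
  def lookup(path: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"result for $path")

  // Builds the response once the lookup completes, without blocking a thread.
  def asyncReqProcessor(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] =
    lookup(req.uri.path.toString).map(body => HttpResponse(entity = body))
}
```

With an implicit ExecutionContext in scope (system.dispatcher is one option), this can be passed to a connection as conn.handleWithAsyncHandler(req => AsyncHandler.asyncReqProcessor(req)).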
Flow
And finally, you can also use a Flow (my preferred method):
val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )
One of my favorite features about Flows is that you can re-use them over & over in different Streams. Therefore, flowReqProcessor can be a val instead of a def.
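To make the re-use point concrete, here is a minimal sketch in which one Flow value serves connections from both ports. It assumes a later Akka release where an implicit ActorSystem provides the materializer; the SharedFlow object and the echo logic are invented for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow

object SharedFlow {
  // Hypothetical logic: echo the requested path back to the client.
  val flowReqProcessor: Flow[HttpRequest, HttpResponse, NotUsed] =
    Flow[HttpRequest].map(req => HttpResponse(entity = s"You asked for ${req.uri.path}"))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("shared-flow")

    // The same Flow value is handed to every connection on both ports.
    Http().bind(interface = "localhost", port = 8200).runForeach(_ handleWith flowReqProcessor)
    Http().bind(interface = "localhost", port = 8201).runForeach(_ handleWith flowReqProcessor)
  }
}
```

A Flow is only a description of processing steps, so sharing the val is safe: each connection materializes its own running copy when handleWith is called.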
Upvotes: 1