Reputation: 21547
I'm trying to send a query to the database with akka-streams' Tcp
client, but i don't understand what i am missing.
So i have two types Query
and Response
that are perfectly convertable to and from akka's ByteString
. So i'm creating a client connection with val conn = Tcp().outgoingConnection("localhost", 28015)
, which gives me a Flow[ByteString, ByteString, Future[OutgoingConnection]]
, so far so good. So i assume that source is my request with the query, i couldn't find the best way to feed this flow with a query source, but to construct it like Source(Future.successful(query))
, and connect it to the flow source.via(flow)
, which gives me another Source[Response, Unit]
. And here i can't understand how to get Future[Response]
, tried several combinators, but it gives me Materialized
value, which i'm not fully understand how it relates to the values/types in the flow.
Upvotes: 1
Views: 585
Reputation: 17923
You can use the join
method on a Flow. From the documentation:
Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.
+------+ +-------+
| | ~Out~> | |
| this | | other |
| | <~In~ | |
+------+ +-------+
This allows you to connect the output from the connection to your Flow's input, and also connects your Flow's output to the connections input.
Specifically, you can take the Flow
generated from the outgoingConnection and join it with a Flow you created to respond to queries:
def queryDB(query : ByteString) : Future[ByteString] = ???
val concurrentQueries = 10
val queryResponder =
Flow[ByteString].mapAsync(concurrentQueries)(queryDB)
val server : String = ???
val port : Int = ???
//from the diagram above:
//this = connection
//other = queryResponder
Tcp().outgoingConnection(server, port).join(queryResponder).run()
Upvotes: 0
Reputation: 13130
Firstly: what kind of database is it, and why are you taking to it via TCP directly? Are you sure this will work how you indent it to work? Are you able to handle framing of the incoming responses?
As for your question about getting Future[Response]
out of a Source[Response, Unit]
, it's as simple as running the Source with a Sink.head
, i.e. like this: val res: Future[Result] = source.runWith(Sink.head)
(you need an implicit val mat = ActorMaterializer()
in scope of course).
I highly recommend you spend some time with the Akka Streams documentation before diving deeper into using Streams.
Upvotes: 1