4lex1v

Reputation: 21547

Tcp request to the database with akka streams

I'm trying to send a query to the database with akka-streams' Tcp client, but I don't understand what I am missing.

I have two types, Query and Response, that convert cleanly to and from Akka's ByteString. I create a client connection with val conn = Tcp().outgoingConnection("localhost", 28015), which gives me a Flow[ByteString, ByteString, Future[OutgoingConnection]]; so far so good. I assume the source is my request carrying the query. I couldn't find a better way to feed this flow with a query source than to construct it as Source(Future.successful(query)) and connect it to the flow with source.via(flow), which gives me another Source[Response, Unit]. This is where I get stuck: I can't work out how to get a Future[Response]. I tried several combinators, but they give me a materialized value, and I don't fully understand how that relates to the values and types in the flow.

Upvotes: 1

Views: 585

Answers (2)

You can use the join method on a Flow. From the documentation:

Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.

+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+

This allows you to connect the output of the connection to your Flow's input, and your Flow's output to the connection's input.

Specifically, you can take the Flow generated from the outgoingConnection and join it with a Flow you created to respond to queries:

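import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Tcp}
import akka.util.ByteString
import scala.concurrent.Future

// Tcp() needs an implicit ActorSystem; run() needs an implicit materializer
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

def queryDB(query: ByteString): Future[ByteString] = ???

val concurrentQueries = 10

val queryResponder =
  Flow[ByteString].mapAsync(concurrentQueries)(queryDB)

val server: String = ???
val port: Int = ???

//from the diagram above:
//this = connection
//other = queryResponder
Tcp().outgoingConnection(server, port).join(queryResponder).run()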

Upvotes: 0

Firstly: what kind of database is it, and why are you talking to it via TCP directly? Are you sure this will work the way you intend it to? Are you able to handle framing of the incoming responses?
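To illustrate the framing concern: TCP delivers a byte stream, so one response may arrive split across several ByteString chunks, or several responses may arrive in one chunk. Below is a minimal sketch of reassembling messages with Framing.delimiter. The newline delimiter, the 1024-byte frame limit, and the names FramingSketch and frames are assumptions made for illustration only; your database's wire protocol may well use something else, such as a length prefix. It also assumes an Akka version where ActorMaterializer is still the usual way to get a materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object FramingSketch {
  // Hypothetical helper: reassembles newline-terminated messages from
  // arbitrarily split chunks, the way they might arrive from a TCP connection.
  def frames(chunks: immutable.Seq[ByteString]): immutable.Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val framed = Source(chunks)
      // Assumption: messages end with "\n" and are at most 1024 bytes long.
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String)
      .runWith(Sink.seq)

    // Blocking is only for the sake of a small, self-contained example.
    try Await.result(framed, 3.seconds)
    finally system.terminate()
  }
}
```

In a real client you would put the framing stage directly after the connection flow, before converting the bytes into a Response.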

As for your question about getting a Future[Response] out of a Source[Response, Unit]: it's as simple as running the Source with Sink.head, like this: val res: Future[Response] = source.runWith(Sink.head) (you need an implicit val mat = ActorMaterializer() in scope, of course).
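Here is a minimal, self-contained sketch of that idea. So that it runs without a database, an identity Flow[ByteString] stands in for Tcp().outgoingConnection("localhost", 28015); it has the same element types but simply echoes its input. The names SingleQuerySketch and firstResponse are hypothetical, and the sketch works on raw ByteString rather than your Query and Response types.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SingleQuerySketch {
  // Hypothetical helper: sends one query through the flow and returns
  // the first element that comes back.
  def firstResponse(query: ByteString): ByteString = {
    implicit val system: ActorSystem = ActorSystem("query-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Stand-in for the TCP connection flow (assumption): echoes its input.
    val conn: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]

    // Sink.head materializes a Future completed with the first element,
    // and runWith returns the sink's materialized value.
    val response: Future[ByteString] =
      Source.single(query).via(conn).runWith(Sink.head)

    // Blocking is only for the sake of a small, self-contained example.
    try Await.result(response, 3.seconds)
    finally system.terminate()
  }
}
```

The Future here is the materialized value of the Sink, not an element of the stream: runWith hands back whatever the sink materializes, while via keeps the source's materialized value and drops the flow's. With a real connection, keep in mind that Sink.head yields only the first chunk of bytes received, which is not necessarily a complete response.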

I highly recommend you spend some time with the Akka Streams documentation before diving deeper into using Streams.

Upvotes: 1
