Reputation: 18639
I have the following code snippet:
case class SomeClass(param1: String, param2: String, param3: String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
  .actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
  .mapAsync(3)(f => getDocumentById(f))
  .map(f => {
    val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
      .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a))
    (request, request)
  })
  .via(connection)
  // Parsing Response
  .mapAsync(3) {
    case (Success(HttpResponse(status, _, entity, _)), request) =>
      entity.dataBytes.runFold(ByteString(""))(_ ++ _)
  }
  .map(resp => parse(resp.utf8String, ?????????????))
  .to(Sink.someSink{....})
  .runWith(someClassActorSource)
def parse(resp: String, parseParam: String) = ????
Elsewhere in the code I send messages to the flow:
someFlow ! SomeClass("a","b","c")
someFlow ! SomeClass("a1","b1","c1")
My problem is that parse should use param2 from the original case class instance.
So for the first message the call should be
parse(response,"b")
and for the second message it should be
parse(response,"b1")
So the question is: how can I access, at the parsing stage, a field of the element I originally sent into the flow?
Upvotes: 0
Views: 854
Reputation: 17923
Assuming your connection value is instantiated via
val connection = Http().cachedHostConnectionPool(...)
you can take advantage of the fact that the pool flow accepts a tuple of (HttpRequest, T) and emits (Try[HttpResponse], T), handing the second element back unchanged alongside the response. Instead of passing the request twice in the tuple, pass the original SomeClass instance as the second element. That instance then has to be carried through each of your Flow stages until it reaches the parsing stage.
Modifying your code a bit:
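val getDocumentFlow =
  // Pair each document with the element it was fetched for
  // (Future.map needs an implicit ExecutionContext in scope)
  Flow[SomeClass].mapAsync(3)(f => getDocumentById(f).map(d => d -> f))
Your question doesn't state the return type of getDocumentById, so I'm just using Document: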
val documentToRequest =
  Flow[(Document, SomeClass)] map { case (document, someClass) =>
    val request = ...
    (request, someClass)
  }
val parseResponse =
  Flow[(Try[HttpResponse], SomeClass)].mapAsync(3) {
    case (Success(HttpResponse(status, _, entity, _)), someClass) =>
      entity
        .dataBytes
        .runFold(ByteString(""))(_ ++ _)
        .map(e => e -> someClass)
    // Propagate request failures instead of hitting a MatchError
    case (Failure(ex), _) => Future.failed(ex)
  }
val parseEntity = Flow[(ByteString, SomeClass)] map {
  // parse takes the String parameter, so pass param2 rather than the whole instance
  case (entity, someClass) => parse(entity.utf8String, someClass.param2)
}
These flows can then be used as described in the question:
val someFlow =
  someClassActorSource
    .via(getDocumentFlow)
    .via(documentToRequest)
    .via(connection)
    .via(parseResponse)
    .via(parseEntity)
    .to(Sink.someSink{...})
    .run()
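If you want to try the pairing idea without Akka or an HTTP server, here is a minimal sketch that uses only scala.concurrent.Future. It is not the stream code above, just the same pattern in isolation: each asynchronous step returns its result together with the original SomeClass, so param2 is still available when parse is called. The bodies of getDocumentById and parse, the fakeHttpCall helper, and the String document type are all assumptions made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PairingSketch {
  final case class SomeClass(param1: String, param2: String, param3: String)

  // Hypothetical stand-in for getDocumentById: returns a fake document body.
  def getDocumentById(s: SomeClass): Future[String] =
    Future(s"<doc id='${s.param1}'/>")

  // Hypothetical stand-in for the HTTP round trip through the connection pool.
  def fakeHttpCall(document: String): Future[String] =
    Future(s"response to $document")

  // Hypothetical parse: it only shows that param2 is available at this point.
  def parse(resp: String, parseParam: String): String =
    s"$parseParam: $resp"

  // Every step emits (result, original), so the original element
  // travels alongside the data all the way to the parsing step.
  def process(s: SomeClass): Future[String] =
    getDocumentById(s)
      .map(doc => doc -> s)
      .flatMap { case (doc, original) => fakeHttpCall(doc).map(resp => resp -> original) }
      .map { case (resp, original) => parse(resp, original.param2) }

  def run(): Seq[String] = {
    val inputs = Seq(SomeClass("a", "b", "c"), SomeClass("a1", "b1", "c1"))
    // Future.sequence keeps the results in input order.
    Await.result(Future.sequence(inputs.map(process)), 5.seconds)
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

With the two messages from the question, the first result is parsed with "b" and the second with "b1". In the stream version the tuple plays the same role, and the connection pool passes its second element through untouched.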
Upvotes: 1