Reputation: 16253
The Akka HTTP client API allows passing a Source[ChunkStreamPart, Any] to an HttpEntity.Chunked, which makes it possible to push a stream of ByteStrings into a single HTTP request with backpressure handling:
val data: Source[ByteString, Future[ImportantInformation]]
val chunkedEntity = HttpEntity.Chunked(
  ContentTypes.`application/octet-stream`,
  data.map(ChunkStreamPart(_)))
val request = HttpRequest(
  HttpMethods.POST,
  Uri("http://targethost/path"),
  entity = chunkedEntity)
val downstreamResp: Future[HttpResponse] = Http().singleRequest(request)
Now, the source is consumed far down in the transport layer, and I can't find a way to access the Future[ImportantInformation] materialized value from my Source. Is there a way to work around this problem, i.e. either a method that would let me access the materialized value, or even some kind of Sink in the library that sinks a stream of ByteStrings into a single HTTP request?
Upvotes: 0
Views: 300
Reputation: 111
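You can use mapMaterializedValue on your source to access its materialized value. The function you pass is invoked when the stream is materialized, even when that happens deep inside the HTTP layer.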
val data: Source[ByteString, Future[ImportantInformation]]
val mappeddata =
  data.mapMaterializedValue(future => processImportantInformation(future))
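One way to get the future back out to the calling code is to complete a Promise from inside mapMaterializedValue. The following is only a sketch under a few assumptions: Akka 2.6+ (so the implicit ActorSystem provides the materializer), a hypothetical ImportantInformation case class, a stand-in source, and Sink.ignore playing the role of the HTTP layer that materializes the stream out of your reach.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object MatValueViaPromise {
  // Hypothetical stand-in for the question's ImportantInformation type.
  final case class ImportantInformation(note: String)

  def run(): ImportantInformation = {
    implicit val system: ActorSystem = ActorSystem("demo")
    try {
      // Stand-in source that materializes a Future[ImportantInformation].
      val data: Source[ByteString, Future[ImportantInformation]] =
        Source(List(ByteString("a"), ByteString("b")))
          .mapMaterializedValue(_ => Future.successful(ImportantInformation("done")))

      // The promise lives outside the stream, so its future is reachable
      // no matter who ends up materializing the source.
      val infoPromise = Promise[ImportantInformation]()
      val exposed: Source[ByteString, NotUsed] =
        data.mapMaterializedValue { info =>
          infoPromise.completeWith(info) // runs at materialization time
          NotUsed
        }

      // In the question, `exposed.map(ChunkStreamPart(_))` would go into
      // HttpEntity.Chunked; here Sink.ignore materializes it instead.
      exposed.runWith(Sink.ignore)

      Await.result(infoPromise.future, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that the promise's future only completes once something actually materializes the source, and that a source materialized more than once would call the function each time, so a single Promise fits the single-request case best.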
Upvotes: 2
Reputation: 17933
If you don't need ImportantInformation itself but just want to know when the Source terminates, you can call watchTermination on the Source. Combined with Keep.right, it materializes a Future[Done] that completes when the stream finishes or fails.
There is a good example found here.
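As a rough illustration of watchTermination, the sketch below assumes Akka 2.6+ (implicit ActorSystem as materializer) and uses a stand-in source with Sink.ignore in place of the HTTP request.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object WatchTerminationSketch {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("demo")
    try {
      // Stand-in for the question's data source.
      val data: Source[ByteString, NotUsed] =
        Source(List(ByteString("a"), ByteString("b")))

      // Keep.right discards the original materialized value and keeps
      // the Future[Done] that signals termination.
      val watched: Source[ByteString, Future[Done]] =
        data.watchTermination()(Keep.right)

      // `to` keeps the source's materialized value, i.e. the Future[Done].
      val terminated: Future[Done] = watched.to(Sink.ignore).run()
      Await.result(terminated, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

When the HTTP layer materializes the source, the Future[Done] is no more reachable than the original materialized value, so in that setting it may be simpler to attach a callback inside the combiner function passed to watchTermination, or to hand the future out through a Promise.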
Upvotes: 1