us2012

Reputation: 16253

Keeping the materialized value of a source passed to a chunked request entity (client-side)

The Akka HTTP client API allows passing a Source[ChunkStreamPart, Any] to an HttpEntity.Chunked, which makes it possible to push a stream of ByteStrings into a single HTTP request with backpressure handling:

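import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future
val data: Source[ByteString, Future[ImportantInformation]] = ???  // placeholder for the real source
val chunkedEntity = HttpEntity.Chunked(
    ContentTypes.`application/octet-stream`,
    data.map(ChunkStreamPart(_)))
val request = HttpRequest(HttpMethods.POST,
    Uri("http://targethost/path"), entity = chunkedEntity)
val downstreamResp: Future[HttpResponse] = Http().singleRequest(request)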

Now, the source is consumed far down in the transport layer, and I can't find a way to access the Future[ImportantInformation] materialized value from my Source. Is there a way to work around this problem, i.e. either a method that would let me access the materialized value, or even some kind of Sink in the library that sinks a stream of ByteStrings into a single HTTP request?

Upvotes: 0

Views: 300

Answers (2)

Pahomov Dmitry

Reputation: 111

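You can use mapMaterializedValue on your source to access its materialized value. The function you pass runs each time the source is materialized, including when Akka HTTP materializes the request entity.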

val data: Source[ByteString, Future[ImportantInformation]]
val mappedData =
  data.mapMaterializedValue(future => processImportantInformation(future))
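
To get the value back out to the caller rather than only process it inside the callback, one option is to complete a Promise from within mapMaterializedValue. The following is a minimal sketch under that assumption, not an official Akka HTTP facility: the object name MatValueCapture and the helper chunkedRequest are made up for illustration, and the request is built the same way as in the question.

```scala
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.{Future, Promise}

// Hypothetical helper, named here only for illustration.
object MatValueCapture {

  // Builds the chunked POST request and also returns a Future that completes
  // with the source's materialized value once the entity stream is materialized.
  def chunkedRequest[M](
      uri: Uri,
      data: Source[ByteString, Future[M]]): (HttpRequest, Future[M]) = {
    val captured = Promise[M]()

    val wrapped = data.mapMaterializedValue { mat =>
      // Runs on materialization; later materializations leave an
      // already-completed promise untouched.
      captured.completeWith(mat)
      mat
    }

    val entity = HttpEntity.Chunked(
      ContentTypes.`application/octet-stream`,
      wrapped.map(ChunkStreamPart(_)))

    (HttpRequest(HttpMethods.POST, uri, entity = entity), captured.future)
  }
}
```

With this in place, something like `val (request, info) = MatValueCapture.chunkedRequest(Uri("http://targethost/path"), data)` followed by `Http().singleRequest(request)` should give you both the response future and `info`. Note that if the entity is never materialized (for example, because the connection attempt fails first), `info` never completes, so you may want to combine it with a timeout.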

Upvotes: 2

If you don't need ImportantInformation itself and only want to know when the source terminates, you can use watchTermination on the source. It gives you a Future[Done] that completes when the stream finishes and fails if the stream fails.

There is a good example found here.
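
As a rough sketch of the watchTermination approach: because the entity's materialized value is still discarded by the HTTP layer, the Future[Done] is handed to a Promise here so that the caller can observe it. The names WatchTerminationSketch and withTermination are hypothetical and chosen only for this example.

```scala
import akka.Done
import akka.stream.scaladsl.Source
import scala.concurrent.{Future, Promise}

// Hypothetical helper, named here only for illustration.
object WatchTerminationSketch {

  // Returns the same stream of elements plus a Future[Done] that completes
  // when the stream terminates, and fails if the stream fails.
  def withTermination[T, M](source: Source[T, M]): (Source[T, Future[Done]], Future[Done]) = {
    val terminated = Promise[Done]()

    val watched = source.watchTermination() { (_, done) =>
      // Invoked at materialization time with the termination signal.
      terminated.completeWith(done)
      done
    }

    (watched, terminated.future)
  }
}
```

The returned source can be mapped to ChunkStreamPart and passed to HttpEntity.Chunked as in the question, while the second element of the tuple signals when the upload stream has ended.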

Upvotes: 1
