Reputation: 21
I've got some issues with Pekko code and I believe it's related to the fact that the default Flow already materializes the data:
def addSha(request: HttpRequest)(using
    as: ActorSystem[Any],
    ec: ExecutionContext
): Future[HttpResponse] =
  request.entity.dataBytes
    .via(computeHashWithPayloadAndPayloadLength)
    .map { out =>
      request
        .withEntity(out._2)
        .addHeader(new RawHeader("sha", out._1.digest().map("%02x".format(_)).mkString))
    }
    .via(Http().outgoingConnection)
    .runWith(Sink.head)

private def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
  Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
    acc._1.update(chunk.toByteBuffer)
    (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
  }
Basically, I need the request body in order to compute a hash and add it to the headers, which forces me to consume the source. If I comment out this line
//.withEntity(out._2)
it returns the error:
substream source cannot be materialized more than once
because the flow I'm using is the default Pekko HTTP one (Http().outgoingConnection) and it seems to materialize the entity's data stream again after the hash computation has already consumed it. By calling .withEntity I create a new entity stream that can then be consumed a second time.
Now on to my question: is there any way to solve this (maybe by using another Pekko HTTP flow) without having to re-implement Http().outgoingConnection together with the hash computation?
Upvotes: 1
Views: 79
Reputation: 558
There is an alternative way in Pekko that produces what you want:
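import java.security.MessageDigest

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.{Http, HttpExt}
import org.apache.pekko.http.scaladsl.model.{HttpEntity, HttpMethods, HttpRequest}
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.stream.scaladsl.{Flow, Sink}
import org.apache.pekko.util.ByteString

object HttpRequestWithContentHashHeader extends App {
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher
  implicit val http: HttpExt = Http(system)

  // Folds all chunks into (running digest, accumulated payload, payload length).
  def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
    Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
      acc._1.update(chunk.toByteBuffer)
      (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
    }

  val request = HttpRequest(
    method = HttpMethods.POST,
    uri = "http://localhost:8080",
    entity = HttpEntity("payload")
  )

  // Run the entity bytes through the hashing flow and build the header from the digest.
  val hashFuture = request.entity.dataBytes
    .via(computeHashWithPayloadAndPayloadLength)
    .runWith(Sink.head)
    .map { case (digest, _, _) =>
      RawHeader("X-Content-Hash", digest.digest().map("%02x".format(_)).mkString)
    }

  // Send the request only once the hash header is available.
  hashFuture.flatMap { hashHeader =>
    http.singleRequest(request.withHeaders(hashHeader))
  }
}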
This approach does not use .via(Http().outgoingConnection), which is part of the low-level Connection-Level Client-Side API. The doc there mentions that:
In almost all cases it is better to use the Http().singleRequest() API instead.
Calculating the hash on the fly like this only makes sense if you don't create the HTTP request yourself, e.g. in a proxy. If you have a plain HTTP client scenario and thus have access to the payload, you can calculate the hash before assembling the HTTP request.
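For the plain client case, a minimal sketch of computing the hash up front could look like the following. It uses only the JDK, and the names PrecomputedHash and sha256Hex are made up for illustration. It assumes the whole payload is available in memory; the helper accepts several chunks only to show that feeding the digest piece by piece gives the same result as hashing the payload in one go.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest

// Hypothetical helper, not part of Pekko.
object PrecomputedHash {
  // Feeds every chunk into one SHA-256 digest, in order, and hex-encodes the result.
  def sha256Hex(chunks: Iterable[Array[Byte]]): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    chunks.foreach(chunk => digest.update(chunk))
    digest.digest().map("%02x".format(_)).mkString
  }

  def main(args: Array[String]): Unit = {
    val payload = "payload".getBytes(UTF_8)
    val hash = sha256Hex(Seq(payload))
    // The hex string can then be used as the header value when the request is built.
    println(hash)
  }
}
```

The resulting string can be passed to RawHeader("X-Content-Hash", ...) when the HttpRequest is constructed, so the entity stream never has to be read just to compute the header.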
Upvotes: 0