Alberto Puritano

Reputation: 21

How can I prevent the entity from being materialized more than once in Pekko/Akka?

I'm having issues with some Pekko code, and I believe they're related to the fact that the default Flow already materializes the data:

  def addSha(request: HttpRequest)(using
    as: ActorSystem[Any],
    ec: ExecutionContext
  ): Future[HttpResponse] =
    request.entity.dataBytes
      .via(computeHashWithPayloadAndPayloadLength)
      .map { out =>
        request
          .withEntity(out._2)
          .addHeader(new RawHeader("sha", out._1.digest().map("%02x".format(_)).mkString))
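      }
      // outgoingConnection needs a target host and port; here they are taken from the request URI
      .via(Http().outgoingConnection(request.uri.authority.host.address, request.uri.effectivePort))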
      .runWith(Sink.head)

  private def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
    Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
      acc._1.update(chunk.toByteBuffer)
      (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
    }
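The fold works because updating a MessageDigest chunk by chunk gives the same result as hashing the concatenated body in one go. Below is a stdlib-only sketch (no Pekko) that mirrors the fold over plain byte arrays. ChunkedSha256Demo, toHex and hashChunks are hypothetical names used only for illustration:

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object ChunkedSha256Demo {
  // Two lowercase hex digits per byte, the same formatting used in the question.
  def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString

  // Mirrors the Pekko fold: one MessageDigest updated chunk by chunk, plus a running byte count.
  def hashChunks(chunks: Seq[Array[Byte]]): (String, Int) = {
    val (digest, length) = chunks.foldLeft((MessageDigest.getInstance("SHA-256"), 0)) {
      case ((md, len), chunk) =>
        md.update(chunk) // mutates md in place; the same instance is carried through the fold
        (md, len + chunk.length)
    }
    (toHex(digest.digest()), length)
  }

  def main(args: Array[String]): Unit = {
    val chunks = Seq("pay", "lo", "ad").map(_.getBytes(StandardCharsets.UTF_8))
    val (chunkedHex, length) = hashChunks(chunks)
    val wholeHex =
      toHex(MessageDigest.getInstance("SHA-256").digest("payload".getBytes(StandardCharsets.UTF_8)))
    println(s"$chunkedHex ($length bytes)")
    println(chunkedHex == wholeHex) // true
  }
}
```

One caveat that applies to the Pekko version: fold's zero value is evaluated once, when the Flow is built, so every materialization of the same Flow value shares one MessageDigest instance. Since computeHashWithPayloadAndPayloadLength is a def, each call builds a fresh Flow and digest, which is safe as long as the returned Flow is not reused.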

Basically, I need the request body in order to compute a hash and add it to the headers, which forces me to consume the source. If I comment out the line //.withEntity(out._2), it fails with the error:

substream source cannot be materialized more than once

This happens because the flow I'm using is the default Pekko HTTP one (Http().outgoingConnection), and it seems to materialize the data again. By using .withEntity I create a new entity from the collected bytes, which can then be consumed a second time.

Now my question: is there any way to solve this (maybe with another Pekko HTTP flow) without having to re-implement Http().outgoingConnection together with the hash computation?

Upvotes: 1

Views: 79

Answers (1)

earthling paul

Reputation: 558

There is an alternative way in Pekko that produces what you want:

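import java.security.MessageDigest

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.{Http, HttpExt}
import org.apache.pekko.http.scaladsl.model.{HttpEntity, HttpMethods, HttpRequest}
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.stream.scaladsl.{Flow, Sink}
import org.apache.pekko.util.ByteString

object HttpRequestWithContentHashHeader extends App {
  implicit val system: ActorSystem = ActorSystem()

  import system.dispatcher
  implicit val http: HttpExt = Http(system)

  // Folds all chunks into (digest, concatenated payload, payload length)
  def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
    Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
      acc._1.update(chunk.toByteBuffer)
      (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
    }

  // HttpEntity("payload") is a strict entity, so its bytes can be read once for
  // hashing and again when the request is sent; a streamed entity could only be read once.
  val request = HttpRequest(
    method = HttpMethods.POST,
    uri = "http://localhost:8080",
    entity = HttpEntity("payload")
  )

  val hashFuture = request.entity.dataBytes
    .via(computeHashWithPayloadAndPayloadLength)
    .runWith(Sink.head)
    .map { case (digest, _, _) =>
      RawHeader("X-Content-Hash", digest.digest().map("%02x".format(_)).mkString)
    }

  hashFuture.flatMap { hashHeader =>
    http.singleRequest(request.withHeaders(hashHeader))
  }
}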

This approach does not use .via(Http().outgoingConnection), which is part of the low-level Connection-Level Client-Side API. The documentation for that API notes:

In almost all cases it is better to use the Http().singleRequest() API instead.

Calculating the hash on the fly like this only makes sense if you don't create the HTTP request yourself, e.g. in a proxy. In a plain HTTP client scenario, where you already have the payload, you can calculate the hash before assembling the HTTP request.
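For the proxy case from the question (an incoming, streamed entity that can only be read once), a minimal sketch, assuming the whole body fits in memory, is to buffer the entity once with HttpEntity.toStrict, hash the buffered bytes, and forward the strict entity with Http().singleRequest. ForwardWithContentHash, forwardWithHash, sha256Hex and the 5-second default timeout are hypothetical choices for illustration; the "sha" header name comes from the question, and a classic ActorSystem is assumed as in the answer above:

```scala
import java.security.MessageDigest
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse}
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.util.ByteString

object ForwardWithContentHash {
  // Lowercase hex SHA-256 of a fully buffered body.
  def sha256Hex(bytes: ByteString): String =
    MessageDigest.getInstance("SHA-256").digest(bytes.toArray).map("%02x".format(_)).mkString

  // Hypothetical helper. Assumes request.uri already points at the upstream server
  // and that the body fits in memory and arrives within `timeout`.
  def forwardWithHash(request: HttpRequest, timeout: FiniteDuration = 5.seconds)(implicit
      system: ActorSystem
  ): Future[HttpResponse] = {
    implicit val ec: ExecutionContext = system.dispatcher
    // toStrict drains the one-shot streamed entity exactly once and returns an
    // HttpEntity.Strict, whose bytes can be read again when the request is sent.
    request.entity.toStrict(timeout).flatMap { strict =>
      val forwarded = request
        .withEntity(strict) // keeps the original content type of the entity
        .addHeader(RawHeader("sha", sha256Hex(strict.data)))
      Http().singleRequest(forwarded)
    }
  }
}
```

The trade-off is that the body is held in memory before it is forwarded, so this gives up streaming in exchange for not touching the connection-level flow at all.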

Upvotes: 0
