Reputation: 6801
Using Akka 2.4.7. I would like to log the entire Http Response. Using an implementation similar to How does one log Akka HTTP client requests The code of concern is the one that extracts the data out of the HttpEntity
def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head)
}
This works well if the POST request has a small payload. But starting from 1K there is an exception:
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
QUESTION: Why is this exception dependant on the size of the POST payload. And hopefully is there any possible fix?
Full log message:
2016-08-11 10:15:35,100 ERROR a.a.ActorSystemImpl [undefined]: Error during processing of request HttpRequest(HttpMethod(POST),http://localhost:3001/api/v2/exec,List(User-Agent: curl/7.30.0, Host: localhost:3001, Accept: */*, Expect: 100-continue, Timeout-Access: <function1>),HttpEntity.Default(multipart/form-data; boundary=-------------------------acebdf13572468; charset=UTF-8,5599,Source(SourceShape(StreamUtils$$anon$2.out), CompositeModule [2db5bfef]
Name: unnamed
Modules:
(unnamed) CompositeModule [4aac8b90]
Name: unnamed
Modules:
(SubSource%28EntitySource%29) GraphStage(EntitySource) [073d36ba]
(unnamed) [155dd7c9] copy of GraphStage(OneHundredContinueStage) [40b6c892]
(unnamed) [1b902132] copy of GraphStage(Collect) [75f65c1c]
(limitable) [76375468] copy of CompositeModule [59626a09]
Name: limitable
Modules:
(unnamed) GraphStage(unknown-operation) [1bee846d]
Downstreams:
Upstreams:
MatValue: Ignore
Downstreams:
SubSource.out -> GraphStage.in
GraphStage.out -> Collect.in
Collect.out -> unknown-operation.in
Upstreams:
GraphStage.in -> SubSource.out
Collect.in -> GraphStage.out
unknown-operation.in -> Collect.out
MatValue: Atomic(SubSource%28EntitySource%29[073d36ba])
(unnamed) [77d6c04c] copy of GraphStage(akka.http.impl.util.StreamUtils$$anon$2@30858cb0) [7e073049]
Downstreams:
SubSource.out -> GraphStage.in
GraphStage.out -> Collect.in
Collect.out -> unknown-operation.in
unknown-operation.out -> StreamUtils$$anon$2.in
Upstreams:
GraphStage.in -> SubSource.out
Collect.in -> GraphStage.out
unknown-operation.in -> Collect.out
StreamUtils$$anon$2.in -> unknown-operation.out
MatValue: Atomic(akka.stream.impl.StreamLayout$CompositeModule[4aac8b90]))),HttpProtocol(HTTP/1.1))
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
at akka.stream.impl.fusing.SubSource$$anon$4.setCB(StreamOfStreams.scala:703)
at akka.stream.impl.fusing.SubSource$$anon$4.preStart(StreamOfStreams.scala:713)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.create(ActorCell.scala:590)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Upvotes: 2
Views: 1405
Reputation: 2451
I found the questions still relevant for Akka-http 2.6.4, and after looking at some bug reports, this post in particular help me to find the solution https://github.com/akka/akka-http/issues/73.
But, it is mentioned in the reference above, it implies that the content of the file is stored in memory instead of using stream. So, I would consider this a workaround rather than a solution.
Note also, that I didn't found any different in behaviour between fileUpload
and storeUploadedFile
. This work around works for both.
WORKAROUND: here are my functions examples
def createTestUploadWithStrict = toStrictEntity(3.seconds) {
(withoutSizeLimit &
post &
pathPrefix("test") &
fileUpload("data") &
formField("f1".as[MyCustomFormFiled])){
case ((metadata: FileInfo, fileStream: Source[ByteString, Any]), di:MyCustomFormFiled) => {
// Save the file
val file = tempDestination(metadata)
val sink = FileIO.toPath(file.toPath)
val writeResultFut = fileStream.runWith(FileIO.toPath(file.toPath))
val result = ???
// file is written to file
onComplete(writeResultFut) {
case Success(_) =>
complete(200 -> s"Working fine with data $result")
case Failure(e) =>
complete(500 -> s"Error while writing data file: $e")
}
}
}
}
And using storeUploadedFile
:
def createTestUploadWithStrict = toStrictEntity(3.seconds) {
(withoutSizeLimit &
post &
pathPrefix("test2") &
storeUploadedFile("data", tempDestination) &
formField("device".as[MyCustomFormFiled])){
(metadata: FileInfo, file: File, di:MyCustomFormFiled) =>
val result = ???
complete(200 -> s"Working fine with data $result")
}
}
Note that the Unmarshaller is provided implicitly with:
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol{
implicit val mycustomFormFieldFormat = jsonFormat2(MyCustomFormFiled) }
Upvotes: 0
Reputation: 1563
I assume that entity.dataBytes
is already used for some useful purpose before calling this entityAsString
, or entityAsString
is called twice. In general case, contents of HttpEntity
can't be reused. However, contents of HttpEntity.Strict
can be reused.
Upvotes: 0