Polymerase

Reputation: 6801

Akka-Stream, logging, materialized flow failed if entity size > 1K

I am using Akka 2.4.7 and would like to log the entire HTTP response. My implementation is similar to the one in "How does one log Akka HTTP client requests". The code of concern is the part that extracts the data out of the HttpEntity:

def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
    entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head)
}

This works well if the POST request has a small payload. But starting from 1K there is an exception:

java.lang.IllegalStateException: Substream Source cannot be materialized more than once

QUESTION: Why does this exception depend on the size of the POST payload? And is there a possible fix?

Full log message:

2016-08-11 10:15:35,100 ERROR a.a.ActorSystemImpl [undefined]: Error during processing of request HttpRequest(HttpMethod(POST),http://localhost:3001/api/v2/exec,List(User-Agent: curl/7.30.0, Host: localhost:3001, Accept: */*, Expect: 100-continue, Timeout-Access: <function1>),HttpEntity.Default(multipart/form-data; boundary=-------------------------acebdf13572468; charset=UTF-8,5599,Source(SourceShape(StreamUtils$$anon$2.out), CompositeModule [2db5bfef]  
  Name: unnamed  
  Modules:  
    (unnamed) CompositeModule [4aac8b90]  
      Name: unnamed  
      Modules:  
        (SubSource%28EntitySource%29) GraphStage(EntitySource) [073d36ba]  
        (unnamed) [155dd7c9] copy of GraphStage(OneHundredContinueStage) [40b6c892]  
        (unnamed) [1b902132] copy of GraphStage(Collect) [75f65c1c]  
        (limitable) [76375468] copy of CompositeModule [59626a09]  
          Name: limitable  
          Modules:  
            (unnamed) GraphStage(unknown-operation) [1bee846d]  
          Downstreams:   
          Upstreams:   
          MatValue: Ignore  
      Downstreams:   
        SubSource.out -> GraphStage.in  
        GraphStage.out -> Collect.in  
        Collect.out -> unknown-operation.in  
      Upstreams:   
        GraphStage.in -> SubSource.out  
        Collect.in -> GraphStage.out  
        unknown-operation.in -> Collect.out  
      MatValue: Atomic(SubSource%28EntitySource%29[073d36ba])  
    (unnamed) [77d6c04c] copy of GraphStage(akka.http.impl.util.StreamUtils$$anon$2@30858cb0) [7e073049]  
  Downstreams:   
    SubSource.out -> GraphStage.in  
    GraphStage.out -> Collect.in  
    Collect.out -> unknown-operation.in  
    unknown-operation.out -> StreamUtils$$anon$2.in  
  Upstreams:   
    GraphStage.in -> SubSource.out  
    Collect.in -> GraphStage.out  
    unknown-operation.in -> Collect.out  
    StreamUtils$$anon$2.in -> unknown-operation.out  
  MatValue: Atomic(akka.stream.impl.StreamLayout$CompositeModule[4aac8b90]))),HttpProtocol(HTTP/1.1))  
java.lang.IllegalStateException: Substream Source cannot be materialized more than once  
    at akka.stream.impl.fusing.SubSource$$anon$4.setCB(StreamOfStreams.scala:703)  
    at akka.stream.impl.fusing.SubSource$$anon$4.preStart(StreamOfStreams.scala:713)  
    at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)  
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)  
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)  
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)  
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)  
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)  
    at akka.actor.ActorCell.create(ActorCell.scala:590)  
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)  
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)  
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)  
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)  
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)  
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)  
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)  
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)  

Upvotes: 2

Views: 1405

Answers (2)

aitorhh

Reputation: 2451

I found the question still relevant for Akka-http 2.6.4. After looking at some bug reports, this issue in particular helped me find the solution: https://github.com/akka/akka-http/issues/73.

However, as mentioned in the issue above, this approach holds the content of the file in memory instead of streaming it. So I would consider this a workaround rather than a solution.

Note also that I didn't find any difference in behaviour between fileUpload and storeUploadedFile. This workaround works for both.

WORKAROUND: here are my example functions, first using fileUpload:

def createTestUploadWithStrict = toStrictEntity(3.seconds) {
  (withoutSizeLimit &
   post &
   pathPrefix("test") &
   fileUpload("data") &
   formField("f1".as[MyCustomFormFiled])) {
    case ((metadata: FileInfo, fileStream: Source[ByteString, Any]), di: MyCustomFormFiled) =>
      // Stream the uploaded bytes into a temporary file
      val file = tempDestination(metadata)
      val sink = FileIO.toPath(file.toPath)
      val writeResultFut = fileStream.runWith(sink)
      val result = ???

      // Complete the request once the file has been written
      onComplete(writeResultFut) {
        case Success(_) =>
          complete(200 -> s"Working fine with data $result")
        case Failure(e) =>
          complete(500 -> s"Error while writing data file: $e")
      }
  }
}

And using storeUploadedFile:

def createTestUploadWithStrict = toStrictEntity(3.seconds) {
  (withoutSizeLimit &
   post &
   pathPrefix("test2") &
   storeUploadedFile("data", tempDestination) &
   formField("device".as[MyCustomFormFiled])) {
    (metadata: FileInfo, file: File, di: MyCustomFormFiled) =>
      val result = ???
      complete(200 -> s"Working fine with data $result")
  }
}

Note that the Unmarshaller is provided implicitly with:

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val mycustomFormFieldFormat = jsonFormat2(MyCustomFormFiled)
}

Upvotes: 0

Konstantin Pelepelin

Reputation: 1563

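I assume that entity.dataBytes has already been consumed for some other purpose before entityAsString is called, or that entityAsString is called twice. In the general case, the contents of an HttpEntity can't be read more than once, because the data is a one-shot stream coming off the connection. The contents of an HttpEntity.Strict, however, are held in memory and can be reused.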
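To illustrate the point about HttpEntity.Strict, here is a minimal sketch, assuming it is acceptable to buffer the whole body in memory. The object name StrictEntityDemo and the helper entityAsStrictString are hypothetical names made up for this example; toStrict, data and dataBytes are the regular HttpEntity members. The helper returns the strict entity together with its text, so the caller can log the text and pass the strict entity on to whatever still needs to read the body.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StrictEntityDemo {

  // Hypothetical helper: buffers the full entity in memory (bounded by `timeout`)
  // and returns the reusable strict entity together with its UTF-8 text.
  def entityAsStrictString(entity: HttpEntity, timeout: FiniteDuration)
                          (implicit m: Materializer, ec: ExecutionContext): Future[(HttpEntity.Strict, String)] =
    entity.toStrict(timeout).map(strict => (strict, strict.data.utf8String))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("strict-entity-demo")
    // ActorMaterializer matches Akka 2.4.x; it is deprecated in Akka 2.6.
    implicit val materializer: Materializer = ActorMaterializer()
    import system.dispatcher

    // A streamed (non-strict) entity made of two chunks, standing in for a request body.
    val streamed: HttpEntity =
      HttpEntity(ContentTypes.`text/plain(UTF-8)`, Source(List(ByteString("hello, "), ByteString("world"))))

    val readTwice: Future[(String, String)] =
      entityAsStrictString(streamed, 3.seconds).flatMap { case (strict, logged) =>
        // Second read: dataBytes of a Strict entity can be materialized again.
        strict.dataBytes.runFold(ByteString.empty)(_ ++ _).map(bytes => (logged, bytes.utf8String))
      }

    try println(Await.result(readTwice, 5.seconds))
    finally system.terminate()
  }
}
```

Both reads yield the same text because the strict entity keeps its bytes in memory. Note that this differs from the entityAsString in the question in one more way: Sink.head only takes the first element of dataBytes, so a body that arrives in several chunks would be logged only partially, whereas toStrict collects all chunks. The trade-off is the same as with toStrictEntity in the other answer: the whole payload is held in memory.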

Upvotes: 0
