Vuzi

Reputation: 115

Akka stream - Splitting a stream of ByteString into multiple files

I'm trying to split an incoming Akka stream of bytes (from the body of an http request, but it could also be from a file) into multiple files of a defined size.

For example, if I'm uploading a 10 GB file, it would create something like 10 files of 1 GB each. The files would have randomly generated names. My issue is that I don't really know where to start, because all the answers and examples I've read either store a whole chunk in memory or split on a string delimiter. I can't hold 1 GB "chunks" in memory and then write them to disk.

Is there an easy way to perform this kind of operation? My only idea would be to use something like this http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings but transformed into something like a FlowShape[ByteString, File] that writes the chunks to a file itself until the max file size is reached, then creates a new file, and so on, streaming back the created files. That looks like an atrocious idea that doesn't use Akka properly.

Thanks in advance

Upvotes: 4

Views: 3152

Answers (3)

Jeffrey Chung

Reputation: 19527

The idiomatic way to split a ByteString stream to multiple files is to use Alpakka's LogRotatorSink. From the documentation:

This sink will takes a function as parameter which returns a Bytestring => Option[Path] function. If the generated function returns a path the sink will rotate the file output to this new path and the actual ByteString will be written to this new file too. With this approach the user can define a custom stateful file generation implementation.

The following fileSizeRotationFunction is also from the documentation:

val fileSizeRotationFunction = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    {
      if (size + element.size > max) {
        val path = Files.createTempFile("out-", ".log")
        size = element.size
        Some(path)
      } else {
        size += element.size
        None
      }
    }
}

An example of its use:

val source: Source[ByteString, _] = ???
source.runWith(LogRotatorSink(fileSizeRotationFunction))
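
For the 1 GB files with random names asked about in the question, the same rotation logic can be parameterised. The following is only a sketch: rotateEvery, its parameters, and the "part-" / ".bin" file naming are hypothetical and not part of Alpakka. It also tracks "no file opened yet" explicitly instead of starting the counter at the maximum.

```scala
import java.nio.file.{Files, Path}
import akka.util.ByteString

// Hypothetical helper (not part of Alpakka): same idea as fileSizeRotationFunction,
// with the size limit and the target directory passed in.
def rotateEvery(maxBytes: Long, dir: Path): () => ByteString => Option[Path] = () => {
  var written: Long = -1L // -1 means no file has been opened yet
  (element: ByteString) =>
    if (written < 0 || written + element.size > maxBytes) {
      // Start a new file; the JDK picks a random name inside dir.
      written = element.size
      Some(Files.createTempFile(dir, "part-", ".bin"))
    } else {
      written += element.size
      None
    }
}
```

Assuming dir is an existing directory, it could then be passed to the sink as source.runWith(LogRotatorSink(rotateEvery(1024L * 1024 * 1024, dir))).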

Upvotes: 1

Aditya Goyal

Reputation: 13

You could write a custom graph stage. Your issue is similar to the one faced in Alpakka during upload to Amazon S3 (search for the Alpakka S3 connector; I can't post more than 2 links).

For some reason, however, the S3 connector's DiskBuffer writes the entire incoming source of ByteStrings to a file before emitting the chunk for further stream processing.

What we want is something similar to the example that limits a source of ByteStrings to a specific length. In that example, the incoming Source[ByteString, _] is turned into a source of fixed-size ByteStrings by maintaining a memory buffer. I adapted it to work with files. The advantage is that you can run this stage on a dedicated thread pool for blocking IO; for a well-behaved reactive stream you want to keep blocking IO on a separate thread pool in the actor system. PS: this does not try to make files of an exact size, so if we read 2 KB extra into a 100 MB file, those extra bytes are written to the current file rather than split off.

import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Path

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream._
import akka.util.ByteString

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int)
// Writes the ByteStrings received from upstream to a file and emits the file's path once more than partSize bytes have been written. Does not attempt to write an exact number of bytes.
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int)
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] {

  assert(maxSize > partSize, "Max size should be larger than part size. ")

  val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in")
  val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out")

  override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {

      var partNumber: Int = 0
      var length: Int = 0
      var currentBuffer: Option[PartBuffer] = None

      override def onPull(): Unit =
        if (isClosed(in)) {
          emitPart(currentBuffer, length)
        } else {
          pull(in)
        }

      override def onPush(): Unit = {
        val elem = grab(in)
        length += elem.size
        val currentPart: PartBuffer = currentBuffer match {
          case Some(part) => part
          case None =>
            val newPart = createPart(partNumber)
            currentBuffer = Some(newPart)
            newPart
        }
        currentPart.fileChannel.write(elem.asByteBuffer)
        if (length > partSize) {
          emitPart(currentBuffer, length)
          //3. .increment part number, reset length.
          partNumber += 1
          length = 0
        } else {
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit =
        // Emit the last part if bytes are pending; otherwise complete, so the stage does not hang with unmet demand.
        if (length > 0) emitPart(currentBuffer, length)
        else completeStage()

      private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match {
        case Some(part) =>
          //1. flush the part buffer and truncate the file.
          part.fileChannel.force(false)
          //          not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do.
//                    val ch = new FileOutputStream(part.path.toFile).getChannel
//          try {
//            println(s"truncating to size $size")
//            ch.truncate(size)
//          } finally {
//            ch.close()
//          }
          //2emit the part
          val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber)
          push(out, chunk)
          part.fileChannel.close() // TODO: probably close elsewhere.
          currentBuffer = None
          //complete stage if in is closed.
          if (isClosed(in)) completeStage()
        case None => if (isClosed(in)) completeStage()
      }

      private def createPart(partNum: Int): PartBuffer = {
        val path: Path = partFile(partNum)
        //currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies.
        PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel)
      }

      /**
       * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber
       * @param partNumber the part number in multipart upload.
       * @return
       * TODO:add unique id to the file name. for multiple
       */
      private def partFile(partNumber: Int): Path =
        tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin")
      setHandlers(in, out, this)
    }

  case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO:  see if you need mapped byte buffer. might be ok with just output stream / channel.

}
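
To run a stage like this on the dedicated thread pool mentioned above, one option is to attach a dispatcher attribute when wrapping it in a Flow. This is a minimal sketch: onDispatcher is a hypothetical helper name, and the dispatcher id must refer to a dispatcher that is actually defined in your configuration.

```scala
import akka.stream.{ActorAttributes, FlowShape, Graph}
import akka.stream.scaladsl.Flow

// Hypothetical helper: wraps any flow-shaped graph and asks Akka to run it
// on the dispatcher with the given id (the id is an assumption about your config).
def onDispatcher[I, O, M](stage: Graph[FlowShape[I, O], M], dispatcherId: String): Flow[I, O, M] =
  Flow.fromGraph(stage).addAttributes(ActorAttributes.dispatcher(dispatcherId))
```

For example, onDispatcher(new FileChunker(maxSize, tempDir, partSize), "my-blocking-io-dispatcher"), where "my-blocking-io-dispatcher" is a made-up name for a dispatcher you would declare in application.conf.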

Upvotes: 1

I often revert to purely functional, non-akka, techniques for problems such as this and then "lift" those functions into akka constructs. By this I mean I try to use only scala "stuff" and then try to wrap that stuff inside of akka later on...

File Creation

Starting with the FileOutputStream creation based on "randomly generated names":

def randomFileNameGenerator : String = ??? //not specified in question

import java.io.FileOutputStream

val randomFileOutGenerator : () => FileOutputStream = 
  () => new FileOutputStream(randomFileNameGenerator)

State

There needs to be some way of storing the "state" of the current file, e.g. the number of bytes already written:

case class FileState(byteCount : Int = 0, 
                     fileOut : FileOutputStream = randomFileOutGenerator())

File Writing

First we determine if we'd breach the maximum file size threshold with the given ByteString:

import akka.util.ByteString

val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
  (state, byteString, maxBytes) =>
    state.byteCount + byteString.length > maxBytes

We then have to write the function that creates a new FileState if we've maxed out the capacity of the current one or returns the current state if it is still below capacity:

val closeFileInState : FileState => Unit = 
  (_ : FileState).fileOut.close()

val getCurrentFileState : (Int, FileState, ByteString) => FileState = 
  (maxBytes, state, byteString) =>
    if(isEndOfChunk(state, byteString, maxBytes)) {
      closeFileInState(state)
      FileState()
    }
    else
      state

The only thing left is to write to the FileOutputStream:

val writeToFileAndReturn : (FileState, ByteString) => FileState = 
  (fileState, byteString) => {
    fileState.fileOut write byteString.toArray
    fileState copy (byteCount = fileState.byteCount + byteString.size)
  }

//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =    
  writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)    

Fold On Any GenTraversableOnce

In Scala a GenTraversableOnce is any collection, parallel or not, that can be folded. These include Iterator, Vector, Array, Seq, Scala Stream, ... Because the accumulator (FileState) and the elements (ByteString) have different types, the operator to use is foldLeft rather than fold; the final writeToChunkedFile function matches the signature foldLeft expects:

val anyIterable : Iterable[ByteString] = ???
val maxBytes : Int = ??? //maximum size of each file

val finalFileState = anyIterable.foldLeft(FileState())(writeToChunkedFile(maxBytes))

One final loose end; the last FileOutputStream needs to be closed as well. Since the fold will only emit that last FileState we can close that one:

closeFileInState(finalFileState)
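
The steps so far can be tried without Akka at all. The sketch below is a self-contained variant under stated assumptions: it uses Array[Byte] in place of ByteString, temp files for the random names, and the names ChunkState, openNext, writeChunked and splitToFiles are hypothetical stand-ins for the functions above. It also records the created paths so the result can be inspected.

```scala
import java.io.FileOutputStream
import java.nio.file.{Files, Path}

// Hypothetical stand-in for FileState that also remembers which files were created.
final case class ChunkState(dir: Path, byteCount: Int, out: FileOutputStream, files: Vector[Path])

// Opens a new randomly named file in dir and starts counting from zero.
def openNext(dir: Path, files: Vector[Path]): ChunkState = {
  val path = Files.createTempFile(dir, "chunk-", ".bin")
  ChunkState(dir, 0, new FileOutputStream(path.toFile), files :+ path)
}

// Rolls over to a new file if these bytes would exceed maxBytes, then writes them.
def writeChunked(maxBytes: Int)(state: ChunkState, bytes: Array[Byte]): ChunkState = {
  val current =
    if (state.byteCount + bytes.length > maxBytes) {
      state.out.close()
      openNext(state.dir, state.files)
    } else state
  current.out.write(bytes)
  current.copy(byteCount = current.byteCount + bytes.length)
}

// Folds over the input, closes the last file, and returns all created paths.
def splitToFiles(dir: Path, maxBytes: Int, chunks: Iterator[Array[Byte]]): Vector[Path] = {
  val last = chunks.foldLeft(openNext(dir, Vector.empty))(writeChunked(maxBytes))
  last.out.close()
  last.files
}
```

As in the functions above, an element is never split, so a single element larger than maxBytes still ends up whole in one file.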

Akka Streams

Akka's Flow gets its fold from FlowOps#fold, which takes an initial value and a (state, element) => state function, the same shape as foldLeft on the standard collections. Therefore we can "lift" our regular functions into stream values, much as we did with the Iterable:

import akka.stream.scaladsl.Flow

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
  Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))

The nice part about handling the problem with regular functions is that they can be used within other asynchronous frameworks beyond streams, e.g. Futures or Actors. You also don't need an akka ActorSystem in unit testing, just regular language data structures.

import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
  chunkerFlow(maxBytes) to (Sink foreach closeFileInState)

You can then use this Sink to drain HttpEntity coming from HttpRequest.
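
A minimal sketch of that last step, assuming akka-http is on the classpath; drainBody is a hypothetical helper name, and the sink argument would be byteStringSink(maxBytes) from above.

```scala
import akka.http.scaladsl.model.HttpRequest
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString

// Hypothetical helper: streams the request body into the given sink
// and returns whatever value the sink materializes.
def drainBody[M](request: HttpRequest, sink: Sink[ByteString, M])(implicit mat: Materializer): M =
  request.entity.dataBytes.runWith(sink)
```

Because the entity is consumed as a stream, the body is never held in memory as a whole.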

Upvotes: 7
