Reputation: 115
I'm trying to split an incoming Akka stream of bytes (from the body of an HTTP request, but it could also be from a file) into multiple files of a defined size.
For example, if I'm uploading a 10 GB file, it would create something like 10 files of 1 GB each. The files would have randomly generated names. My issue is that I don't really know where to start, because all the answers and examples I've read either store the whole chunk in memory or split on a string delimiter. I can't really hold 1 GB "chunks" in memory and then just write them to disk.
Is there an easy way to perform that kind of operation? My only idea would be to use something like this http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings but transformed into something like FlowShape[ByteString, File], writing the chunks to a file myself until the max file size is reached, then creating a new file, and so on, and streaming back the created files. That looks like an atrocious idea that doesn't use Akka properly.
Thanks in advance
Upvotes: 4
Views: 3152
Reputation: 19527
The idiomatic way to split a ByteString stream to multiple files is to use Alpakka's LogRotatorSink. From the documentation:
This sink will takes a function as parameter which returns a Bytestring => Option[Path] function. If the generated function returns a path the sink will rotate the file output to this new path and the actual ByteString will be written to this new file too. With this approach the user can define a custom stateful file generation implementation.
The following fileSizeRotationFunction is also from the documentation:
val fileSizeRotationFunction = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    {
      if (size + element.size > max) {
        val path = Files.createTempFile("out-", ".log")
        size = element.size
        Some(path)
      } else {
        size += element.size
        None
      }
    }
}
An example of its use:
val source: Source[ByteString, _] = ???
source.runWith(LogRotatorSink(fileSizeRotationFunction))
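For the sizes in the question, the same rotation logic can be parameterised. This is only a sketch: sizeRotation, dir and maxBytes are hypothetical names of mine, not part of Alpakka, and it assumes the trigger type quoted above (() => ByteString => Option[Path]). Files.createTempFile supplies the random file names.

```scala
import java.nio.file.{Files, Path}
import akka.util.ByteString

// Hypothetical helper: `dir` and `maxBytes` are illustrative parameters, not Alpakka API.
def sizeRotation(dir: Path, maxBytes: Long): () => ByteString => Option[Path] =
  () => {
    var size: Long = maxBytes // start "full" so the first non-empty element opens a file
    (element: ByteString) =>
      if (size + element.size > maxBytes) {
        size = element.size
        Some(Files.createTempFile(dir, "part-", ".bin")) // random, unique file name
      } else {
        size += element.size
        None
      }
  }

// Possible use, assuming LogRotatorSink and a materializer are in scope:
// source.runWith(LogRotatorSink(sizeRotation(targetDir, 1024L * 1024 * 1024)))
```

Elements are never split, so a file is rotated just before it would exceed maxBytes and may end up slightly smaller than the limit.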
Upvotes: 1
Reputation: 13
You could write a custom graph stage. Your issue is similar to the one faced in Alpakka during upload to Amazon S3. (Google "alpakka s3 connector"; they won't let me post more than 2 links.)
For some reason, however, the S3 connector's DiskBuffer writes the entire incoming source of ByteStrings to a file before emitting the chunk for further stream processing.
What we want is something similar to the example that limits a source of byte strings to a specific length. In that example, they limit the incoming Source[ByteString, _] to a source of fixed-size ByteStrings by maintaining a memory buffer. I adapted it to work with files. The advantage of this is that you can use a dedicated thread pool for this stage to do blocking IO. For a good reactive stream you want to keep blocking IO in a separate thread pool in the actor system. PS: this does not try to make files of an exact size. If we read 2 KB extra for a 100 MB file, we write those extra bytes to the current file rather than trying to achieve the exact size.
import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Path
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream._
import akka.util.ByteString

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int)

// Writes the ByteStrings received from upstream to a file and emits the file's path once more
// than partSize bytes have been written. Does not attempt to write an exact number of bytes.
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int)
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] {

  assert(maxSize > partSize, "Max size should be larger than part size. ")

  val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in")
  val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out")

  override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {
      var partNumber: Int = 0
      var length: Int = 0
      var currentBuffer: Option[PartBuffer] = None

      override def onPull(): Unit =
        if (isClosed(in)) {
          emitPart(currentBuffer, length)
        } else {
          pull(in)
        }

      override def onPush(): Unit = {
        val elem = grab(in)
        length += elem.size
        // Reuse the open part file, or create one if none is open yet.
        val currentPart: PartBuffer = currentBuffer match {
          case Some(part) => part
          case None =>
            val newPart = createPart(partNumber)
            currentBuffer = Some(newPart)
            newPart
        }
        currentPart.fileChannel.write(elem.asByteBuffer)
        if (length > partSize) {
          emitPart(currentBuffer, length)
          // 3. increment the part number, reset the length.
          partNumber += 1
          length = 0
        } else {
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit =
        if (length > 0) emitPart(currentBuffer, length) // emit a part only if something is left in the current buffer.
        else completeStage() // nothing left to emit; without this the stage would never complete.

      private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match {
        case Some(part) =>
          // 1. flush the part buffer and truncate the file.
          part.fileChannel.force(false)
          // not sure why we do this truncate, but it was being done in Alpakka. Also maybe safe to do.
          // val ch = new FileOutputStream(part.path.toFile).getChannel
          // try {
          //   println(s"truncating to size $size")
          //   ch.truncate(size)
          // } finally {
          //   ch.close()
          // }
          // 2. emit the part
          val chunk = MultipartUploadChunk(path = part.path, size = size, partNumber = partNumber)
          push(out, chunk)
          part.fileChannel.close() // TODO: probably close elsewhere.
          currentBuffer = None
          // complete the stage if in is closed.
          if (isClosed(in)) completeStage()
        case None => if (isClosed(in)) completeStage()
      }

      private def createPart(partNum: Int): PartBuffer = {
        val path: Path = partFile(partNum)
        //currentPart.deleteOnExit() //TODO: Enable in prod. Requests that the file be deleted when the VM dies.
        PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel)
      }

      /**
       * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber
       * @param partNumber the part number in multipart upload.
       * @return
       * TODO:add unique id to the file name. for multiple
       */
      private def partFile(partNumber: Int): Path =
        tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin")

      setHandlers(in, out, this)
    }

  case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO: see if you need a mapped byte buffer. Might be OK with just an output stream / channel.
}
Upvotes: 1
Reputation: 17953
I often revert to purely functional, non-akka, techniques for problems such as this and then "lift" those functions into akka constructs. By this I mean I try to use only scala "stuff" and then try to wrap that stuff inside of akka later on...
File Creation
Starting with the FileOutputStream creation based on "randomly generated names":
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator : () => FileOutputStream =
  () => new FileOutputStream(randomFileNameGenerator)
State
There needs to be some way of storing the "state" of the current file, e.g. the number of bytes already written:
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
File Writing
First we determine if we'd breach the maximum file size threshold with the given ByteString:
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
  (state, byteString, maxBytes) =>
    state.byteCount + byteString.length > maxBytes
We then have to write the function that creates a new FileState if we've maxed out the capacity of the current one, or returns the current state if it is still below capacity:
val closeFileInState : FileState => Unit =
  (_ : FileState).fileOut.close()
val getCurrentFileState : (Int, FileState, ByteString) => FileState =
  (maxBytes, state, byteString) =>
    if(isEndOfChunk(state, byteString, maxBytes)) {
      closeFileInState(state)
      FileState()
    }
    else
      state
The only thing left is to write to the FileOutputStream:
val writeToFileAndReturn : (FileState, ByteString) => FileState =
  (fileState, byteString) => {
    fileState.fileOut write byteString.toArray
    fileState copy (byteCount = fileState.byteCount + byteString.size)
  }
//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
  writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
Fold On Any GenTraversableOnce
In Scala a GenTraversableOnce is any collection, parallel or not, that has the fold operators. These include Iterator, Vector, Array, Seq, Scala Stream, ... The final writeToChunkedFile function perfectly matches the signature of GenTraversableOnce#foldLeft (plain fold would not type-check here, because its accumulator must be a supertype of the element type):
val anyIterable : Iterable[ByteString] = ???
val finalFileState = anyIterable.foldLeft(FileState())(writeToChunkedFile(maxBytes))
One final loose end: the last FileOutputStream needs to be closed as well. Since the fold will only emit that last FileState, we can close that one:
closeFileInState(finalFileState)
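To try the idea without any Akka dependency, here is a minimal self-contained sketch of the same fold over an Iterator[Array[Byte]]. The names ChunkState, writeChunked and splitToFiles are hypothetical and only for illustration. It differs slightly from the functions above: the first file is opened lazily and the created paths are collected, with Files.createTempFile standing in for the random name generator.

```scala
import java.io.{FileOutputStream, OutputStream}
import java.nio.file.{Files, Path}

// Hypothetical names throughout; this mirrors the FileState idea using only the standard library.
final case class ChunkState(dir: Path, written: Long, out: Option[OutputStream], files: Vector[Path])

def writeChunked(maxBytes: Long)(state: ChunkState, bytes: Array[Byte]): ChunkState = {
  // Rotate when no file is open yet or the next write would exceed the limit.
  val rotate = state.out.isEmpty || state.written + bytes.length > maxBytes
  val current =
    if (rotate) {
      state.out.foreach(_.close())
      val path = Files.createTempFile(state.dir, "part-", ".bin") // random, unique name
      state.copy(written = 0L, out = Some(new FileOutputStream(path.toFile)), files = state.files :+ path)
    } else state
  current.out.foreach(_.write(bytes))
  current.copy(written = current.written + bytes.length)
}

def splitToFiles(chunks: Iterator[Array[Byte]], dir: Path, maxBytes: Long): Vector[Path] = {
  val last = chunks.foldLeft(ChunkState(dir, 0L, None, Vector.empty))(writeChunked(maxBytes))
  last.out.foreach(_.close()) // the fold only returns the last state, so close its file here
  last.files
}
```

Like the functions above, this never splits an incoming chunk, so a single chunk larger than maxBytes still ends up in one (oversized) file.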
Akka Streams
Akka Flow gets its fold from FlowOps#fold, which happens to match the signature of GenTraversableOnce#foldLeft. Therefore we can "lift" our regular functions into stream values, similar to the way we folded over the Iterable:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
  Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
The nice part about handling the problem with regular functions is that they can be used within other asynchronous frameworks beyond streams, e.g. Futures or Actors. You also don't need an akka ActorSystem in unit testing, just regular language data structures.
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
  chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
You can then use this Sink to drain the HttpEntity coming from an HttpRequest.
Upvotes: 7