Reputation: 1290
My Scala application kicks off an external process that writes a file to disk. In a separate thread, I want to read that file and copy its contents to an OutputStream until the process is done and the file is no longer growing.
There are a couple of edge cases to consider:
BTW, I can pass the thread a processCompletionFuture variable that indicates when the file is done growing.
Is there an elegant and efficient way to do this? Perhaps using Akka Streams or actors? (I've tried running an Akka Stream off of a FileInputStream, but the stream seems to terminate as soon as there are no more bytes in the input stream, which happens in case #2.)
Upvotes: 0
Views: 584
Reputation: 19527
Alpakka, a library built on Akka Streams, has a FileTailSource utility that mimics the Unix tail -f command. For example:
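import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.ByteString
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._

val path: Path = ??? // the file written by the external process
val maxLineSize = 10000

// Emits the file's existing contents, then polls every 500 ms for appended bytes.
// Framing.delimiter strips the separator from each frame, so it is re-appended
// to keep the copied bytes identical to the file's contents.
val tailSource: Source[ByteString, NotUsed] = FileTailSource(
  path = path,
  maxChunkSize = maxLineSize,
  startingPosition = 0,
  pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, allowTruncation = true))
  .map(_ ++ ByteString(System.lineSeparator))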
The tailSource above reads the entire file line by line and then polls every 500 milliseconds for freshly appended data. To copy the stream contents to an OutputStream, connect the source to a StreamConverters.fromOutputStream sink:
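// An implicit ActorSystem provides the Materializer that runWith needs (Akka 2.6+).
implicit val system: akka.actor.ActorSystem = akka.actor.ActorSystem("file-tail")

val out: OutputStream = ??? // the destination stream

val stream: Future[IOResult] =
  tailSource.runWith(StreamConverters.fromOutputStream(() => out))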
(Note that there is a FileTailSource.lines method that produces a Source[String, NotUsed], but in this scenario it's more convenient to work with ByteString than with String, because the OutputStream sink consumes bytes. This is why the example uses FileTailSource.apply(), which produces a Source[ByteString, NotUsed].)
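Because FileTailSource keeps polling, the stream above does not complete by itself when the external process exits. Within Akka Streams, one option is to insert a KillSwitches.single stage and call shutdown() on it once processCompletionFuture completes (ideally after one more polling interval, so the last bytes are read). To show the underlying polling idea without any dependencies, here is a minimal, blocking sketch using only java.nio; it is not Alpakka's implementation. GrowingFileCopy, copyWhileGrowing and its parameters are hypothetical names, and the sketch assumes the process has written all of its output by the time the completion future finishes.

```scala
import java.io.OutputStream
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.Future
import scala.concurrent.duration._

object GrowingFileCopy {

  // Hypothetical helper: copies bytes from `path` to `out` while the file grows.
  // `done` stands in for the question's processCompletionFuture. The loop polls
  // every `pollInterval`; once `done` has completed, it does one final read and
  // stops. Blocks the calling thread and returns the number of bytes copied.
  def copyWhileGrowing(
      path: Path,
      out: OutputStream,
      done: Future[Any],
      pollInterval: FiniteDuration
  ): Long = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    val buffer = ByteBuffer.allocate(8192)

    // Copies everything between the channel's position and the current end of
    // file. The position is kept between calls, so only new bytes are read.
    def drainAvailable(): Long = {
      var copied = 0L
      var n = channel.read(buffer) // -1 at the current end of file
      while (n > 0) {
        out.write(buffer.array(), 0, n)
        copied += n
        buffer.clear()
        n = channel.read(buffer)
      }
      copied
    }

    try {
      var total = 0L
      var finished = false
      while (!finished) {
        // Sample completion before reading: if the process was already done,
        // this read sees everything it wrote (under the stated assumption).
        val processDone = done.isCompleted
        total += drainAvailable()
        if (processDone) finished = true
        else Thread.sleep(pollInterval.toMillis)
      }
      out.flush()
      total
    } finally channel.close()
  }
}
```

Since it calls Thread.sleep, run it on its own thread (for example, wrapped in a Future on a dedicated ExecutionContext), which matches the question's setup of a separate reader thread.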
The stream will fail if the file doesn't exist at the time of materialization. Therefore, you'll need to confirm that the file exists before running the stream. This might be overkill, but one idea is to use Alpakka's DirectoryChangesSource for that.
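If DirectoryChangesSource feels like overkill, a simpler (blocking) option is to poll Files.exists until the file appears or a timeout passes, and only then run the tail stream. This is a minimal sketch; FileWait and awaitFile are hypothetical names, and the poll interval and timeout are assumptions you would tune.

```scala
import java.nio.file.{Files, Path}
import scala.concurrent.duration._

object FileWait {

  // Hypothetical helper: blocks until `path` exists or `timeout` elapses.
  // Returns true if the file appeared in time, false otherwise.
  def awaitFile(path: Path, pollInterval: FiniteDuration, timeout: FiniteDuration): Boolean = {
    val deadline = timeout.fromNow
    var exists = Files.exists(path)
    while (!exists && deadline.hasTimeLeft()) {
      Thread.sleep(pollInterval.toMillis)
      exists = Files.exists(path)
    }
    exists
  }
}
```

A true result means it is safe to materialize the FileTailSource; on false, you might fail processing or check whether the external process has already exited.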
Upvotes: 2