bhh1988

Scala streaming a live/growing file

My Scala application kicks off an external process that writes a file to disk. In a separate thread, I want to read that file and copy its contents to an OutputStream until the process is done and the file is no longer growing.

There are a couple of edge cases to consider:

  1. The file may not exist yet when the thread is ready to start.
  2. The thread may copy faster than the process is writing. In other words, it may reach the end of the file while the file is still growing.

I can also pass the thread a processCompletionFuture, which completes when the file is done growing.

Is there an elegant and efficient way to do this, perhaps using Akka Streams or actors? (I've tried building an Akka Stream from a FileInputStream, but the stream seems to terminate as soon as there are no more bytes to read, which is exactly what happens in case #2.)

Jeffrey Chung

Alpakka, a library that is built on Akka Streams, has a FileTailSource utility that mimics the tail -f Unix command. For example:

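import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.ByteString
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._

// Akka 2.6+: an implicit ActorSystem also provides the Materializer that runWith needs
implicit val system: ActorSystem = ActorSystem("file-tail")

val path: Path = ???  // the file written by the external process

val maxLineSize = 10000

val tailSource: Source[ByteString, NotUsed] = FileTailSource(
  path = path,
  maxChunkSize = maxLineSize,
  startingPosition = 0,           // start from the beginning of the file
  pollingInterval = 500.millis    // how often to check for newly appended data
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, allowTruncation = true))
  // Framing.delimiter strips the delimiter from each frame, so add it back before copying
  .map(_ ++ ByteString(System.lineSeparator))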

The tailSource above reads the file line by line from the beginning (startingPosition = 0) and then checks for newly appended data every 500 milliseconds. To copy the stream's contents to an OutputStream, connect the source to a StreamConverters.fromOutputStream sink:

val out: OutputStream = ???  // the OutputStream to copy the file into

val stream: Future[IOResult] =
  tailSource.runWith(StreamConverters.fromOutputStream(() => out))
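FileTailSource never completes on its own, so the stream keeps polling even after the external process exits. One way to use the question's processCompletionFuture is a SharedKillSwitch: once the future completes, record the file's final size and shut the stream down after that many bytes have gone through. Shutting down the moment the future completes could drop data appended after the last poll, which is why the sketch counts bytes instead. This is a minimal sketch, assuming Akka 2.6+ with Alpakka's file connector, and assuming the process has finished writing (and the file exists) by the time the future completes. TailUntilDone and copyGrowingFile are hypothetical names. It copies raw chunks without Framing so that the byte count matches the file size exactly.

```scala
import akka.actor.ActorSystem
import akka.stream.{ IOResult, KillSwitches }
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.StreamConverters
import akka.util.ByteString
import java.io.OutputStream
import java.nio.file.{ Files, Path }
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.control.NonFatal

object TailUntilDone {
  // Hypothetical helper: copies `path` into `out`, following the file as it grows,
  // and completes once `processCompletionFuture` has completed AND every byte the
  // file contained at that moment has been forwarded to `out`.
  def copyGrowingFile(
      path: Path,
      out: OutputStream,
      processCompletionFuture: Future[Any],
      chunkSize: Int = 8192,
      pollingInterval: FiniteDuration = 500.millis
  )(implicit system: ActorSystem): Future[IOResult] = {
    implicit val ec: ExecutionContext = system.dispatcher

    val killSwitch  = KillSwitches.shared("tail-until-done")
    val bytesCopied = new AtomicLong(0L)
    val finalSize   = new AtomicLong(-1L) // -1 means "process still running"

    // Complete the stream once the final size is known and has been reached.
    // Each caller updates its own counter before checking, so at least one of
    // them observes both values and triggers the shutdown.
    def stopIfDone(): Unit = {
      val target = finalSize.get()
      if (target >= 0 && bytesCopied.get() >= target) killSwitch.shutdown()
    }

    processCompletionFuture.onComplete { _ =>
      try {
        // Assumption: the process has flushed and closed the file by now.
        finalSize.set(Files.size(path))
        stopIfDone()
      } catch {
        case NonFatal(e) => killSwitch.abort(e) // e.g. the file was never created
      }
    }

    FileTailSource(path, chunkSize, 0L, pollingInterval)
      .via(killSwitch.flow[ByteString])
      .map { chunk =>
        bytesCopied.addAndGet(chunk.size.toLong)
        stopIfDone()
        chunk
      }
      // The sink writes each chunk to `out` and closes `out` when the stream ends.
      .runWith(StreamConverters.fromOutputStream(() => out))
  }
}
```

The counters are AtomicLongs because the future's callback and the stream stage run on different threads; the kill switch sits before the counting stage so the chunk that triggers the shutdown has already passed it and still reaches the sink.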

(Note that there is also a FileTailSource.lines method that produces a Source[String, NotUsed], but for copying bytes to an OutputStream it is better to work with ByteString than with String. This is why the example uses FileTailSource.apply(), which produces a Source[ByteString, NotUsed].)

The stream fails if the file doesn't exist when it is materialized, so you'll need to confirm that the file exists before running the stream. This might be overkill, but one option is Alpakka's DirectoryChangesSource, which can notify you when the file is created in its directory.
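A simpler alternative to DirectoryChangesSource is plain polling: check Files.exists every so often and only materialize the tail stream once the file is there. This is a minimal sketch, swapping in polling rather than directory watching, and assuming Akka 2.6's akka.pattern.after for the delay between checks; WaitForFile and waitForFile are hypothetical names. It gives up with a TimeoutException if the file has not appeared within `timeout`.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import java.nio.file.{ Files, Path }
import java.util.concurrent.TimeoutException
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

object WaitForFile {
  // Hypothetical helper: completes with `path` once it exists, checking every
  // `interval`; fails with a TimeoutException if it has not appeared in `timeout`.
  def waitForFile(path: Path, interval: FiniteDuration, timeout: FiniteDuration)(
      implicit system: ActorSystem): Future[Path] = {
    implicit val ec: ExecutionContext = system.dispatcher
    val deadline = timeout.fromNow

    def poll(): Future[Path] =
      if (Files.exists(path)) Future.successful(path)
      else if (deadline.isOverdue())
        Future.failed(new TimeoutException(s"$path did not appear within $timeout"))
      else after(interval, system.scheduler)(poll()) // re-check after `interval`, without blocking a thread

    poll()
  }
}
```

Running the tail stream inside a flatMap on the returned future ensures the file exists before FileTailSource opens it; the cost is a detection delay of up to one `interval`.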
