ASD

Reputation: 69

Unable to merge multiple streams into one

I am trying to merge content of multiple files into single file and to test my code,

I have 3 files in my test dir:

file1.txt : this
file2.txt : is
file3.txt : test

Here is the code to combine all 3 files into one stream and write it to a single "out.txt" file. But this code only writes the content of one file to "out.txt" — why?

import java.io.{File, FileInputStream, FileOutputStream, InputStream, SequenceInputStream}
import scala.collection.JavaConverters.asJavaEnumeration

def mergeInputStreams(files: Iterator[File]): InputStream =
  new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))

val d = "/Users/pink/temp"
val file = new File(d)
//file.listFiles.iterator.foreach(println)
val fss = mergeInputStreams(file.listFiles.toIterator)
val outfile = new File("/Users/pink/tmp/out.txt")
val os = new FileOutputStream(outfile)
try {
  while (fss.available() > 0) {
    os.write(fss.read())
  }
} finally {
  fss.close()
  os.close()
}

I expect the above code to produce one file with the following content:

out.txt: this is test

Upvotes: 1

Views: 803

Answers (2)

codenoodle

Reputation: 994

mergeInputStreams is working correctly, but the while loop that writes the InputStream to the FileOutputStream isn't getting all of the content. @SergGr's answer explains exactly why that happens. Replacing the read-write while loop with the IOUtils copy method gets everything written to the file, but in the wrong order. The order of the output is determined by this line of code:

file.listFiles.toIterator

Once we sort the input files, we get this in out.txt:

this
is
test

Here's the code that does this:

import scala.collection.JavaConverters.asJavaEnumeration
import java.io.{File, FileInputStream, InputStream, FileOutputStream, SequenceInputStream, IOException}
import org.apache.commons.io.IOUtils

object Example1 {
  def main(args: Array[String]): Unit = {
    def mergeInputStreams(files: Iterator[File]): InputStream = 
        new SequenceInputStream(asJavaEnumeration(files.map(new FileInputStream(_))))
    val d = "/Users/pink/temp"
    val file = new File(d)
    val fss = mergeInputStreams(file.listFiles.toList.sorted.toIterator)
    val os = new FileOutputStream(new File("/Users/pink/tmp/out.txt"))
    try {
      IOUtils.copy(fss, os)
    }
    catch {
      case e: IOException => println(s"IO exception: ${e.getMessage}")
    }
    finally {
      fss.close()
      os.close()
    }
  }
}

This solution definitely works, but we're using a lot of Java code where side-effects happen without our type system knowing. Because of that we have to be careful to catch exceptions and place IO operations exactly where we want them.
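As a middle ground, Scala 2.13's scala.util.Using can take care of the closing and exception capture for us. Here is a minimal, self-contained sketch of the same copy loop (using hypothetical in-memory streams instead of the files above, so it runs as-is):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.util.Using

// Using.Manager closes every registered resource even if the copy throws;
// any exception is captured in the returned Try instead of escaping.
def copyAll(text: String): scala.util.Try[String] =
  Using.Manager { use =>
    val in  = use(new ByteArrayInputStream(text.getBytes("UTF-8")))
    val out = use(new ByteArrayOutputStream())
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) {            // read() == -1 is the true end-of-stream signal
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    out.toString("UTF-8")
  }

println(copyAll("this is test")) // Success(this is test)
```

The same pattern works with the FileInputStream/FileOutputStream pair from the answer above; the streams are closed in reverse order of registration.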

A more functional way to write this would be with fs2 Streams. Here's the example from their readme file modified to match what your code is doing:

import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object Example2 extends IOApp {
  private val blockingExecutionContext =
        Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))))(ec => IO(ec.shutdown()))

  val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>

    val f1 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file1.txt"), blockingEC, 4096)
    val f2 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file2.txt"), blockingEC, 4096)
    val f3 = io.file.readAll[IO](Paths.get("/Users/pink/temp/file3.txt"), blockingEC, 4096)

    // ++ concatenates the streams in order; merge would interleave them nondeterministically
    (f1 ++ f2 ++ f3)
      .through(text.utf8Decode)
      .through(text.lines)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("/Users/pink/tmp/out.txt"), blockingEC))
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}

No IO operations happen until run is called, and all exceptions are maintained inside the IO type so we don't have to worry about catching throughout our code.

Upvotes: 0

SergGr

Reputation: 23788

This behavior happens because fss.available() > 0 is the wrong check for such a task. The JavaDoc for InputStream.available() says (emphasis mine):

Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream **without blocking** by the next invocation of a method for this input stream.

fss.available() > 0 does guarantee that the stream is not finished, but the reverse is not true: fss.available() may be 0 while more data can still be read. This is true even for file-based InputStreams. For example, assume the file is actually located on a different server, mounted via some network file system. In that case, an implementation of available that returns only the number of bytes cached on the client side is a reasonable implementation of the interface, because getting more data does require a blocking request over the network.

If you read the JavaDoc for SequenceInputStream.available(), you will see:

...
This method simply calls available of the current underlying input stream and returns the result.

And this is probably the only sane implementation of the interface contract: in general you cannot, without blocking, distinguish the case where is.available() == 0 because the end was reached from the case where a blocking operation is simply needed to get more data.
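You can reproduce this boundary condition with in-memory streams (a small sketch, no files needed): once the first underlying stream is exhausted, SequenceInputStream has not yet advanced to the next one, so available() reports 0 even though more data remains — exactly the point where the question's loop stops.

```scala
import java.io.{ByteArrayInputStream, SequenceInputStream}

val s = new SequenceInputStream(
  new ByteArrayInputStream("ab".getBytes("UTF-8")),
  new ByteArrayInputStream("cd".getBytes("UTF-8")))

s.read(); s.read()         // consume "ab"; the current underlying stream is now exhausted
println(s.available())     // 0 -- a while (available() > 0) loop gives up here
println(s.read().toChar)   // c -- yet read() still delivers more data
```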

The intent of the available method is to support various optimizations, not to act as a check for the end of the stream. The only proper check for whether an InputStream has reached its end is the potentially blocking read() == -1.
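Applied to the question's code, the fix is to loop until read returns -1, ideally with a buffer rather than byte-by-byte. A sketch of that loop, shown with in-memory streams standing in for the merged files so it is runnable as-is:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, SequenceInputStream}

// Stand-ins for the merged FileInputStreams from the question.
val in = new SequenceInputStream(
  new ByteArrayInputStream("this ".getBytes("UTF-8")),
  new ByteArrayInputStream("is test".getBytes("UTF-8")))
val out = new ByteArrayOutputStream()

try {
  val buf = new Array[Byte](4096)
  var n = in.read(buf)    // blocks until data arrives or the end is reached
  while (n != -1) {       // -1 is the only reliable end-of-stream signal
    out.write(buf, 0, n)
    n = in.read(buf)
  }
} finally {
  in.close()
  out.close()
}

println(out.toString("UTF-8")) // this is test
```

Unlike the available() loop, this reads past the seam between the underlying streams, because SequenceInputStream.read advances to the next stream instead of returning -1 early.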

Upvotes: 1
