Abhijit Sarkar

Scala vs Java Streaming: Scala prints nothing, Java works

I'm comparing reactive streaming in Scala and Java, using akka-stream and RxJava respectively. My use case is a simple grep: given a directory, a file filter and a search text, I look in that directory for all matching files that contain the text. I then stream (filename -> matching lines) pairs. This works fine in Java, but in Scala nothing is printed: there's no exception, but no output either. The test data is downloaded from the internet, but as you can see, the code can easily be tested with any local directory as well.
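Before debugging the stream itself, it can help to run the same grep synchronously to confirm that the directory and glob actually match something. The following is a minimal sketch that uses only java.nio and scala.io (no Akka); SyncGrep and grep are hypothetical names, not part of the original code, and files without a match are skipped as in the answer below.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.mutable.ListBuffer

object SyncGrep {
  // Hypothetical helper: a blocking version of the same grep. Returns
  // (file, matching lines) pairs, skipping files without a match.
  def grep(path: String, text: String, fileFilter: String): List[(Path, List[String])] = {
    val dir = Files.newDirectoryStream(Paths.get(path), fileFilter)
    try {
      val results = ListBuffer.empty[(Path, List[String])]
      val it = dir.iterator()
      while (it.hasNext) {
        val p = it.next()
        val source = scala.io.Source.fromFile(p.toFile)
        try {
          // Read the matching lines eagerly so the file can be closed right after
          val lines = source.getLines().filter(_.contains(text)).map(_.trim).toList
          if (lines.nonEmpty) results += ((p, lines))
        } finally source.close()
      }
      results.toList
    } finally dir.close() // release the directory handle
  }
}
```

Because grep returns only after every file has been read, printing its result cannot race with the end of a test.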

Scala:

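import java.nio.file.{Files, Paths}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object Transformer {
  implicit val system: ActorSystem = ActorSystem("transformer")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  // Use the actor system's dispatcher explicitly; a bare `implicitly` can resolve to this val itself
  implicit val executionContext: ExecutionContext = system.dispatcher

  def run(path: String, text: String, fileFilter: String) = {
    Source.fromIterator { () =>
      Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala
    }.map { p =>
      // Keep the trimmed lines of this file that contain `text`
      val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).toList
      (p, lines)
    }.runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))
  }
}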

Java:

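import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import rx.Observable; // RxJava 1.x

public class Transformer {
    public static void run(String path, String text, String fileFilter) {
        Observable.from(files(path, fileFilter)).flatMap(p -> {
            // try-with-resources closes the file handle opened by Files.lines
            try (Stream<String> lines = Files.lines(p)) {
                // Group the trimmed matching lines under the file's absolute path
                return Observable.from((Iterable<Map.Entry<String, List<String>>>) lines
                        .filter(line -> line.contains(text))
                        .map(String::trim)
                        .collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue()));
    }

    private static Iterable<Path> files(String path, String fileFilter) {
        try {
            return Files.newDirectoryStream(Paths.get(path), fileFilter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}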

Unit test using ScalaTest:

import org.scalatest.{FlatSpec, Matchers}

class TransformerSpec extends FlatSpec with Matchers {
  "Transformer" should "extract temperature" in {
    Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml")
  }

  "Java Transformer" should "extract temperature" in {
    JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml")
  }
}

Answers (1)

Abhijit Sarkar

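Dang, I forgot that running the stream returns a Future: runWith(Sink.foreach(...)) starts the stream asynchronously and immediately returns a Future[Done], so the test finished before the stream got a chance to print anything. The Java version didn't have this problem because toBlocking().forEach(...) blocks until the Observable completes. @MrWiggles' comment gave me the hint. The following Scala code produces the same result as the Java version.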
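To see why nothing was printed, here is a minimal sketch that uses a plain scala.concurrent.Future in place of the Akka stream (an assumption made so it runs without Akka): run() returns at once, and only Await.result waits for the work to finish. FutureDemo, printed and ioDone are hypothetical names; the latch stands in for slow file I/O so that the demonstration is deterministic.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Collects what would have been printed, so the effect can be observed
  val printed = new ConcurrentLinkedQueue[String]()
  // Stands in for slow file I/O; released explicitly to keep the demo deterministic
  val ioDone = new CountDownLatch(1)

  // Like runWith(Sink.foreach(...)): returns a Future at once, does the work later
  def run(): Future[Unit] = Future {
    ioDone.await()
    printed.add("a.xml -> List(<temp_f>72.0</temp_f>)")
    ()
  }
}
```

A test that calls run() and returns without awaiting the Future behaves like the original TransformerSpec: it can finish before anything is printed. Awaiting the Future, with a timeout, is what makes the result observable.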

Note: The code in my question didn't close the DirectoryStream, which, for directories with a large number of files, caused a java.io.IOException: Too many open files in system. The code below closes its resources properly.
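Besides the DirectoryStream, each io.Source.fromFile call also holds an open file handle until it is closed, and the question's code never closes those either. A minimal sketch of the loan pattern for that case follows; FileLines, withSource and matchingLines are hypothetical names. On Scala 2.13+, scala.util.Using provides the same guarantee.

```scala
import java.io.File
import scala.io.Source

object FileLines {
  // Hypothetical loan-pattern helper: opens the file, passes it to `f`, and
  // always closes it, even if `f` throws.
  def withSource[A](file: File)(f: Source => A): A = {
    val source = Source.fromFile(file)
    try f(source) finally source.close()
  }

  // The lines are materialized with toList before the source is closed;
  // returning the lazy getLines() iterator would read from a closed file.
  def matchingLines(file: File, text: String): List[String] =
    withSource(file)(_.getLines().filter(_.contains(text)).map(_.trim).toList)
}
```

Inside the stream's map stage, calling FileLines.matchingLines(p.toFile, text) would release each file handle as soon as that file has been read.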

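import scala.concurrent.Await
import scala.concurrent.duration._

def run(path: String, text: String, fileFilter: String) = {
  val files = Files.newDirectoryStream(Paths.get(path), fileFilter)

  try {
    val future = Source(files.asScala.toList)
      .map { p =>
        // Read the matching lines eagerly, then close the file handle
        val source = io.Source.fromFile(p.toFile)
        val lines = try source.getLines().filter(_.contains(text)).map(_.trim).toList finally source.close()
        (p, lines)
      }
      .filter(_._2.nonEmpty) // drop files without a match
      .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))

    // runWith returns a Future[Done]; block until the stream has finished
    Await.result(future, 10.seconds)
  } finally {
    files.close()
  }

  true // for testing
}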
