Devu

Reputation: 451

Java 8 streams and concurrent writes

I have code like so

    public static void main(String[] args) throws Exception {
      long start = System.currentTimeMillis();
      List<String> matches = new Vector<>(); // Race condition for ArrayList??
      BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("AHugeFile.txt")));
      BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("output.txt")));
      reader.lines().parallel()
         .filter(s -> s.matches("someFancyRegEx"))
         .forEach(s -> {
               matches.add(s);
               try {
                  writer.write(s);
                  writer.newLine();
               } catch (Exception e) {
                  System.out.println("error");
               }
            }
         );
      System.out.println("Processing took " + (System.currentTimeMillis() - start) / 1000 + " seconds and matches " + matches.size());
      reader.close();
      writer.flush();
      writer.close();
    }

I noticed that if I replace the Vector with an ArrayList in the declaration of matches, I get a different number of matches each time. I'm only just getting my hands dirty with Streams, but I assume the forEach runs concurrently and the concurrent adds to the ArrayList lose some of the writes. With a Vector, the results are consistent.

I have two Questions:

  1. Is my reasoning that the ArrayList causes a race condition correct? (A stripped-down sketch of what I mean is below.)
  2. Given that the writer is also writing to a file in the same terminal operation, could it potentially miss some lines? Running the program a few times, the results seem to be consistent, with the correct number of lines written out.
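Here is my stripped-down sketch of the effect (no file I/O, just a toy example of my own; the counts are what I typically observe, not guaranteed values):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Vector;
    import java.util.stream.IntStream;

    public class RaceDemo {
        public static void main(String[] args) {
            List<Integer> plain = new ArrayList<>(); // not thread-safe
            List<Integer> synced = new Vector<>();   // state-changing methods are synchronized

            IntStream.range(0, 100_000).parallel().forEach(i -> plain.add(i));
            IntStream.range(0, 100_000).parallel().forEach(i -> synced.add(i));

            // The ArrayList usually ends up with fewer than 100000 elements (lost adds,
            // occasionally even an ArrayIndexOutOfBoundsException); the Vector is always 100000.
            System.out.println("ArrayList: " + plain.size());
            System.out.println("Vector:    " + synced.size());
        }
    }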

Upvotes: 0

Views: 1993

Answers (2)

fge

Reputation: 121840

First things first: decide whether you care about the order in which lines are written; .forEach() strips the ORDERED characteristic of a Spliterator (been there, done that). See the small ordering sketch after these points.

Second: use the tools Java 8 gives you; it has two very convenient methods, Files.lines() and Files.write().

Third: handle your resources correctly! There is no guarantee in your code that the file descriptors will be closed correctly.

Fourth: .matches() compiles a new Pattern on every call, and you always filter with the same regex... You are wasting resources.

Fifth: given that the write method of a BufferedWriter is synchronized, you don't gain much by parallelism.
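To illustrate the first point, a tiny self-contained sketch (my own toy example, not from your code) showing the difference between .forEach() and .forEachOrdered() on a parallel stream:

    import java.util.stream.IntStream;

    public class OrderDemo {
        public static void main(String[] args) {
            // With a parallel stream, forEach makes no promise about encounter order...
            System.out.print("forEach:        ");
            IntStream.rangeClosed(1, 10).parallel()
                .forEach(i -> System.out.print(i + " "));
            System.out.println();

            // ...whereas forEachOrdered prints 1 2 3 ... 10.
            System.out.print("forEachOrdered: ");
            IntStream.rangeClosed(1, 10).parallel()
                .forEachOrdered(i -> System.out.print(i + " "));
            System.out.println();
        }
    }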

Here is how I would do it:

public static void writeFiltered(final Path srcFile, final Path dstFile,
    final String regex)
    throws IOException
{
    final Pattern pattern = Pattern.compile(regex);

    final List<String> filteredLines;

    try (
        // UTF-8 by default
        final Stream<String> srcLines = Files.lines(srcFile);
    ) {
        filteredLines = srcLines
            .filter(line -> pattern.matcher(line).matches())
            .collect(Collectors.toList());
    }

    // UTF-8 by default
    Files.write(dstFile, filteredLines);
}
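
You would then call it along these lines (paths and regex taken from the question, assuming java.nio.file.Paths is imported):

    writeFiltered(Paths.get("AHugeFile.txt"), Paths.get("output.txt"), "someFancyRegEx");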

Upvotes: 2

Abhijeet Dhumal

Reputation: 1809

  1. ArrayList is not a synchronized collection, so yes, it will cause a race condition. All of Vector's state-changing methods are synchronized, which is why you did not see the problem there.

  2. The write method of BufferedWriter is synchronized, so each individual write call is thread-safe. But writer.write(s) and writer.newLine() are two separate calls, so without explicit synchronization around the pair, output from another thread can still interleave between them (see the sketch after the snippet below).

Here is the code snippet of the write method in Java 6:

public void write(String s, int off, int len) throws IOException {
    synchronized (lock) {
        ensureOpen();

        int b = off, t = off + len;
        while (b < t) {
            int d = min(nChars - nextChar, t - b);
            s.getChars(b, b + d, cb, nextChar);
            b += d;
            nextChar += d;

            if (nextChar >= nChars)
                flushBuffer();
        }
    }
}
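
So if each matched line must come out as one uninterrupted unit from the parallel forEach, you need something like the following sketch (file names and regex taken from the question; all worker threads synchronize on the same writer object, so the two calls cannot interleave):

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class SynchronizedWriteDemo {
        public static void main(String[] args) throws IOException {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get("AHugeFile.txt"));
                 BufferedWriter writer = Files.newBufferedWriter(Paths.get("output.txt"))) {
                reader.lines().parallel()
                    .filter(s -> s.matches("someFancyRegEx"))
                    .forEach(s -> {
                        // Hold the same lock for both calls so write + newLine form one atomic unit
                        synchronized (writer) {
                            try {
                                writer.write(s);
                                writer.newLine();
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
            }
        }
    }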

Upvotes: 0
