Rita
Rita

Reputation: 1273

Parallel Stream repeating items

I am retrieving big chunks of data from DB and using this data to write it somewhere else. In order to avoid a long processing time, I'm trying to use parallel streams to write it.
When I run this as sequential streams, it works perfectly. However, if I change it to parallel, the behavior is odd: it prints the same object multiple times (more than 10).

@PostConstruct
public void retrieveAllTypeRecords() throws SQLException {
    logger.info("Retrieve batch of Type records.");
    try {
        Stream<TypeRecord> typeQueryAsStream = jdbcStream.getTypeQueryAsStream();
        typeQueryAsStream.forEach((type) -> {
            logger.info("Printing Type with field1: {} and field2: {}.", type.getField1(), type.getField2()); //the same object gets printed here multiple times
            //write this object somewhere else
        });
        logger.info("Completed full retrieval of Type data.");
    } catch (Exception e) {
        logger.error("error: " + e);
    }
}

public Stream<TypeRecord> getTypeQueryAsStream() throws SQLException {
    String sql = typeRepository.getQueryAllTypesRecords(); //retrieves SQL query in String format

    TypeMapper typeMapper = new TypeMapper();

    JdbcStream.StreamableQuery query = jdbcStream.streamableQuery(sql);
    Stream<TypeRecord> stream = query.stream()
        .map(row -> {
            return typeMapper.mapRow(row); //maps columns values to object values
        });
    return stream;
    }


public class StreamableQuery implements Closeable {

    (...)

    public Stream<SqlRow> stream() throws SQLException {
        final SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
        final SqlRow sqlRow = new SqlRowAdapter(rowSet);

        Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
            @Override
            public boolean hasNext() {
                return !rowSet.isLast();
            }

            @Override
            public SqlRow next() {
                if (!rowSet.next()) {
                    throw new NoSuchElementException();
                }
                return sqlRow;
            }
        }, Spliterator.CONCURRENT);
        return StreamSupport.stream(supplier, Spliterator.CONCURRENT, true); //this boolean sets the stream as parallel
    }
}

I've also tried using typeQueryAsStream.parallel().forEach((type) but the result is the same.

Example of output:
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[main] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.

Upvotes: 2

Views: 1145

Answers (1)

Holger
Holger

Reputation: 298123

Well, look at you code,

    final SqlRow sqlRow = new SqlRowAdapter(rowSet);

    Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
…
        @Override
        public SqlRow next() {
            if (!rowSet.next()) {
                throw new NoSuchElementException();
            }
            return sqlRow;
        }
    }, Spliterator.CONCURRENT);

You are returning the same object every time. You achieve your desired effects by implicitly modifying the state of this object when calling rowSet.next().

This obviously can’t work when multiple threads try to access that single object concurrently. Even buffering some items, to hand them over to another thread will cause trouble. Therefore, such interference can cause problems with sequential streams as well, as soon as stateful intermediate operations are involved, like sorted or distinct.

Assuming that typeMapper.mapRow(row) will produce an actual data item which has no interference to other data items, you should integrate this step into the stream source, to create a valid stream.

public Stream<TypeRecord> stream(TypeMapper typeMapper) throws SQLException {
    SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
    SqlRow sqlRow = new SqlRowAdapter(rowSet);

    Spliterator<TypeRecord> sp = new Spliterators.AbstractSpliterator<TypeRecord>(
            Long.MAX_VALUE, Spliterator.CONCURRENT|Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super TypeRecord> action) {
            if(!rowSet.next()) return false;
            action.accept(typeMapper.mapRow(sqlRow));
            return true;
        }
    };
    return StreamSupport.stream(sp, true); //this boolean sets the stream as parallel
}

Note that for a lot of use cases, like this one, implementing a Spliterator is simpler than implementing an Iterator (which needs to be wrapped via spliteratorUnknownSize anyway). Also, there is no need to encapsulate this instantiation into a Supplier.

As a final note, the current implementation does not perform well for streams with an unknown size, as it treats Long.MAX_VALUE like a very large number, ignoring the “unknown” semantic assigned to it by the specification. It will be very beneficial to the parallel performance to provide an estimate size, it doesn’t need to be precise, in fact, with the current implementation, even a completely made up number, say 1000 may perform better than correctly using Long.MAX_VALUE to denote an entirely unknown size.

Upvotes: 5

Related Questions