user1980833

Reputation: 221

Java 8 stream and incoming data (Kafka)

I have a queue (it happens to be Kafka but I am not sure that matters) from which I am reading messages. I want to create a stream to represent this data.

My pseudocode to consume a (Kafka) queue looks like this:

List<Message> messages = new ArrayList<>();

while (true) {
    ConsumerRecords<String, Message> records = kafkaConsumer.poll(100);

    // recordsToMessages is assumed to return a collection of Message
    messages.addAll(recordsToMessages(records));

    if (x) {
        break;
    }
}

return messages.stream();

Using this pseudocode the stream is not returned until the while loop is broken, i.e. until all of the queue has been read.

I want to be able to return the stream straight away, i.e. new messages can be added to the stream after it has returned.

I feel like I need to use Stream.generate but I am not sure how, or maybe I need a spliterator?

I will also want to close the stream at a later point in the code.

Thanks!

Upvotes: 2

Views: 2182

Answers (1)

Olivier Croisier

Reputation: 6149

Here is a commented example of how it could be done:

public static void main(String[] args) {

    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    // Data producer
    Runnable job = () -> {
        // Send data to the stream (could come from your Kafka queue)
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < 10; i++) {
            queue.offer(random.nextInt(100));
            delay(random.nextInt(2) + 1);
        }
        // Send the magic signal to stop the stream
        queue.offer(-1);
    };
    Thread thread = new Thread(job);
    thread.start();

    // Define the condition by which the stream knows there is no data left to consume
    // The function returns the next element wrapped in an Optional, or an empty Optional to tell there is no more data to read
    // In this example, the number -1 is the magic signal
    Function<BlockingQueue<Integer>, Optional<Integer>> endingCondition = q -> {
        try {
            Integer element = q.take();
            return element == -1 ? Optional.empty() : Optional.of(element);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and end the stream
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    };
    QueueConsumingIterator<Integer> iterator = new QueueConsumingIterator<>(queue, endingCondition);

    // Construct a Stream on top of our custom queue-consuming Iterator
    Spliterator<Integer> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
    Stream<Integer> stream = StreamSupport.stream(spliterator, false);

    // Use the Stream as usual :)
    stream.map(String::valueOf).forEach(System.out::println);

}


// This is a custom Iterator that takes data from a BlockingQueue.
// Detection of the end of the data stream is use-case-dependent, so it is extracted as a user-provided Function<Queue, Optional>.
// For example, you may want to wait for a particular item in the queue, or consider the queue "dead" after a certain timeout...
public static class QueueConsumingIterator<E> implements Iterator<E> {

    private final BlockingQueue<E> queue;
    private final Function<BlockingQueue<E>, Optional<E>> queueReader;
    private Optional<E> element;
    private boolean elementRead = false;

    public QueueConsumingIterator(BlockingQueue<E> queue, Function<BlockingQueue<E>, Optional<E>> queueReader) {
        this.queue = queue;
        this.queueReader = queueReader;
    }

    @Override
    public boolean hasNext() {
        if (!this.elementRead) {
            this.element = this.queueReader.apply(this.queue);
            this.elementRead = true;
        }
        return this.element.isPresent();
    }

    @Override
    public E next() {
        if (hasNext()) {
            this.elementRead = false;
            return this.element.get();
        }
        throw new NoSuchElementException();
    }

}

private static void delay(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

The idea behind this code is that you can feed a Stream through a custom Iterator, itself extracting data from an external source.
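
Since the question also asks whether a spliterator is needed: the same idea can probably be written directly against Spliterators.AbstractSpliterator, skipping the Iterator. The following is only a sketch under assumptions, not part of the example above: the class name QueueStreams, the helper streamOf and the poisonPill parameter are made up for illustration, and the end of the data is assumed to be signalled by a special element, like -1 above.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class QueueStreams {

    // Hypothetical helper: streams elements taken from the queue until poisonPill is seen.
    static <E> Stream<E> streamOf(BlockingQueue<E> queue, E poisonPill) {
        Spliterator<E> spliterator =
                new Spliterators.AbstractSpliterator<E>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super E> action) {
                try {
                    E element = queue.take();        // blocks until an element is available
                    if (element.equals(poisonPill)) {
                        return false;                // tells the stream there is no more data
                    }
                    action.accept(element);
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;                    // treat interruption as end of data
                }
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        // Producer thread: three values, then the poison pill (-1 is an assumed marker)
        new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                queue.offer(i);
            }
            queue.offer(-1);
        }).start();

        // The stream is usable before the producer has finished
        streamOf(queue, -1).map(String::valueOf).forEach(System.out::println);
    }
}
```

The stream is sequential (the false argument), so tryAdvance is only ever called from the thread running the terminal operation, which blocks in take() while the queue is empty.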

Data travels from the external source to the Iterator through a queue. Because only you know what your data looks like and how to detect that there is nothing left to read, the decision about whether the Stream should keep being fed is extracted into a user-provided function.
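
To illustrate how that user-provided function can vary, here is a rough sketch of a timeout-based variant that treats the queue as finished once nothing arrives for a while. The class name TimeoutEndingCondition and the helper untilIdle are invented for this illustration, and the timeout values are arbitrary assumptions.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class TimeoutEndingCondition {

    // Hypothetical helper: builds a reader that gives up when the queue stays empty for the timeout.
    static <E> Function<BlockingQueue<E>, Optional<E>> untilIdle(long timeout, TimeUnit unit) {
        return q -> {
            try {
                // poll(timeout, unit) returns null if no element arrives in time
                return Optional.ofNullable(q.poll(timeout, unit));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        };
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");

        Function<BlockingQueue<String>, Optional<String>> reader =
                untilIdle(200, TimeUnit.MILLISECONDS);

        System.out.println(reader.apply(queue)); // Optional[a]
        System.out.println(reader.apply(queue)); // Optional[b]
        System.out.println(reader.apply(queue)); // Optional.empty, after waiting about 200 ms
    }
}
```

A function built this way has the same shape as endingCondition above, so it could be passed to QueueConsumingIterator instead. The trade-off is that a slow producer may be mistaken for a finished one, so the timeout needs to be chosen with the source in mind.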

Hope that helps!

Upvotes: 3
