Lakatos Gyula

Reputation: 4160

Preload elements for Flux.generate(...)

I'm creating a Flux using Flux.generate(). The generator (a Consumer) reads from a message queue. The problem is that each read can take quite a long time (occasionally even 1-2 seconds), and the Flux stalls while it waits for the next element.

package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}

Adding parallel() doesn't help, because the generator is still called for one element at a time.

Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel()) 
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();

Does anybody know how to make the generator parallel? I don't want to use Flux.create() because then I would lose backpressure.

Upvotes: 3

Views: 1306

Answers (3)

Martin Tarjányi

Reputation: 9997

Mono.just(1).repeat()  // create an infinite flux; maybe there is a nicer way to do that?
    .flatMap(this::readFromQueue, 100) // second argument caps the number of concurrent queue polls
    .flatMap(this::archiveDocument)
    .subscribe();

private Mono<String> readFromQueue(Integer ignore)
{
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    }).subscribeOn(Schedulers.boundedElastic()); // run the blocking call on a dedicated thread pool (Schedulers.elastic() is deprecated since Reactor 3.4)
}

Upvotes: 1

Eugene Khyst

Reputation: 10315

The problem is that vaultQueueConsumer performs a slow operation. The solution is to move that slow operation out of generate and into a map step, which can be parallelised.

One idea is to have the generator emit only the name of the queue to consume from, and to do the actual message consumption in map after making the flux parallel:

String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
    .parallel()
    .runOn(Schedulers.parallel())
    .map(queueManipulator::readMessage)
    .doOnNext(log::info)
    .subscribe();

The fake QueueManipulator sleeps 1-2 seconds before returning a message:

public class QueueManipulator {

  private final AtomicLong counter = new AtomicLong();

  public String readMessage(String queue) {
    sleep(); //sleep 1-2 seconds
    return queue + " " + counter.incrementAndGet();
  }
  //...
}

This way message consumption is done in parallel:

12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8

This solution is straightforward but may look like a "hack".

Another idea is to call Flux.generate inside flatMap:

String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
    .flatMap(i ->
        Flux.<String>generate(synchronousSink -> {
          synchronousSink.next(queueManipulator.readMessage(queue));
        }).subscribeOn(Schedulers.parallel()))
    .doOnNext(log::info)
    .subscribe();
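
A related variant, which is not part of the snippets above and is only a minimal sketch, is to literally preload: a few worker threads run the slow read and park the results in a bounded buffer, so the generator only has to take an element that is already there. The class name PreloadingReader and its parameters are hypothetical, and the sketch uses only the JDK so it can be tried without Reactor on the classpath.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper: preloads results of a slow, blocking read into a bounded buffer.
final class PreloadingReader<T> implements AutoCloseable {

    private final BlockingQueue<T> buffer;
    private final ExecutorService workers;

    PreloadingReader(Supplier<T> slowRead, int workerCount, int bufferSize) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
        this.workers = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            workers.execute(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // put() blocks while the buffer is full, so the workers
                        // never read more than bufferSize elements ahead.
                        buffer.put(slowRead.get());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop this worker on shutdown
                }
            });
        }
    }

    // Returns a preloaded element, waiting only if the buffer is currently empty.
    T take() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for an element", e);
        }
    }

    @Override
    public void close() {
        workers.shutdownNow(); // interrupts workers blocked in put() or in the slow read
    }
}
```

Under these assumptions the generator would reduce to something like Flux.<T>generate(sink -> sink.next(reader.take())). The bounded buffer limits how far ahead the workers can read, but elements sitting in the buffer have already been taken off the message queue, so anything still buffered (or read but not yet stored) at shutdown is lost unless the queue supports redelivery.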

Upvotes: 1

Ricard Kollcaku

Reputation: 1722

Have you tried:

Flux.generate(vaultQueueConsumer)
 .parallel()
 .runOn(Schedulers.parallel()) 
 .flatMap(vaultDocumentManager::archiveDocument)
 .subscribe();

Upvotes: 0
