Reputation: 4160
I'm creating a Flux using Flux.generate(). The generator (a Consumer) reads from a message queue. The problem is that each read takes quite a long time (occasionally even 1-2 seconds), and while the read blocks, the Flux stops processing.
package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}
Obviously, adding parallel() doesn't help because the generator is still called one at a time.
Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel())
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();
Does anybody know how to make the generator parallel? I don't want to use Flux.create() because then I would lose backpressure.
Upvotes: 3
Views: 1306
Reputation: 9997
Mono.just(1).repeat() // create an infinite flux; maybe there is a nicer way to do that?
    .flatMap(this::readFromQueue, 100) // at most 100 queue reads in flight at once
    .flatMap(this::archiveDocument)
    .subscribe();

private Mono<String> readFromQueue(Integer ignore)
{
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    }).subscribeOn(Schedulers.boundedElastic()); // run the blocking call on a thread pool meant for blocking work (Schedulers.elastic() is deprecated since Reactor 3.4)
}
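The second argument to flatMap (100 above) caps how many inner publishers are subscribed at the same time, so no more than that many queue reads are in flight. As a rough analogue outside Reactor, the same kind of cap can be sketched with a semaphore. This is only an illustration using java.util.concurrent, not what Reactor does internally; BoundedPollers, run and the sleep that stands in for the queue read are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPollers {

    // Runs `total` simulated blocking reads, never more than `limit` at once,
    // and returns the highest number of reads observed in flight together.
    static int run(int total, int limit, long delayMillis) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();

        for (int i = 0; i < total; i++) {
            permits.acquire(); // the submitting loop waits here until a slot frees up
            pool.execute(() -> {
                try {
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    Thread.sleep(delayMillis); // hypothetical stand-in for the blocking queue read
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release(); // free the slot for the next read
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return maxInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max reads in flight: " + run(20, 3, 50));
    }
}
```

Because a permit is taken before each read starts and released only after it finishes, the reported maximum can never exceed the limit, which is the same guarantee the concurrency argument gives for the number of reads in flight.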
Upvotes: 1
Reputation: 10315
The problem is that vaultQueueConsumer includes a slow operation.
So the solution is to move this slow operation out of generate and into a map that can be parallelised.
One idea is to generate the name of the queue that the messages have to be consumed from, and do the actual message consumption in a map step after making the Flux parallel:
String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
    .parallel()
    .runOn(Schedulers.parallel())
    .map(queueManipulator::readMessage)
    .doOnNext(log::info)
    .subscribe();
The fake QueueManipulator sleeps 1-2 seconds before returning a message:
public class QueueManipulator {
    private final AtomicLong counter = new AtomicLong();

    public String readMessage(String queue) {
        sleep(); //sleep 1-2 seconds
        return queue + " " + counter.incrementAndGet();
    }
    //...
}
This way message consumption is done in parallel:
12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8
The solution above is straightforward but may look like a "hack".
Another idea is to call Flux.generate inside flatMap:
String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
    .flatMap(i ->
        Flux.<String>generate(synchronousSink -> {
            synchronousSink.next(queueManipulator.readMessage(queue));
        }).subscribeOn(Schedulers.parallel()))
    .doOnNext(log::info)
    .subscribe();
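To see why moving the blocking read off the single generator call helps, here is a minimal sketch that uses only java.util.concurrent instead of Reactor. It is an illustration under assumptions, not the code above: PollingSketch, poll and the 200 ms delay are hypothetical, and readMessage is a stand-in that just sleeps the way the fake QueueManipulator does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class PollingSketch {

    private static final AtomicLong counter = new AtomicLong();

    // Hypothetical stand-in for the blocking queue read: sleeps, then returns a message.
    static String readMessage(String queue, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);
        return queue + " " + counter.incrementAndGet();
    }

    // Reads `count` messages using `workers` threads and returns them in submission order.
    static List<String> poll(String queue, int count, int workers, long delayMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                tasks.add(() -> readMessage(queue, delayMillis));
            }
            List<String> messages = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(tasks)) {
                messages.add(future.get());
            }
            return messages;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        poll("test", 4, 1, 200); // one reader at a time, like a single generator
        long oneWorkerMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        poll("test", 4, 4, 200); // four readers at once
        long fourWorkersMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("1 worker: " + oneWorkerMs + " ms, 4 workers: " + fourWorkersMs + " ms");
    }
}
```

With one worker the four reads run back to back, so the elapsed time is at least four times the delay. With four workers the reads overlap, which is the effect both Reactor variants above aim for.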
Upvotes: 1
Reputation: 1722
Have you tried:
Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel())
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();
Upvotes: 0