Reputation: 2211
I've got a data access object that passes each item in a data source to a consumer:
public interface Dao<T> {
void forEachItem(Consumer<T> item);
}
This always produces items in a single threaded way - I can't currently change this interface.
I wanted to create a Flowable
from this interface:
private static Flowable<String> flowable(final Dao dao) {
return Flowable.create(emitter -> {
dao.forEachItem(item ->
emitter.onNext(item));
emitter.onComplete();
}, ERROR);
}
If I use this Flowable
in a situation where the processing takes longer than the rate at which items are emitted then I understandably get a missing back pressure exception as I am using ERROR
mode:
Dao<String> exampleDao =
itemConsumer ->
IntStream.range(0, 1_000).forEach(i ->
itemConsumer.accept(String.valueOf(i)));
flowable(exampleDao)
.map(v -> {
Thread.sleep(100);
return "id:" + v;
})
.blockingSubscribe(System.out::println);
I don't wish to buffer items - seems like this could lead to exhausting memory on very large data sets - if the operation is significantly slower than the producer.
I was hoping there would be a backpressure mode that would allow the emitter to block when passed next/completion events when it detects back pressure but that does not seem to be the case?
In my case as I know that the dao produces items in a single threaded way I thought I would be able to do something like:
dao.forEachItem(item -> {
while (emitter.requested() == 0) {
waitABit();
}
emitter.onNext(item)
});
but this seems to hang forever.
How wrong is my approach? :-) Is there a way of producing items in a way that respects downstream back pressure given my (relatively restrictive) set of circumstances?
I know I could do this with a separate process writing to a queue and then write a Flowable based on consuming from that queue- would that be the preferred approach instead?
Upvotes: 1
Views: 1365
Reputation: 19431
Check the part of the Flowable, especially the part with Supscription.request(long)
. I hope that gets you on the right way.
The TestProducer
from this example produces Integer
objects in a given range and pushes them to its Subscriber
. It extends the Flowable<Integer>
class. For a new subscriber, it creates a Subscription
object whose request(long)
method is used to create and publish the Integer values.
It is important for the Subscription
that is passed to the subscriber
that the request()
method which calls onNext()
on the subscriber can be recursively called from within this onNext()
call. To prevent a stack overflow, the shown implementation uses the outStandingRequests
counter and the isProducing
flag.
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
The Consumer in this example extends DefaultSubscriber<Integer>
and on start and after consuming an Integer requests the next one. On consuming the Integer values, there is a little delay, so the backpressure will be built up for the producer.
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
in the following main method of a test class the producer and consumer are created and wired up:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
When running the example, the logfile shows that the consumer runs continuously, while the producer only gets active when the internal Flowable buffer of rxjava2 needs to be refilled.
Upvotes: 2