Reputation: 9489
Do I/O in chunks. Start processing a chunk as soon as it becomes available, while further chunks are read in the background (but no more than X chunks are read ahead). Process chunks in parallel as they arrive, then consume each processed chunk in the order it was read.
I've set up an MWE class to imitate the situation and it works to an extent:
Flux.fromFile(path, some-function-to-define-chunk)
    // done with Flux.generate in the MWE below
    .prefetchOnIoThread(x-count: int)
    // at this point we try to keep a buffer filled with x-count pre-read chunks
    .parallelMapOrdered(n-threads: int, limit-process-ahead: int)
    // n-threads: constantly try to drain the x-count buffer, applying some transformation
    // limit-process-ahead: since the results are needed in order, if we hit an input element that
    // takes a while to process, we don't want the pipeline to run too far ahead of it
    // (to avoid overflowing buffers and using too much memory)
    .consume(TMapped v)
Dependency: implementation 'io.projectreactor:reactor-core:3.3.5.RELEASE'
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
public class Tmp {
    static final SimpleDateFormat fmt = new SimpleDateFormat("HH:mm:ss.SSS");
    static long millisRead = 1;      // time taken to "read" a chunk
    static long millisProcess = 100; // time taken to "process" a chunk

    public static void main(String[] args) {
        log("Before flux construct");

        // Step 1: Generate / IO
        Flux<Integer> f = Flux.generate( // imitate IO
                AtomicInteger::new,
                (atomicInteger, synchronousSink) -> {
                    sleepQuietly(millisRead);
                    Integer next = atomicInteger.getAndIncrement();
                    if (next > 50) {
                        synchronousSink.complete();
                        log("Emitting complete");
                    } else {
                        log("Emitting next : %d", next);
                        synchronousSink.next(next);
                    }
                    return atomicInteger;
                },
                atomicInteger -> log("State consumer called: pos=%s", atomicInteger.get()));
        f = f.publishOn(Schedulers.elastic());
        f = f.subscribeOn(Schedulers.elastic());

        ParallelFlux<Integer> pf = f.parallel(2, 2);
        pf = pf.runOn(Schedulers.elastic(), 2);

        // Step 2: transform in parallel
        pf = pf.map(i -> { // imitate processing steps
            log("Processing begin: %d", i);
            sleepQuietly(millisProcess); // 100x the time it takes to create an input for this operation
            log("Processing done : %d", i);
            return 1000 + i;
        });

        // Step 3: use transformed data, preferably in order of generation
        Disposable sub = pf.sequential(3).subscribe(
                next -> log(String.format("Finally got: %d", next)),
                err -> err.printStackTrace(),
                () -> log("Complete!"));

        while (!sub.isDisposed()) {
            log("Waiting for pipeline completion...");
            sleepQuietly(500);
        }
        log("Main done");
    }

    public static void log(String message) {
        Thread t = Thread.currentThread();
        Date d = new Date();
        System.out.printf("[%s] @ [%s]: %s\n", t.getName(), fmt.format(d), message);
    }

    public static void log(String format, Object... args) {
        log(String.format(format, args));
    }

    public static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
Upvotes: 1
Views: 541
Reputation: 9489
Considering the lack of answers, I'll post what I came up with.
final int threads = 2;
final int prefetch = 3;

Flux<Integer> gen = Flux.generate(AtomicInteger::new, (ai, sink) -> {
    int i = ai.incrementAndGet();
    if (i > 10) {
        sink.complete();
    } else {
        sink.next(i);
    }
    return ai;
});

gen.parallel(threads, prefetch)             // switch to parallel processing after the generator
    .runOn(Schedulers.parallel(), prefetch) // without runOn, the rails won't actually run in parallel
    .map(i -> i + 1000)                     // this is done in parallel
    .ordered(Integer::compareTo)            // plain .sequential() is faster, but does not restore the original order
    .subscribeOn(Schedulers.elastic())      // the generator will run on this scheduler (once subscribed)
    .subscribe(i -> {
        // do something with each generated and processed item
        System.out.println("Transformed integer: " + i);
    });
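For completeness: Reactor has no built-in Flux.fromFile, so the generator above stands in for real I/O. Below is a minimal, untested sketch of how the same pipeline shape could be fed from an actual file, treating each line as one chunk; the helper name readChunks, the file name input.txt, and the line-per-chunk granularity are my own assumptions, not part of the answer above. Pairing each chunk with its position via index() gives .ordered(...) a key for restoring the original read order after the parallel map.

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;

public class FileChunksSketch {

    // Hypothetical helper: emits each line of the file as one "chunk".
    // Flux.using ties the reader's lifecycle to the subscription.
    static Flux<String> readChunks(Path path) {
        return Flux.using(
                () -> Files.newBufferedReader(path),
                reader -> Flux.<String>generate(sink -> {
                    try {
                        String line = reader.readLine();
                        if (line == null) {
                            sink.complete();
                        } else {
                            sink.next(line);
                        }
                    } catch (IOException e) {
                        sink.error(e);
                    }
                }),
                reader -> {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
    }

    public static void main(String[] args) {
        final int threads = 2;
        final int prefetch = 3;

        // Order results by the position at which each chunk was read.
        Comparator<Tuple2<Long, String>> byReadOrder =
                (a, b) -> Long.compare(a.getT1(), b.getT1());

        readChunks(Paths.get("input.txt"))                  // assumed file name
                .index()                                    // pair each chunk with its read position
                .parallel(threads, prefetch)
                .runOn(Schedulers.parallel(), prefetch)
                .map(t -> Tuples.of(t.getT1(), t.getT2().toUpperCase())) // stand-in for real processing
                .ordered(byReadOrder)                       // restore original read order
                .map(Tuple2::getT2)
                .subscribeOn(Schedulers.elastic())          // blocking file reads happen on this scheduler
                .blockLast();                               // block only for this demo
    }
}

The split mirrors the answer above: the elastic scheduler carries the blocking reads, while the parallel scheduler handles the CPU-bound map step, with prefetch limiting how far the pipeline reads and processes ahead.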
Upvotes: 2