Reputation: 24518
Using Project Reactor 3.0.4.RELEASE. Conceptually, it should be the same in RxJava as well.
public Mono<Map<String, Boolean>> refreshPods(List<String> apps) {
    return pods(apps)
            .filter(this::isRunningAndNotThisApp)
            .groupBy(Item::getName)
            .flatMap(g -> g
                    .distinct(Item::getIp)
                    .collectList()
                    // TODO: This doesn't seem to be working as expected
                    .subscribeOn(Schedulers.newParallel("par-grp"))
                    .flatMap(client::refreshPods))
            .flatMap(m -> Flux.fromIterable(m.entrySet()))
            .collectMap(Map.Entry::getKey, Map.Entry::getValue);
}
The idea is to run client.refreshPods in a separate thread for each group.
Edit: I'd tried publishOn both before posting this question and after the answers given here, but the output doesn't change.
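A sketch of that attempt, assuming publishOn simply replaces the subscribeOn inside the group pipeline above:
public Mono<Map<String, Boolean>> refreshPods(List<String> apps) {
    return pods(apps)
            .filter(this::isRunningAndNotThisApp)
            .groupBy(Item::getName)
            .flatMap(g -> g
                    .distinct(Item::getIp)
                    .collectList()
                    // publishOn instead of subscribeOn; the logs below still show timer-1
                    .publishOn(Schedulers.newParallel("par-grp"))
                    .flatMap(client::refreshPods))
            .flatMap(m -> Flux.fromIterable(m.entrySet()))
            .collectMap(Map.Entry::getKey, Map.Entry::getValue);
}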
Client:
public class MyServiceClientImpl implements MyServiceClient {
    private final RestOperations restOperations;
    private final ConfigRefreshProperties configRefreshProperties;

    public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
        return Flux.fromIterable(pods)
                // stagger the refresh calls by the configured delay
                .zipWith(Flux.interval(Duration.ofSeconds(configRefreshProperties.getRefreshDelaySeconds())),
                        (x, delay) -> x)
                .flatMap(this::refreshWithRetry)
                .collectMap(Tuple2::getT1, Tuple2::getT2);
    }

    private Mono<Tuple2<String, Boolean>> refreshWithRetry(Item pod) {
        return Mono.<Boolean>create(emitter -> {
            try {
                log.info("Attempting to refresh pod: {}.", pod);
                ResponseEntity<String> tryRefresh = refresh(pod);
                if (!tryRefresh.getStatusCode().is2xxSuccessful()) {
                    log.error("Failed to refresh pod: {}.", pod);
                    // complete empty on failure, so this pod is dropped from the result map
                    emitter.success();
                } else {
                    log.info("Successfully refreshed pod: {}.", pod);
                    emitter.success(true);
                }
            } catch (Exception e) {
                emitter.error(e);
            }
        })
                .map(b -> Tuples.of(pod.getIp(), b))
                .log(getClass().getName(), Level.FINE)
                .retryWhen(errors -> {
                    int maxRetries = configRefreshProperties.getMaxRetries();
                    return errors.zipWith(Flux.range(1, maxRetries + 1), (ex, i) -> Tuples.of(ex, i))
                            .flatMap(t -> {
                                Integer retryCount = t.getT2();
                                if (retryCount <= maxRetries && shouldRetry(t.getT1())) {
                                    // exponential backoff: retryDelaySeconds ^ retryCount
                                    int retryDelaySeconds = configRefreshProperties.getRetryDelaySeconds();
                                    long delay = (long) Math.pow(retryDelaySeconds, retryCount);
                                    return Mono.delay(Duration.ofSeconds(delay));
                                }
                                log.error("Done retrying to refresh pod: {}.", pod);
                                return Mono.<Long>empty();
                            });
                });
    }

    private ResponseEntity<String> refresh(Item pod) {
        return restOperations.postForEntity(buildRefreshEndpoint(pod), null, String.class);
    }

    private String buildRefreshEndpoint(Item pod) {
        return UriComponentsBuilder.fromUriString("http://{podIp}:{containerPort}/refresh")
                .buildAndExpand(pod.getIp(), pod.getPort())
                .toUriString();
    }

    private boolean shouldRetry(Throwable t) {
        boolean clientError = ThrowableAnalyzer.getFirstOfType(t, HttpClientErrorException.class)
                .map(HttpClientErrorException::getStatusCode)
                .filter(s -> s.is4xxClientError())
                .isPresent();
        boolean timeoutError = ThrowableAnalyzer.getFirstOfType(t, TimeoutException.class)
                .isPresent();
        return timeoutError || !clientError;
    }
}
The problem is that the log statement "Attempting to refresh pod" is printed on the same thread for every group. What am I missing here?
Logs from test run:
2017-02-07 10:12:55.348 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.357 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.358 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.363 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.364 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.368 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.369 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.372 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.373 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.377 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.378 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.381 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
Upvotes: 0
Views: 1339
Reputation: 24518
I'm posting an answer myself for completeness. With help from @simon-baslé and @akarnokd, I got it right. Both of the following work. See reactor-core#421 for details.
Solution 1:
zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)),
        (x, delay) -> x)
    .publishOn(Schedulers.newParallel("par-grp"))
    .flatMap(this::refreshWithRetry)
Solution 2:
zipWith(Flux.intervalMillis(1000 * groupMemberDelaySeconds, Schedulers.newTimer("par-grp")),
(x, delay) -> x)
.flatMap(this:: refreshWithRetry)
No subscribeOn or publishOn is necessary in the refreshPods method.
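For context, this is how Solution 1 slots into the client's refreshPods (a sketch that keeps the original configRefreshProperties.getRefreshDelaySeconds() in place of groupMemberDelaySeconds):
public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
    return Flux.fromIterable(pods)
            .zipWith(Flux.interval(Duration.ofSeconds(configRefreshProperties.getRefreshDelaySeconds())),
                    (x, delay) -> x)
            // hop off the shared timer thread before the blocking refresh work
            .publishOn(Schedulers.newParallel("par-grp"))
            .flatMap(this::refreshWithRetry)
            .collectMap(Tuple2::getT1, Tuple2::getT2);
}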
Upvotes: 0
Reputation: 28301
Edit: As made more explicit thanks to your newly provided log, and as picked up by David in the issue you created, the root cause is that you use an interval here. This will switch context to the default TimedScheduler (which will be the same for all groups). That's why anything done before the call to refreshPods seems to be ignored (the work is done on the interval thread), but publishOn/subscribeOn after the interval operator should work. In a nutshell, my advice of using subscribeOn directly after the create still stands.
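A minimal standalone sketch of that effect, assuming Reactor 3.0.x defaults where interval runs on the shared timer scheduler: the zipped elements below are emitted on the timer thread even though the upstream is subscribed on a parallel scheduler.
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class IntervalThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux.just("a", "b", "c")
                .subscribeOn(Schedulers.newParallel("par-grp"))
                // zip waits for the timer tick, so the combined element is emitted on the timer thread
                .zipWith(Flux.interval(Duration.ofSeconds(1)), (x, tick) -> x)
                .subscribe(x -> System.out.println(Thread.currentThread().getName() + " -> " + x));

        Thread.sleep(5000); // expect "timer-..." thread names in the output, not "par-grp-..."
    }
}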
You trigger a blocking behavior (refresh(pod)) that you wrap as a Mono in refreshWithRetry.
Unless you have a strong need for being concurrency-agnostic at this level, I'd advise you to immediately chain your subscribeOn next to the create. This way, there's no surprise when using that Mono: it respects the contract and doesn't block. Like this:
return Mono.<Boolean>create(emitter -> {
//...
})
.subscribeOn(Schedulers.newParallel("par-grp"))
.map(b -> Tuples.of(pod.getIp(), b))
If you want the method to return a concurrency-agnostic publisher, then you need to put the subscribeOn closer to your blocking publisher, which means expanding the flatMap lambda:
.flatMap(pods -> client.refreshPods(pods)
        .subscribeOn(Schedulers.newParallel("par-grp"))
)
Upvotes: 1
Reputation: 21446
In your code you put publishOn before flatMap. Methods that combine observables, like flatMap or zip, do their own re-scheduling when working with async sources, and interval is such an async source in your case. That is why you get all results on the 'timer' thread.
1) Use publishOn right before the operation you wish to run in parallel. The operation itself should not involve re-scheduling: e.g. map is a good one, flatMap is bad.
2) Use another publishOn right after it to reschedule the results. Otherwise the subscriber's thread might interfere.
Flux.range(1, 100)
        .groupBy(i -> i % 5)
        .flatMap(group -> group
                .publishOn(Schedulers.newParallel("grp", 8))
                .map(v -> {
                    // processing here
                    String threadName = Thread.currentThread().getName();
                    logger.info("processing {} from {} on {}", v, group.key(), threadName);
                    return v;
                })
                .publishOn(Schedulers.single())
        )
        .subscribe(v -> logger.info("got {}", v));
In case you want to make sure all of a group's items run on the same thread, see this answer: https://stackoverflow.com/a/41697348/697313
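One way to do that (a sketch of the general idea, not necessarily the exact approach in the linked answer) is to give each group its own single-threaded scheduler:
Flux.range(1, 100)
        .groupBy(i -> i % 5)
        .flatMap(group -> group
                // newSingle gives this group a dedicated single-threaded scheduler,
                // so every element of the group is processed on the same thread
                // (remember to shut these schedulers down when they are no longer needed)
                .publishOn(Schedulers.newSingle("grp-" + group.key()))
                .map(v -> {
                    logger.info("processing {} from {} on {}", v, group.key(),
                            Thread.currentThread().getName());
                    return v;
                }))
        .subscribe(v -> logger.info("got {}", v));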
Upvotes: 0