Abhijit Sarkar

Reputation: 24518

Reactive parallelization doesn't work

Using Project Reactor 3.0.4.RELEASE. Conceptually, this should be the same in RxJava as well.

public Mono<Map<String, Boolean>> refreshPods(List<String> apps) {
    return pods(apps)
            .filter(this::isRunningAndNotThisApp)
            .groupBy(Item::getName)
            .flatMap(g -> g
                    .distinct(Item::getIp)
                    .collectList()
                    // TODO: This doesn't seem to be working as expected
                    .subscribeOn(Schedulers.newParallel("par-grp"))
                    .flatMap(client::refreshPods))
            .flatMap(m -> Flux.fromIterable(m.entrySet()))
            .collectMap(Map.Entry::getKey, Map.Entry::getValue);
}

The idea is to run client.refreshPods in a separate thread for each group.

Edit: I'd tried publishOn both before posting this question and after reading the answers given here, but the output doesn't change.
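For reference, this is roughly where I'd placed publishOn (swapping it for the subscribeOn above); the output was identical:

.flatMap(g -> g
        .distinct(Item::getIp)
        .collectList()
        .publishOn(Schedulers.newParallel("par-grp"))
        .flatMap(client::refreshPods))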

Client:

public class MyServiceClientImpl implements MyServiceClient {
    private final RestOperations restOperations;
    private final ConfigRefreshProperties configRefreshProperties;

    public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
        return Flux.fromIterable(pods)
                .zipWith(Flux.interval(Duration.ofSeconds(configRefreshProperties.getRefreshDelaySeconds())),
                        (x, delay) -> x)
                .flatMap(this::refreshWithRetry)
                .collectMap(Tuple2::getT1, Tuple2::getT2);
    }

    private Mono<Tuple2<String, Boolean>> refreshWithRetry(Item pod) {
        return Mono.<Boolean>create(emitter -> {
            try {
                log.info("Attempting to refresh pod: {}.", pod);
                ResponseEntity<String> tryRefresh = refresh(pod);

                if (!tryRefresh.getStatusCode().is2xxSuccessful()) {
                    log.error("Failed to refresh pod: {}.", pod);
                    emitter.success();
                } else {
                    log.info("Successfully refreshed pod: {}.", pod);
                    emitter.success(true);
                }
            } catch (Exception e) {
                emitter.error(e);
            }
        })
                .map(b -> Tuples.of(pod.getIp(), b))
                .log(getClass().getName(), Level.FINE)
                .retryWhen(errors -> {
                    int maxRetries = configRefreshProperties.getMaxRetries();
                    return errors.zipWith(Flux.range(1, maxRetries + 1), (ex, i) -> Tuples.of(ex, i))
                            .flatMap(t -> {
                                Integer retryCount = t.getT2();
                                if (retryCount <= maxRetries && shouldRetry(t.getT1())) {
                                    int retryDelaySeconds = configRefreshProperties.getRetryDelaySeconds();
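                                    // exponential backoff: delay grows as retryDelaySeconds^retryCount seconds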
                                    long delay = (long) Math.pow(retryDelaySeconds, retryCount);
                                    return Mono.delay(Duration.ofSeconds(delay));
                                }
                                log.error("Done retrying to refresh pod: {}.", pod);
                                return Mono.<Long>empty();
                            });
                });
    }

    private ResponseEntity<String> refresh(Item pod) {
        return restOperations.postForEntity(buildRefreshEndpoint(pod), null, String.class);
    }

    private String buildRefreshEndpoint(Item pod) {
        return UriComponentsBuilder.fromUriString("http://{podIp}:{containerPort}/refresh")
                .buildAndExpand(pod.getIp(), pod.getPort())
                .toUriString();
    }

    private boolean shouldRetry(Throwable t) {
        boolean clientError = ThrowableAnalyzer.getFirstOfType(t, HttpClientErrorException.class)
                .map(HttpClientErrorException::getStatusCode)
                .filter(s -> s.is4xxClientError())
                .isPresent();

        boolean timeoutError = ThrowableAnalyzer.getFirstOfType(t, TimeoutException.class)
                .isPresent();

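        // retry on timeouts, and on anything that is not a 4xx client error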
        return timeoutError || !clientError;
    }
}

The problem is that the log statement "Attempting to refresh pod" is printed on the same thread (timer-1) for every group. What am I missing here?

Logs from test run:

2017-02-07 10:12:55.348  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.357  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.358  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.363  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.364  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.368  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.369  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.372  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.373  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.377  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.378  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.381  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).

Upvotes: 0

Views: 1339

Answers (3)

Abhijit Sarkar

Reputation: 24518

I'm posting an answer myself for completeness. With help from @simon-baslé and @akarnokd, I got it right. Both of the following work. See reactor-core#421 for details.

Solution 1:

zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)),
    (x, delay) -> x)
.publishOn(Schedulers.newParallel("par-grp"))
.flatMap(this::refreshWithRetry)

Solution 2:

zipWith(Flux.intervalMillis(1000 * groupMemberDelaySeconds, Schedulers.newTimer("par-grp")),
    (x, delay) -> x)
.flatMap(this::refreshWithRetry)

No subscribeOn or publishOn is necessary in the outer refreshPods method.
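For context, Solution 1 slots into the client's refreshPods like this (a sketch of the full method after the fix, with groupMemberDelaySeconds standing in for configRefreshProperties.getRefreshDelaySeconds()):

public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
    return Flux.fromIterable(pods)
            .zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)),
                    (x, delay) -> x)
            // publishOn after the interval moves the rest of the pipeline
            // off the shared timer thread and onto the parallel scheduler
            .publishOn(Schedulers.newParallel("par-grp"))
            .flatMap(this::refreshWithRetry)
            .collectMap(Tuple2::getT1, Tuple2::getT2);
}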

Upvotes: 0

Simon Baslé

Reputation: 28301

edit: As your newly provided log makes explicit, and as David picked up in the issue you created, the root cause is that you use an interval here. It switches the context to the default TimedScheduler (which is the same for all groups). That's why anything done before the call to refreshPods seems to be ignored (the work is done on the interval's thread), while a publishOn/subscribeOn placed after the interval operator should work. In a nutshell, my advice of using subscribeOn directly after the create still stands.

You trigger blocking behavior (refresh(pod)) that you wrap in a Mono in refreshWithRetry.

Unless you have a strong need for being concurrency-agnostic at this level, I'd advise you to immediately chain your subscribeOn next to the create.

This way, no surprise when using that Mono: it respects the contract and doesn't block. Like this:

return Mono.<Boolean>create(emitter -> {
        //...
    })
.subscribeOn(Schedulers.newParallel("par-grp"))
.map(b -> Tuples.of(pod.getIp(), b))

If you want the method to return a concurrency-agnostic publisher, you'd instead put the subscribeOn closer to your blocking publisher, which means expanding the flatMap lambda in refreshPods:

.flatMap(pods -> client.refreshPods(pods)
                       .subscribeOn(Schedulers.newParallel("par-grp"))
)

Upvotes: 1

Yaroslav Stavnichiy

Reputation: 21446

In your code you put publishOn before flatMap. Methods that combine observables, like flatMap or zip, do their own re-scheduling when working with async sources, and interval is such an async source in your case. That is why you get all results on the 'timer' thread.

1) Use publishOn right before the operation you want to run in parallel. The operation itself should not involve re-scheduling: e.g. map is a good one, flatMap is bad.

2) Use another publishOn right after it to re-schedule the results. Otherwise the subscriber's thread might interfere. For example:

Flux.range(1, 100)
        .groupBy(i -> i % 5)
        .flatMap(group -> group
                .publishOn(Schedulers.newParallel("grp", 8))
                .map(v -> {
                    // processing here
                    String threadName = Thread.currentThread().getName();
                    logger.info("processing {} from {} on {}", v, group.key(), threadName);
                    return v;
                })
                .publishOn(Schedulers.single())
        )
        .subscribe(v -> logger.info("got {}", v));

In case you want to make sure all of a group's items run on the same thread, see this answer: https://stackoverflow.com/a/41697348/697313
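The gist of that answer, as a minimal sketch: give each group its own single-threaded scheduler, so a group's items never hop threads.

Flux.range(1, 100)
        .groupBy(i -> i % 5)
        .flatMap(group -> group
                // newSingle pins this group's items to one dedicated thread;
                // in real code, dispose the scheduler once the group completes
                .publishOn(Schedulers.newSingle("grp-" + group.key()))
                .map(v -> {
                    logger.info("processing {} from {} on {}",
                            v, group.key(), Thread.currentThread().getName());
                    return v;
                })
        )
        .subscribe(v -> logger.info("got {}", v));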

Upvotes: 0
