RxJava : Reduce not working as expected

I'm trying to load a user object in parallel.

    final User user = new User();
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS")
            .flatMap(field -> getOrchestrator(user, field))
            .scan(new User(), (finalUser, event) -> {
                finalUser = event;
                return finalUser;
            });

Scan does emit three user object, where as the reduce does not emit any items at all? what am I doing wrong here.

    final User user = new User();
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS")
            .flatMap(field -> getOrchestrator(user, field))
            .reduce(new User(), (finalUser, event) -> {
                finalUser = event;
                return finalUser;
            });

The getOrchestrator returns Observable. Any help will be appreciated.

Below is the complete code snippet

public class Orchestrator {
    private String userId;

    public Orchestrator(final String userId) {
        this.userId = userId;
    }

    public static void main(final String[] args) throws Exception {
        final User user = new User();
        final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS")
                .flatMap(field -> getOrchestrator(user, field))
                .scan(new User(), (finalUser, event) -> {
                    finalUser = event;
                    return finalUser;
                });

        userObs.subscribeOn(Schedulers.io()).subscribe(result -> {
            System.out.println(result.toString());
        });

        TimeUnit.SECONDS.sleep(10);

    }

    private static Observable<User> getOrchestrator(final User user, final String fieldName) {
        switch (fieldName) {
            case "CURRENT_ADDRESS":
                return new AddressOrchestrator().getCurrentAddress(user.getUserId())
                        .map(currentAddress -> {
                            user.setAddress(currentAddress);
                            try {
                                TimeUnit.MILLISECONDS.sleep(200);
                            }
                            catch (final InterruptedException e) {

                            }
                            return user;
                        });
            case "ADDRESSES":
                return new AddressOrchestrator().getAddresses(user.getUserId())
                        .map(addresses -> {
                            user.setAddresses(addresses);
                            try {
                                TimeUnit.MILLISECONDS.sleep(200);
                            }
                            catch (final InterruptedException e) {

                            }
                            return user;
                        });

            case "NAMES":
                return new NameOrchestrator().getNames(user.getUserId())
                        .map(names -> {
                            user.setNames(names);
                            try {
                                TimeUnit.MILLISECONDS.sleep(200);
                            }
                            catch (final InterruptedException e) {

                            }
                            return user;
                        });
        }
        return null;
    }

    public User getUser() {
        final Random r = new Random();
        if (r.nextInt(3) % 2 == 0) {
            return new User();
        }
        throw new RuntimeException();
    }
}

Each orchestrator returns Observable.

public class AddressOrchestrator {

    public Observable<List<Address>> getAddresses(final String userId) {
        return Observable.create(s -> {
            final Address currentAddress = this.getBaseAddress(userId);
            final Address anotherAddress = this.getBaseAddress(userId);
            anotherAddress.setState("NE");
            s.onNext(Arrays.asList(currentAddress, anotherAddress));
        });

    }

    public Observable<Address> getCurrentAddress(final String userId) {
        return Observable.create(s -> s.onNext(this.getBaseAddress(userId)));
    }

    public Address getBaseAddress(final String userId) {
        final Address address = new Address();
        address.setLine1("540 Caddo Lake Dr");
        address.setCity("Georgetown");
        address.setCountry("USA");
        address.setState("TX");

        return address;
    }
}


public class NameOrchestrator {

    public Observable<List<Name>> getNames(final String userId) {
        return Observable.create(s -> {
            final Name name = new Name();
            name.setName("Vanchi");
            final Name formerName = new Name();
            formerName.setName("Vanchinathan");
            s.onNext(Arrays.asList(name, formerName));
        });
    }
}

Upvotes: 0

Views: 897

Answers (2)

Alexander Perfilyev
Alexander Perfilyev

Reputation: 6839

That is happening because scan emits all upstream items while reduce will emit only if stream is completed.

Replace

public Observable<Address> getCurrentAddress(final String userId) {
    return Observable.create(s -> s.onNext(this.getBaseAddress(userId)));
}

with

public Observable<Address> getCurrentAddress(final String userId) {
    return Observable.fromCallable(() -> this.getBaseAddress(userId));
}

and

public Observable<List<Address>> getAddresses(final String userId) {
    return Observable.create(s -> {
        final Address currentAddress = this.getBaseAddress(userId);
        final Address anotherAddress = this.getBaseAddress(userId);
        anotherAddress.setState("NE");
        s.onNext(Arrays.asList(currentAddress, anotherAddress));
    });
}

with

public Observable<List<Address>> getAddresses(final String userId) {
    return Observable.fromCallable(() -> this.getBaseAddress(userId))
            .zipWith(Observable.fromCallable(() -> this.getBaseAddress(userId))
                    .map(address -> address.setState("NE")),
                    (currentAddress, anotherAddress) -> Arrays.asList(currentAddress, anotherAddress));
}

and change your setAddress() method to this

public Address setState(String state) {
    this.state = state;
    return this;
}

fromCallable() is a better way to create an Observable that emits at most one element, because it handles errors and backpressure for you.

Upvotes: 1

JohnWowUs
JohnWowUs

Reputation: 3083

Your orchestrations which you create using Observable.create (a big red flag unless you really know what you're doing) do not terminate. This leads to infinite streams (in the sense that a complete event is never emitted). What you need it something like

public Observable<List<Address>> getAddresses(final String userId) {
        return Observable.create(s -> {
            final Address currentAddress = this.getBaseAddress(userId);
            final Address anotherAddress = this.getBaseAddress(userId);
            anotherAddress.setState("NE");
            s.onNext(Arrays.asList(currentAddress, anotherAddress));
            s.onCompleted();
        });

Note the onCompleted being called. This will fix your superficial problems but you're better off getting rid of the Observable.create in the first place and using something like Observable.defer.

Upvotes: 3

Related Questions