Michael Sotnikov

Reputation: 628

Combine multiple items from observable to new objects using RxJava

I'm trying to implement a kind of streaming parser. Let's say I have a stream of integers and I combine them to create a new object that aggregates part of the stream.

For example, an object is "done" when an integer is negative. To keep it simple, the produced items will be strings of numbers.

Here is a simple example:

Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11
Output: "1-2", "34-5", "6-7", "8910-11"
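
To pin down the chunking rule itself, separate from any RxJava plumbing, here is a minimal plain-Java sketch. The class name `ChunkSketch` and the method `chunk` are made up for illustration, and it assumes that a trailing run with no closing negative value should simply be dropped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkSketch {

    // Groups integers into strings; a negative value closes the current chunk.
    static List<String> chunk(List<Integer> source) {
        List<String> out = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        for (int i : source) {
            cur.append(i);
            if (i < 0) {
                out.add(cur.toString());
                cur.setLength(0); // start a fresh chunk
            }
        }
        // Assumption: an unfinished trailing chunk is discarded.
        return out;
    }

    public static void main(String[] args) {
        // prints [1-2, 34-5, 6-7, 8910-11]
        System.out.println(chunk(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11)));
    }
}
```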

I can't find a proper combination of existing operators. My solution was to provide a custom operator that aggregates data until a "chunk" is valid and then emits a new object:

rx.Observable<Integer> src = rx.Observable
    .from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11));

rx.Observable<String> res = src
    .lift(new rx.Observable.Operator<String, Integer> () {

        @Override
        public rx.Subscriber<? super Integer> call(
            final rx.Subscriber<? super String> subscriber) {

            return new rx.Subscriber<Integer>(subscriber) {

                private final StringBuilder cur = new StringBuilder();

                @Override
                public void onCompleted() {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(e);
                    }
                }

                @Override
                public void onNext(Integer i) {
                    if (subscriber.isUnsubscribed())
                        return;

                    cur.append(i);
                    if (i < 0) {
                        subscriber.onNext(cur.toString());
                        cur.delete(0, cur.length());
                    }
                }
            };
        }
    });

rx.observers.TestSubscriber<String> sub =
    new rx.observers.TestSubscriber<>();
res.subscribe(sub);
List<String> l1 = sub.getOnNextEvents();

It works well until I apply positional operators to the result, such as take(N) or combineLatest.

rx.Observable<String> res2 = res.take(2);

In this situation I don't get onCompleted or onError, and fewer items are emitted than expected.

It looks as if every input onNext has to result in an output onNext call.
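
One possible reading of this symptom, assuming RxJava 1.x request accounting: take(2) asks upstream for only two items, and a Subscriber created with new rx.Subscriber<Integer>(subscriber) passes that request on unchanged. The source then delivers just 1 and -2, one string comes out, and nothing ever asks for more. The plain-Java model below illustrates that stall. It uses no RxJava types, and `DemandSketch`, `run` and `replenish` are hypothetical names. It also shows one way around it: ask for one extra input whenever an input produces no output. In a real rx.Subscriber that would presumably be a call to the protected request(1) in the non-emitting branch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class DemandSketch {

    // Models a downstream that wants `wanted` strings. The source hands over
    // one integer per unit of outstanding demand and then waits.
    static List<String> run(List<Integer> source, int wanted, boolean replenish) {
        List<String> out = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        Iterator<Integer> it = source.iterator();
        long demand = wanted; // downstream request forwarded upstream unchanged
        while (demand > 0 && it.hasNext() && out.size() < wanted) {
            demand--;
            int i = it.next();
            cur.append(i);
            if (i < 0) {
                out.add(cur.toString());
                cur.setLength(0);
            } else if (replenish) {
                demand++; // nothing was emitted for this input, so ask for one more
            }
        }
        return out;
    }
}
```

With the source from the question and wanted = 2, run(..., false) stops after the single item "1-2", while run(..., true) yields "1-2" and "34-5".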

Any ideas?

Upvotes: 0

Views: 1755

Answers (1)

Michael Sotnikov

Reputation: 628

Hmm. Writing a long post did help me come up with a new idea, and it works. (I had been banging my head against the wall for 2 days already.)

Produce an Observable<Observable<String>> and emit Observable.empty() until the object is ready, then flatMap the result.

Here is a working example:

rx.Observable<rx.Observable<String>> res = src
    .lift(new rx.Observable.Operator<rx.Observable<String>, Integer> () {

        @Override
        public rx.Subscriber<? super Integer> call(
            final rx.Subscriber<? super rx.Observable<String>> subscriber) {

            return new rx.Subscriber<Integer>(subscriber) {

                private final StringBuilder cur = new StringBuilder();

                // onError, onCompleted skipped

                @Override
                public void onNext(Integer i) {
                    rx.Observable<String> out;
                    cur.append(i);
                    if (i < 0) {
                        out = rx.Observable.just(cur.toString());
                        cur.delete(0, cur.length());
                    } else {
                        out = rx.Observable.<String>empty();
                    }
                    subscriber.onNext(out);
                }
            };
        }
    });

rx.Observable<String> res2 = res
    .flatMap(stringObservable -> stringObservable)
    .take(2);
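
The shape of this workaround (one wrapper per input, empty while the chunk is still open, then flatten) can be illustrated without RxJava using java.util.stream. This is only an analogy, not the RxJava code above: `FlattenSketch` and `firstChunks` are hypothetical names, and the stateful lambda is only safe because the stream is sequential and ordered.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlattenSketch {

    static List<String> firstChunks(List<Integer> source, int limit) {
        StringBuilder cur = new StringBuilder();
        return source.stream()
            // Exactly one wrapper per input: empty until a negative value closes the chunk.
            .map(i -> {
                cur.append(i);
                if (i < 0) {
                    String done = cur.toString();
                    cur.setLength(0);
                    return Stream.of(done);
                }
                return Stream.<String>empty();
            })
            .flatMap(s -> s) // flatten the wrappers, dropping the empty ones
            .limit(limit)    // positional operator applied after flattening
            .collect(Collectors.toList());
    }
}
```

For the source from the question, firstChunks(source, 2) gives "1-2" and "34-5".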

Upvotes: 1
