Sunny

Reputation: 14808

RxJava2 batch items

I have a continuously generated log stream, i.e. a method that gets called whenever a new log is available in the system. I don't want to process each log as it is generated (logs are produced every millisecond or so).

I want to collect the logs emitted over a period of time, say 5 seconds, and then process them as a batch.

How can I achieve this using RxJava?

I've tried something like this:

private static void logResults(LogData logData) {
        Observable.create((ObservableOnSubscribe<LogData>) e -> {
            e.onNext(logData);
        }).buffer(5, TimeUnit.SECONDS).subscribeWith(new DisposableObserver<List<LogData>>() {
            @Override
            public void onNext(List<LogData> logData) {
               System.out.print(logData.toString());
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
    }


  /**
   * Called every time a new log is available.
   */
  public static void logGenerated(LogData log) {
     logResults(log);
  }

Upvotes: 1

Views: 175

Answers (1)

akarnokd

Reputation: 69997

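Your code creates a new Observable, with its own 5-second buffer, on every call to logResults, so each buffer only ever sees a single item. You need to create a flow that stays active across multiple calls to logResults. The simplest way is to use a static PublishSubject: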

// Append .toSerialized() if logGenerated can be called from several threads at once.
private static final Subject<LogData> subject =
    PublishSubject.<LogData>create(); // .toSerialized();

private static final Disposable logProcessing =
    subject.buffer(5, TimeUnit.SECONDS)
    .subscribeWith(new DisposableObserver<List<LogData>>() {
        @Override
        public void onNext(List<LogData> logData) {
           System.out.print(logData.toString());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });

private static void logResults(LogData logData) {
    subject.onNext(logData);
}


/**
 * Called every time a new log is available.
 */
public static void logGenerated(LogData log) {
    logResults(log);
}
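
To make it clearer what a long-lived, time-windowed buffer does, here is a rough JDK-only sketch of the same idea. It is not RxJava and not how buffer is implemented; LogBatcher and its add, flush and start methods are hypothetical names made up for illustration. One difference worth noting: as far as I know, buffer(5, TimeUnit.SECONDS) also emits empty lists for windows with no items, whereas this sketch skips empty batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: collects items and hands them to a handler in batches. */
final class LogBatcher<T> {
    private final Object lock = new Object();
    private final Consumer<List<T>> handler;
    private List<T> current = new ArrayList<>();

    LogBatcher(Consumer<List<T>> handler) {
        this.handler = handler;
    }

    /** Called for every new item; safe to call from multiple threads. */
    void add(T item) {
        synchronized (lock) {
            current.add(item);
        }
    }

    /** Swaps in a fresh list and passes the collected items to the handler. */
    void flush() {
        List<T> batch;
        synchronized (lock) {
            batch = current;
            current = new ArrayList<>();
        }
        // Assumption of this sketch: empty windows are not reported.
        if (!batch.isEmpty()) {
            handler.accept(batch);
        }
    }

    /** Flushes once per period on a single background thread. */
    ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::flush, period, period, unit);
        return scheduler;
    }
}
```

You would create one static LogBatcher, call start(5, TimeUnit.SECONDS) once, call add from logGenerated, and call shutdown() on the returned executor when the application stops. The key point is the same as with the static PublishSubject: the buffer has to outlive the individual calls.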

Upvotes: 4
