Reputation: 14808
I've a continuously generated log stream i.e a method which get called whenever a new log is available in the system. I don't want to process the log every time it is generated(because logs are generated every milliseconds or so).
I want to collect logs which are emitted over a period of time let say 5 seconds and then process them in batch.
How can I achieve this using rxjava.
I've tried something like
private static void logResults(LogData logData) {
Observable.create((ObservableOnSubscribe<LogData>) e -> {
e.onNext(logData);
}).buffer(5, TimeUnit.SECONDS).subscribeWith(new DisposableObserver<List<LogData>>() {
@Override
public void onNext(List<LogData> logData) {
System.out.print(logData.toString()));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
This method get called every time when new log is there
*/
public static void logGenerated(LogData log) {
logResults(log);
}
Upvotes: 1
Views: 175
Reputation: 69997
You need to create a flow that stays active across multiple calls to logResults
. The simplest way is to use a static PublishSubject
:
private static final Subject<LogData> subject =
PublishSubject.<LogData>create(); // .toSerialized();
private static final Disposable logProcessing =
subject.buffer(5, TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<List<LogData>>() {
@Override
public void onNext(List<LogData> logData) {
System.out.print(logData.toString()));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
private static void logResults(LogData logData) {
subject.onNext(logData);
}
/**
* This method get called every time when new log is there
*/
public static void logGenerated(LogData log) {
logResults(log);
}
Upvotes: 4