Reputation: 75
I'm new to reactive programming using rxjava and after going through the simpler examples I'm now trying to figure out how to work with continuous streams. The problem I have with the example below is that the program doesn't terminate after I've taken the 3 elements. My assumption is that I somehow need to unsubscribe to my observable but I don't fully grasp how to terminate the while loop and make the program exit.
I've come across the following post RxJava -- Terminating Infinite Streams but I still can't figure out what I'm missing.
class MyTwitterDataProvider {
/*
This example is written in Groovy
Instance variables and constructor omitted
*/
public Observable<String> getTweets() {
BufferedReader reader = new BufferedReader(new InputStreamReader(getTwitterStream()))
Observable.create({ observer ->
executor.execute(new Runnable() {
def void run() {
String newLine
while ((newLine = reader.readLine()) != null) {
System.out.println("printing tweet: $newLine")
observer.onNext(newLine)
}
observer.onCompleted()
}
})
})
}
def InputStream getTwitterStream() {
// code omitted
}
public static void main (String [] args) {
MyTwitterDataProvider provider = new MyTwitterDataProvider()
Observable<String> myTweetsObservable = provider.getTweets().take(3)
Subscription myTweetSubscription = myTweetsObservable.subscribe({tweet-> println("client prints: $tweet")})
// myTweetSubscription.unsubscribe()
}
}
Upvotes: 3
Views: 1415
Reputation: 1128
You must add a check in your loop to see if the observer is still subscribed:
while ((newLine = reader.readLine()) != null && !observer.isUnsubsribed()) {
System.out.println("printing tweet: $newLine")
observer.onNext(newLine)
}
Upvotes: 2