MrKY

Reputation: 75

RxJava and terminating streams

I'm new to reactive programming with RxJava, and after going through the simpler examples I'm now trying to figure out how to work with continuous streams. The problem with the example below is that the program doesn't terminate after I've taken the 3 elements. My assumption is that I somehow need to unsubscribe from my observable, but I don't fully grasp how to terminate the while loop and make the program exit.

I've come across the post "RxJava -- Terminating Infinite Streams", but I still can't figure out what I'm missing.

class MyTwitterDataProvider {
    /*
    This example is written in Groovy

    Instance variables and constructor omitted
    */

    public Observable<String> getTweets() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(getTwitterStream()))

        Observable.create({ observer ->
            executor.execute(new Runnable() {
                def void run() {
                    String newLine
                    while ((newLine = reader.readLine()) != null) {
                        System.out.println("printing tweet: $newLine")
                        observer.onNext(newLine)
                    }

                    observer.onCompleted()
                }
            })
        })
    }

    def InputStream getTwitterStream() {
        // code omitted
    }

    public static void main (String [] args) {
        MyTwitterDataProvider provider = new MyTwitterDataProvider()
        Observable<String> myTweetsObservable = provider.getTweets().take(3)

        Subscription myTweetSubscription = myTweetsObservable.subscribe({tweet-> println("client prints: $tweet")})
        // myTweetSubscription.unsubscribe()
    }
}

Upvotes: 3

Views: 1415

Answers (1)

alexwen

Reputation: 1128

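take(3) unsubscribes once the third item has been delivered, but your loop never notices and keeps reading from the stream. You must add a check in your loop to see if the observer is still subscribed: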

            while ((newLine = reader.readLine()) != null && !observer.isUnsubscribed()) {
                System.out.println("printing tweet: $newLine")
                observer.onNext(newLine)
            }
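
To see the pattern in isolation, here is a minimal sketch in plain Java with no RxJava dependency. It is an illustration under assumptions, not the library's mechanism: the AtomicBoolean stands in for observer.isUnsubscribed(), and the names CancellableLineReader, pump and takeFirst are made up for this example. It differs from the loop above in one way: it checks the flag before calling readLine(), so the producer does not block on one more read after the consumer has stopped listening.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the producer loop; not part of RxJava.
class CancellableLineReader {

    /**
     * Pushes lines to onNext until the source ends or `cancelled` is set.
     * Returns the number of lines handed to the consumer.
     */
    static int pump(BufferedReader reader, AtomicBoolean cancelled, Consumer<String> onNext) {
        int emitted = 0;
        try {
            String line;
            // Check the flag first so no further read is attempted once cancelled.
            while (!cancelled.get() && (line = reader.readLine()) != null) {
                onNext.accept(line);
                emitted++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return emitted;
    }

    /** Mimics take(n): collects items and sets the flag once n have arrived. */
    static List<String> takeFirst(String input, int n) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<String> received = new ArrayList<>();
        BufferedReader reader = new BufferedReader(new StringReader(input));
        pump(reader, cancelled, line -> {
            received.add(line);
            if (received.size() >= n) {
                cancelled.set(true); // plays the role of unsubscribing
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // Five lines are available, but the loop stops after the third.
        System.out.println(takeFirst("a\nb\nc\nd\ne\n", 3)); // prints [a, b, c]
    }
}
```

Note that with a real network stream, a check placed after readLine() only runs once the next line arrives, which is why the order of the two conditions can matter.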

Upvotes: 2
