nutella_eater
nutella_eater

Reputation: 3582

How to create observable only once?

I have a specific funtion which should emit items whenever someone is subscribed or not. Also, this function should be executed only once and if someone subscribes to it it should NOT being executed

 Observable<CharSequence> observable = Observable.create(subscriber -> {

            try {

                sseEventSource.connect(); //this should be called once when created;

                final SseEventReader sseEventReader = sseEventSource.getEventReader();
                SseEventType type = sseEventReader.next();
                while (type != SseEventType.EOS) {
                    if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) {
                        CharSequence data = sseEventReader.getData();
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(data);
                        }
                    }
                    type = sseEventReader.next();
                }
                sseEventSource.close();
                Log.d("SseService", "closed");
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch (URISyntaxException | IOException e) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(e);
                }
            }

Upvotes: 0

Views: 650

Answers (2)

JohnWowUs
JohnWowUs

Reputation: 3083

Try the share() operator.

Observable<CharSequence> observable = Observable.create(subscriber -> {

        try {

            sseEventSource.connect(); //this should be called once when created;

            final SseEventReader sseEventReader = sseEventSource.getEventReader();
            SseEventType type = sseEventReader.next();
            while (type != SseEventType.EOS) {
                if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) {
                    CharSequence data = sseEventReader.getData();
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(data);
                    }
                }
                type = sseEventReader.next();
            }
            sseEventSource.close();
            Log.d("SseService", "closed");
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        } catch (URISyntaxException | IOException e) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(e);
            }
        }).share() ;

If you genuinely want a hot observable use publish() and connect() to initiate the observable even if no one is subscribed.

Upvotes: 1

Divers
Divers

Reputation: 9569

You can accomplish that using Subjects, for instance:

public static void main(String[] args) {
    Main m = new Main();
    m.getChanges().subscribe(x -> {
        //Data
    }, e -> {
        //Errors
    });
    m.connect();
}

private PublishSubject<CharSequence> ps = PublishSubject.create();

public void connect() {
    sseEventSource.connect();
}

public Observable<CharSequence> getChanges() {
    try {
        final SseEventReader sseEventReader = sseEventSource.getEventReader();
        SseEventType type = sseEventReader.next();
        while (type != SseEventType.EOS) {
            if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) {
                CharSequence data = sseEventReader.getData();
                ps.onNext(data);
            }
            type = sseEventReader.next();
        }
        sseEventSource.close();
        ps.onComplete();
    } catch (URISyntaxException | IOException e) {
        ps.onError(e);
    }
    return ps;
}

That way you can subscribe/unsubscribe on changes from subject when you want and establish connection only once. If you want to recieved buffered changes, please consider to use ReplaySubject.

Upvotes: 0

Related Questions