Reputation: 3582
I have a specific funtion which should emit items whenever someone is subscribed or not. Also, this function should be executed only once and if someone subscribes to it it should NOT being executed
Observable<CharSequence> observable = Observable.create(subscriber -> {
try {
sseEventSource.connect(); //this should be called once when created;
final SseEventReader sseEventReader = sseEventSource.getEventReader();
SseEventType type = sseEventReader.next();
while (type != SseEventType.EOS) {
if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) {
CharSequence data = sseEventReader.getData();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(data);
}
}
type = sseEventReader.next();
}
sseEventSource.close();
Log.d("SseService", "closed");
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch (URISyntaxException | IOException e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
Upvotes: 0
Views: 650
Reputation: 3083
Try the share()
operator.
Observable<CharSequence> observable = Observable.create(subscriber -> {
try {
sseEventSource.connect(); //this should be called once when created;
final SseEventReader sseEventReader = sseEventSource.getEventReader();
SseEventType type = sseEventReader.next();
while (type != SseEventType.EOS) {
if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) {
CharSequence data = sseEventReader.getData();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(data);
}
}
type = sseEventReader.next();
}
sseEventSource.close();
Log.d("SseService", "closed");
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch (URISyntaxException | IOException e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}).share() ;
If you genuinely want a hot observable use publish()
and connect()
to initiate the observable even if no one is subscribed.
Upvotes: 1
Reputation: 9569
You can accomplish that using Subjects, for instance:
public static void main(String[] args) {
Main m = new Main();
m.getChanges().subscribe(x -> {
//Data
}, e -> {
//Errors
});
m.connect();
}
private PublishSubject<CharSequence> ps = PublishSubject.create();
public void connect() {
sseEventSource.connect();
}
public Observable<CharSequence> getChanges() {
try {
final SseEventReader sseEventReader = sseEventSource.getEventReader();
SseEventType type = sseEventReader.next();
while (type != SseEventType.EOS) {
if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) {
CharSequence data = sseEventReader.getData();
ps.onNext(data);
}
type = sseEventReader.next();
}
sseEventSource.close();
ps.onComplete();
} catch (URISyntaxException | IOException e) {
ps.onError(e);
}
return ps;
}
That way you can subscribe/unsubscribe on changes from subject when you want and establish connection only once. If you want to recieved buffered changes, please consider to use ReplaySubject
.
Upvotes: 0