Nicolás Fantone

Reputation: 2100

Get return value from subscribe on Observable

Using RxJS 5.0.0-rc.1, I'm trying to make my Observer and Observable communicate in a way similar to how generators/iterators do, exchanging data through yield and .next(). The intention is to get hold of what the callback passed to .subscribe returns and modify/update the following values in my observable stream depending on that.

I'm not entirely sure this is possible at all. However, I found out that you can catch exceptions thrown in .subscribe callbacks. The following snippet prints "Boom!":

var source = Observable.create((observer) => {
  try {
    observer.next(42);
  } catch (e) {
    // This will catch the Error
    // thrown on the subscriber
    console.log(e.message);
  }
  observer.complete();
});

source.subscribe(() => {
  throw new Error('Boom!');
});

So, what if instead of throwing, the subscriber returns a value? Is there a way for the Observable to retrieve it? Perhaps I'm approaching this the wrong way. If so, what's the "reactive" way of doing things in this scenario?

Many thanks.


EDIT

One possible way I came up with is to provide a callback function with every item in the stream. Something like:

var source = Observable.create((observer) => {
  // This will print "{ success: true }"
  observer.next({ value: 42, reply: console.log });
  observer.complete();
});

source.subscribe(({ value, reply }) => {
  console.log('Got', value);
  return reply({ success: true });
});

Any other thoughts?


EDIT 2

Since my original question caused some confusion about what I was trying to achieve, I'll describe my real-world scenario. I'm writing the API of a module for managing messages through queues (much like a simplified, in-memory AMQP-RPC mechanism) and I thought RxJS would be a good fit.

It works like you would expect: a Publisher pushes messages to a queue, which get delivered to a Consumer. In turn, the Consumer can reply to the Publisher, which can listen to that response if it's interested.

In an ideal scenario, the API would look something like this:

Consumer().consume('some.pattern')
  .subscribe(function(msg) {
    // Do something with `msg`
    console.log(msg.foo);
    return { ok: true };
  });

Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer

That example would print 42.

The logic for replying to the Publisher lies within the Consumer function. But the actual response comes from the .subscribe() callback. Which leads me to my original question: how should I go about fetching that returned value from the creator of the stream?

Think of Consumer#consume() as:

/**
 * Returns an async handler that gets invoked every time
 * a new message matching the pattern of this consumer
 * arrives.
 */
function waitOnMessage(observer) {
  return function(msg) {
    observer.next(msg);
    // Conceptually, I'd like the returned
    // object from `.subscribe()` to be available
    // in this scope, somehow.
    // That would allow me to go like: 
    // `sendToQueue(pubQueue, response);`
  }
}

return Observable.create((observer) => {
  queue.consume(waitOnMessage(observer));
});

Does it make any more sense?

Upvotes: 4

Views: 4631

Answers (1)

user3743222

Reputation: 18665

There are indeed similarities between generators and observables: observables (asynchronous sequences of values) are the asynchronous counterpart of iterables (synchronous sequences of values).

Now, a generator is a function which returns an Iterable. An RxJS Observable, however, encloses both a generator, a.k.a. the producer (which you execute/start by calling subscribe), and the produced asynchronous sequence of values (which you observe by passing an Observer object). The subscribe call returns a Disposable (called a Subscription in RxJS 5), which allows you to stop receiving values (disconnect). So while generators and observables are dual concepts, the APIs to use them differ.
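To make the difference concrete, here is a minimal sketch of the two-way exchange that generators allow and that `subscribe` has no equivalent for. It is plain ES2015 with no RxJS involved, and `counter` is a made-up name used only for illustration.

```javascript
// Plain ES2015 generator: the consumer can talk back to the producer.
function* counter() {
  let step = 1;
  let current = 0;
  while (true) {
    // `yield` hands `current` to the consumer and evaluates to whatever
    // the consumer passes to the following `next(...)` call.
    const reply = yield current;
    if (typeof reply === 'number') {
      step = reply;
    }
    current += step;
  }
}

const it = counter();
const received = [
  it.next().value,   // starts the generator
  it.next().value,   // no reply, so the step stays at 1
  it.next(10).value, // the consumer asks for steps of 10
  it.next().value,   // the step is still 10
];
console.log(received); // [ 0, 1, 11, 21 ]
```

With an Observable, the value returned by the callback given to `subscribe` is simply ignored, so there is no counterpart to `it.next(10)`.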

You cannot do two-way communication by default with the RxJS Observable API. You could probably manage it by building the back channel yourself with subjects (note that you MUST have an initial value to kick off the cycle).

// Subject is a class, so it must be instantiated with `new`
var backChannel = new Rx.Subject();
backChannel.startWith(initialValue).concatMap(generateValue)
  .subscribe(function observer(value) {
    // Do whatever
    // pass a value through the backChannel
    backChannel.next(someValue);
  });
// generateValue is a function which takes a value from the back channel
// and returns a promise with the next value to be consumed by the observer.

You could consider wrapping that in a factory:

// `yield` is a reserved word in strict mode, hence the name `yieldFn`
function twoWayObsFactory(yieldFn, initialValue) {
  // BehaviorSubject is a class, so it must be instantiated with `new`;
  // its initial value kicks off the cycle.
  var backChannel = new Rx.BehaviorSubject(initialValue);
  var next = backChannel.next.bind(backChannel);
  return {
    subscribe: function (observer) {
      var subscription = backChannel.concatMap(yieldFn)
        .subscribe(function (x) {
          observer(next, x);
        });
      return {
        // RxJS 5 uses `unsubscribe` (it was `dispose` in RxJS 4)
        unsubscribe: function () {
          subscription.unsubscribe();
          backChannel.complete();
        }
      };
    }
  };
}

// Note that the observer now takes an additional parameter in its signature,
// for instance:
// observer = function (next, yieldedValue) {
//              doSomething(yieldedValue);
//              next(anotherValue);
//            }
// Note also that `next` is synchronous, so you should avoid sequences
// of back-and-forth communication that are too long. If your `yieldFn`
// function is synchronous, you might run into stack overflow errors.
// All the same, the `next` call should be the last line, so that the order of
// execution in your program is the same regardless of whether `yieldFn`
// is synchronous or asynchronous.

Otherwise, the behaviour you describe seems to be that of an asynchronous generator. I have never used one, but as this is a proposal for a future version of JavaScript, I think you can already start trying it out with Babel (cf. https://github.com/tc39/proposal-async-iteration).
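For what it's worth, that proposal has since been standardised (ES2018), so a rough sketch of the consumer/reply idea with an async generator could look like the following. The names `messages` and `run`, the in-memory array standing in for the queue, and the shape of the reply object are all assumptions made for illustration; this is not an RxJS API.

```javascript
// Async generator: messages are produced asynchronously, and each
// `yield` evaluates to the reply sent back by the consumer.
async function* messages(queue) {
  const replies = [];
  for (const msg of queue) {
    // Simulate asynchronous delivery of the next message.
    await new Promise((resolve) => setTimeout(resolve, 0));
    const reply = yield msg;
    replies.push(reply);
  }
  return replies;
}

async function run() {
  const it = messages([{ foo: 42 }, { foo: 43 }]);
  let result = await it.next(); // start the producer
  while (!result.done) {
    // Reply to the producer while asking for the next message.
    result = await it.next({ ok: true, seen: result.value.foo });
  }
  return result.value; // the replies collected by the producer
}

const done = run();
done.then((replies) => console.log(replies));
```

The producer side sees every reply in the same scope that emitted the message, which is what the `waitOnMessage` handler in the question was after.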

EDIT :

If you are looking for a loop-back mechanism (a less general-purpose approach, but one that could very well fit your use case if what you want to do is simple enough), the expand operator could help. To understand its behaviour, please check the documentation and the answers on SO that show it used in concrete contexts.

Basically, expand allows you to emit a value downstream and, at the same time, feed that value back into your producer.
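As a rough, dependency-free illustration of that feedback loop (this is NOT how RxJS implements `expand`; `expandSync`, `project` and `emit` are hypothetical names, and everything here is synchronous for simplicity):

```javascript
// Each value is emitted downstream AND passed back to `project`,
// whose results are queued to go through the same cycle.
function expandSync(seed, project, emit) {
  const pending = [seed];
  while (pending.length > 0) {
    const value = pending.shift();
    emit(value);                     // emit downstream
    pending.push(...project(value)); // feed back into the producer
  }
}

const emitted = [];
expandSync(
  1,
  (x) => (x < 16 ? [x * 2] : []), // an empty array ends the recursion
  (x) => emitted.push(x)
);
console.log(emitted); // [ 1, 2, 4, 8, 16 ]
```

In RxJS the projection returns an Observable rather than an array, and returning an empty Observable plays the role of the empty array here.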

Upvotes: 2
