Marinho Brandão
Marinho Brandão

Reputation: 661

Rx.Observable.subscribe return a value to observable

Modified to clarify what I want to achieve.

Can I write a code with RxJs where the Observable collects data from Observers? Like scenario below:

  1. one Examiner, multiple Students
  2. the Examiner is an Observable
  3. each Student is an Observer
  4. every time the Examiner makes a question, the Students who know the answer (i.e. .filter()) will respond with the answer
  5. the Examiner will take the first answer as the correct one

With Rxjs, when the Observable fires a new value with .next(), every Observer with method .subscribe() will react to that, but I don't know a way this observer can return a value to the Observable.

So, here is what I need:

  1. how can the Observer send back a value to the Observable?
  2. is it possible that the first Observer who respond wins the race and the others are ignored after that?
  3. is it possible to know that no Observer responded?

I hope now I'm clear on my needs :)

Upvotes: 1

Views: 1104

Answers (2)

Riceball LEE
Riceball LEE

Reputation: 1591

The Events-ex library supports that the observer send back a value to the observable.

eventable = require('events-ex/eventable')

class Examiner
  # advanced usage see API topic.
  eventable Examiner
  makeQuestion: ->
    aQuestion = '....'
    theAnswer = this.emit 'ask', aQuestion

class Student
  constructor: (aExaminer)->
    aExaminer.on 'ask', this.answer if aExaminer
  answer: (aQuestion)->
    # this is the event object, not the student instance.
    this.result = 'myAnswer'
    this.stopped = true # stop other listeners. defaults to false.
    return

Upvotes: 1

Ptival
Ptival

Reputation: 9447

Ok, I understand the problem better.

I believe what you need is for both Examiner and Students to have Observable and Observer properties. This is the only way you will get 2-way communication using Observables, as they are only a 1-way communication mechanism.

  1. The solution is to make the students' answers be observables.
  2. Then you can use race to make them race.
  3. Hmmm... You'd have to define what it means for a student not to answer: is there a timeout, or are they able to declare that they will not answer later?
const question$ = Rx.Observable.range(1, 4)
                               // in case you want everyone to wait
                               .publish();

function makeStudent(name, worstTimeToAnswer) {
  return q =>
    Rx.Observable.of(`${name} answers question ${q}`)
                 .delay(Math.random() * 1000 * worstTimeToAnswer);
}

// Alice answers questions in at most 5 seconds
const alice = makeStudent("Alice", 5);
// Bob answers questions in at most 7 seconds
const bob = makeStudent("Bob", 7);

// This stream contains one answer for each question, as long as someone
// answered fast enough.
const winningAnswer$ =
  question$
    .concatMap(q =>
      // see who answers faster between Alice and Bob
      Rx.Observable.race(alice(q), bob(q))
      // or drop the question if nobody answers within 4 seconds
                   .takeUntil(Rx.Observable.timer(4000))
    );

winningAnswer$.subscribe(a => console.log(a));
question$.connect(); // questions start flowing

// should print something like:

// Alice answers question 1
// Bob answers question 3
// Alice answers question 4

// where Q2 was dropped because nobody answered fast enough
// and Alice often answers faster than Bob

If you actually want a feedback loop where the answers to a question change the next question, then you will probably need to use a Subject to close the loop.

Upvotes: 2

Related Questions