Reputation: 661
Modified to clarify what I want to achieve.
Can I write code with RxJS where the Observable collects data from its Observers? Like the scenario below:
... (.filter()) will respond with the answer.
With RxJS, when the Observable fires a new value with .next(), every Observer subscribed with .subscribe() will react to it, but I don't know of a way for the observer to return a value to the Observable.
So, here is what I need:
I hope now I'm clear on my needs :)
Upvotes: 1
Views: 1104
Reputation: 1591
The Events-ex library lets an observer send a value back to the observable.
eventable = require('events-ex/eventable')

class Examiner
  # advanced usage see API topic.
  eventable Examiner
  makeQuestion: ->
    aQuestion = '....'
    theAnswer = this.emit 'ask', aQuestion

class Student
  constructor: (aExaminer)->
    aExaminer.on 'ask', this.answer if aExaminer
  answer: (aQuestion)->
    # this is the event object, not the student instance.
    this.result = 'myAnswer'
    this.stopped = true # stop other listeners. defaults to false.
    return
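To illustrate the idea independently of the library, here is a minimal plain-JavaScript sketch of an emitter whose emit call returns a value set by a listener. TinyEmitter and its on/emit methods are hypothetical stand-ins written for this example, not the Events-ex implementation; only the result and stopped property names are taken from the snippet above.

```javascript
// Hypothetical stand-in, NOT the Events-ex implementation: an emitter whose
// emit() hands listeners an event object and returns that object's result.
class TinyEmitter {
  constructor() {
    this.listeners = {};
  }

  on(type, listener) {
    (this.listeners[type] = this.listeners[type] || []).push(listener);
  }

  emit(type, ...args) {
    const event = { type, result: undefined, stopped: false };
    for (const listener of this.listeners[type] || []) {
      // The event object is passed as `this`, as in the snippet above.
      listener.apply(event, args);
      if (event.stopped) break; // a listener asked to skip the rest
    }
    return event.result;
  }
}

const examiner = new TinyEmitter();

// Regular functions (not arrows) so that `this` is the event object.
examiner.on('ask', function (question) {
  if (question === '2 + 2?') {
    this.result = '4';
    this.stopped = true; // later listeners are not called
  }
});
examiner.on('ask', function () {
  this.result = 'no idea';
});

const known = examiner.emit('ask', '2 + 2?');
const unknown = examiner.emit('ask', 'Capital of France?');
console.log(known, '|', unknown); // prints: 4 | no idea
```

The first listener answers only the question it knows and stops propagation; otherwise the second listener's result is what the emitter gets back.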
Upvotes: 1
Reputation: 9447
Ok, I understand the problem better.
I believe what you need is for both Examiner and Students to have Observable and Observer properties. This is the only way you will get 2-way communication using Observables, as they are only a 1-way communication mechanism.
The example below uses race to make the students race:
const question$ = Rx.Observable.range(1, 4)
  // in case you want everyone to wait
  .publish();

function makeStudent(name, worstTimeToAnswer) {
  return q =>
    Rx.Observable.of(`${name} answers question ${q}`)
      .delay(Math.random() * 1000 * worstTimeToAnswer);
}

// Alice answers questions in at most 5 seconds
const alice = makeStudent("Alice", 5);
// Bob answers questions in at most 7 seconds
const bob = makeStudent("Bob", 7);

// This stream contains one answer for each question, as long as someone
// answered fast enough.
const winningAnswer$ =
  question$
    .concatMap(q =>
      // see who answers faster between Alice and Bob
      Rx.Observable.race(alice(q), bob(q))
        // or drop the question if nobody answers within 4 seconds
        .takeUntil(Rx.Observable.timer(4000))
    );

winningAnswer$.subscribe(a => console.log(a));
question$.connect(); // questions start flowing

// should print something like:
// Alice answers question 1
// Bob answers question 3
// Alice answers question 4
// where Q2 was dropped because nobody answered fast enough
// and Alice often answers faster than Bob
If you actually want a feedback loop where the answers to a question change the next question, then you will probably need to use a Subject to close the loop.
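A rough sketch of such a loop, under some loud assumptions: MiniSubject is a hypothetical stand-in for Rx.Subject that supports only next and subscribe, so the example runs without RxJS installed, and the "answer is double the question" rule and the stop condition are invented purely for illustration.

```javascript
// Hypothetical stand-in for Rx.Subject with only next() and subscribe(),
// so the sketch runs without RxJS installed.
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach(observer => observer(value));
  }
}

const question$ = new MiniSubject(); // examiner -> students
const answer$ = new MiniSubject();   // students -> examiner
const transcript = [];

// Student: observes questions and pushes answers back on the other channel.
// (Illustrative rule: the answer is twice the question.)
question$.subscribe(q => answer$.next({ question: q, value: q * 2 }));

// Examiner: observes answers; each answer determines the next question,
// which closes the loop. The loop ends once an answer reaches 8.
answer$.subscribe(a => {
  transcript.push(`Q${a.question} -> ${a.value}`);
  if (a.value < 8) question$.next(a.value);
});

question$.next(1); // the first question starts the loop
console.log(transcript.join(', ')); // prints: Q1 -> 2, Q2 -> 4, Q4 -> 8
```

With RxJS itself, the two channels would be Rx.Subject instances, and operators such as filter or delay could be applied on either side. Note that this loop runs synchronously, so it needs a stop condition like the one above to terminate.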
Upvotes: 2