Reputation: 51255
I am trying to implement a auto-reconnect client for a server which receives a command and then replies with a single byte. However, the problem is that one cannot send any additional commands to the server while it is processing commands. So I need to somehow serialise the commands, is that possible to achieve in a pragmatic way in RxJS?
const onClient = new BehaviourSubject(...) // Auto-reconnecting client
function sendCommand(cmd) {
return onClient
.concatMap(client => {
client.write(cmd + '\r\n')
return Rx.Observable.fromEvent(client, 'data').take(1)
})
}
sendCommand('CMD1').subscribe(x => console.log(x))
sendCommand('CMD2').subscribe(x => console.log(x)) // Oops, sent a command while another one is active...
Here is one possible solution that lacks error handling and looks quite inefficient.
const input = new Rx.Subject()
const output = new Rx.Subject()
input.concatMap(({cmd, id})) => onClient
.filter(client => client != null)
.concatMap(client => {
client.write(cmd + '\r\n')
return Rx.Observable.fromEvent(client, 'data').take(1)
})
.map(value => ({value, id}))
.subscribe(output)
function sendCommand(cmd) {
const id = cuid()
input.onNext(id)
return output
.filter(res => res.id === id)
.map(res => res.value)
}
Any better ideas or suggestions on improvement?
Upvotes: 0
Views: 79
Reputation: 51255
Here is the rather complicated try I ended up with:
import stampit from 'stampit'
import Rx from 'rx'
import cuid from 'cuid'
let input = new Rx.Subject()
let output = new Rx.Subject()
input
.concatMap(({fn, id}) => Rx.Observable
.defer(() => fn())
.map(value => ({value, id}))
.catch(error => Rx.Observable.return({error, id}))
.concat(Rx.Observable.return({id})))
.subscribe(output)
async function enqueue(fn) {
const id = cuid()
input.onNext({fn, id})
output
.filter(res => res.id === id)
.takeWhile(res => res.error || res.value)
.concatMap(res => res.error
? Rx.Observable.throw(res.error)
: Rx.Observable.return(res.value))
}
})
const onClient = new BehaviourSubject(...) // Auto-reconnecting client
function sendCommand(cmd) {
return enqueue(() => onClient
.concatMap(client => {
client.write(cmd + '\r\n')
return Rx.Observable.fromEvent(client, 'data').take(1)
}))
}
sendCommand('CMD1').subscribe(x => console.log(x))
sendCommand('CMD2').subscribe(x => console.log(x))
Upvotes: 0
Reputation: 9876
Here is my gut instinct. I've only ever used JavaRX, and that just barely. Note that this assumes you want 1 invocation of CMD2 for every return of CMD1.
const onClient = new BehaviourSubject(...) // Auto-reconnecting client
function sendCommand(cmd) {
return onClient
.concatMap(client => {
client.write(cmd + '\r\n')
return Rx.Observable.fromEvent(client, 'data').take(1)
})
}
sendCommand('CMD1').subscribe(function(x) {
console.log(x);
sendCommand('CMD2').subscribe(y => console.log(y))
});
For what it's worth, you may want to consider using Promises for this stuff. My understanding of Rx is that it is useful for complex streams of async data, such as event streams. But if all you want is the async part, I believe Promises might be easier. We were considering using it on a Java project and decided it wasn't what we needed. See: When to Use Rx
I don't know what you are working on, but a command-response pattern seems to me like it might be better served by Promises, especially if you expect the lambda you're passing into subscribe
to only be invoked once.
Upvotes: 1