ronag
ronag

Reputation: 51255

subscribe while preserving order

I am trying to implement a auto-reconnect client for a server which receives a command and then replies with a single byte. However, the problem is that one cannot send any additional commands to the server while it is processing commands. So I need to somehow serialise the commands, is that possible to achieve in a pragmatic way in RxJS?

const onClient = new BehaviourSubject(...) // Auto-reconnecting client

function sendCommand(cmd) {
  return onClient
     .concatMap(client => {
       client.write(cmd + '\r\n')
       return Rx.Observable.fromEvent(client, 'data').take(1)
     })
}

sendCommand('CMD1').subscribe(x => console.log(x))
sendCommand('CMD2').subscribe(x => console.log(x)) // Oops, sent a command while another one is active...

Here is one possible solution that lacks error handling and looks quite inefficient.

const input = new Rx.Subject()
const output = new Rx.Subject()

input.concatMap(({cmd, id})) => onClient
   .filter(client => client != null)
   .concatMap(client => {
     client.write(cmd + '\r\n')
     return Rx.Observable.fromEvent(client, 'data').take(1)
   })
   .map(value => ({value, id}))
   .subscribe(output)

function sendCommand(cmd) {
  const id = cuid()
  input.onNext(id)
  return output
    .filter(res => res.id === id)
    .map(res => res.value)
}

Any better ideas or suggestions on improvement?

Upvotes: 0

Views: 79

Answers (2)

ronag
ronag

Reputation: 51255

Here is the rather complicated try I ended up with:

import stampit from 'stampit'
import Rx from 'rx'
import cuid from 'cuid'

    let input = new Rx.Subject()
    let output = new Rx.Subject()

    input
      .concatMap(({fn, id}) => Rx.Observable
          .defer(() => fn())
          .map(value => ({value, id}))
          .catch(error => Rx.Observable.return({error, id}))
          .concat(Rx.Observable.return({id})))
      .subscribe(output)

    async function enqueue(fn) {
        const id = cuid()
        input.onNext({fn, id})
        output
          .filter(res => res.id === id)
          .takeWhile(res => res.error || res.value)
          .concatMap(res => res.error
            ? Rx.Observable.throw(res.error)
            : Rx.Observable.return(res.value))
      }
    })

const onClient = new BehaviourSubject(...) // Auto-reconnecting client

function sendCommand(cmd) {
  return enqueue(() => onClient
     .concatMap(client => {
       client.write(cmd + '\r\n')
       return Rx.Observable.fromEvent(client, 'data').take(1)
     }))
}

sendCommand('CMD1').subscribe(x => console.log(x))
sendCommand('CMD2').subscribe(x => console.log(x))

Upvotes: 0

xdhmoore
xdhmoore

Reputation: 9876

Here is my gut instinct. I've only ever used JavaRX, and that just barely. Note that this assumes you want 1 invocation of CMD2 for every return of CMD1.

const onClient = new BehaviourSubject(...) // Auto-reconnecting client

function sendCommand(cmd) {
  return onClient
     .concatMap(client => {
       client.write(cmd + '\r\n')
       return Rx.Observable.fromEvent(client, 'data').take(1)
     })
}

sendCommand('CMD1').subscribe(function(x) {
  console.log(x);
  sendCommand('CMD2').subscribe(y => console.log(y))
});

For what it's worth, you may want to consider using Promises for this stuff. My understanding of Rx is that it is useful for complex streams of async data, such as event streams. But if all you want is the async part, I believe Promises might be easier. We were considering using it on a Java project and decided it wasn't what we needed. See: When to Use Rx

I don't know what you are working on, but a command-response pattern seems to me like it might be better served by Promises, especially if you expect the lambda you're passing into subscribe to only be invoked once.

Upvotes: 1

Related Questions