bigpotato

Reputation: 27527

Redux Observable / RxJS: How to create custom observable?

I'm trying to set up a websocket in a redux-observable epic, and I'm going with an approach similar to the one described here: https://github.com/MichalZalecki/connect-rxjs-to-react/issues/1

However, my first stab at wiring things up isn't working, even though it looks the same as the code in that issue:

import 'rxjs';
import Observable from 'rxjs';

import * as scheduleActions from '../ducks/schedule';

export default function connectSocket(action$, store) {
  return action$.ofType(scheduleActions.CANCEL_RSVP)
    .map(action => {
      new Observable(observer => {
        // do websocket stuff here
        observer.next('message text');
      });
    })
    .map(text => {
      console.log("xxxxxxxxxxxxx: ", text);
      return scheduleActions.rsvpCancelled(1);
    });
};

However, I'm getting an "Object is not a constructor" error.

=== UPDATE ===

Looks like the suggestion to destructure the { Observable } export worked!

Now the only issue is that text doesn't seem to cross over to the next method...

import 'rxjs';
import { Observable } from 'rxjs';

import * as scheduleActions from '../ducks/schedule';

export default function connectSocket(action$, store) {
  return action$.ofType(scheduleActions.CANCEL_RSVP)
    .map(action => {
      new Observable(observer => {
        // do websocket stuff here
        observer.next('message text');
      });
    })
    .map(text => {
      console.log("xxxxxxxxxxxxx: ", text); // prints undefined
      return scheduleActions.rsvpCancelled(1);
    });
};

Upvotes: 0

Views: 1853

Answers (2)

Ambuj Khanna

Reputation: 1219

You can take a look at the demo: Create Custom Observable.

Upvotes: 0

jayphelps

Reputation: 15401

In RxJS v5, the Observable class is available as named export, not the default export.

import { Observable } from 'rxjs';

Importing from plain 'rxjs' will also import all of RxJS (adding every operator to the Observable prototype). This is described in the docs. If you'd prefer to be more explicit and import only Observable itself, you can import it directly from rxjs/Observable:

import { Observable } from 'rxjs/Observable';

Separately, you have a couple issues with the way you're mapping your custom Observable.

First Issue

You're not actually returning it. hehe. You're missing a return statement (or you can remove the curly braces and use arrow function implicit returns).
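To make the missing return concrete, here is a minimal plain-JavaScript sketch with no RxJS involved. makeValue is a hypothetical stand-in for `new Observable(...)`; the three function names are made up for illustration only.

```javascript
// Hypothetical stand-in for `new Observable(...)`; any value would do.
const makeValue = () => ({ kind: 'custom-observable' });

// Block body with no `return`: the object is created and then discarded.
const withoutReturn = action => {
  makeValue();
};

// Block body with an explicit `return`.
const withReturn = action => {
  return makeValue();
};

// Concise body: the expression's value is returned implicitly.
const implicitReturn = action => makeValue();

console.log(withoutReturn({}));  // undefined
console.log(withReturn({}));     // { kind: 'custom-observable' }
console.log(implicitReturn({})); // { kind: 'custom-observable' }
```

The first form is what the question's .map() callback does, which is why the next .map() receives undefined.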

Second Issue

The regular .map() operator does not do anything special when you return an Observable. If you want the custom Observable to be subscribed to and flattened you'll need to use an operator that does flattening of some kind.

The most common two are mergeMap (aka flatMap) or switchMap.

action$.ofType(scheduleActions.CANCEL_RSVP)
  .mergeMap(action => {
    return new Observable(observer => {
      // do websocket stuff here
      observer.next('message text');
    });
  })

Which operator you need depends on your desired behavior. If you're not yet familiar, you can check out the documentation on the various operators or jump straight to the mergeMap and switchMap docs.
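If the difference between map and a flattening operator is still fuzzy, the sketch below may help. It deliberately does not use RxJS: ToyObservable is a made-up, synchronous stand-in that ignores errors, completion, and unsubscription, so treat it as an illustration of the idea rather than how RxJS implements these operators.

```javascript
// Toy stand-in for RxJS's Observable, only to show map vs. mergeMap.
// It is synchronous and ignores errors, completion, and unsubscription.
class ToyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(next) {
    this.subscribeFn({ next });
  }

  // map: pass each value through `project` and emit the result as-is.
  map(project) {
    return new ToyObservable(observer =>
      this.subscribe(value => observer.next(project(value)))
    );
  }

  // mergeMap: `project` returns an inner observable; subscribe to it and
  // forward its values, so downstream sees values rather than observables.
  mergeMap(project) {
    return new ToyObservable(observer =>
      this.subscribe(value =>
        project(value).subscribe(inner => observer.next(inner))
      )
    );
  }
}

// Hypothetical source emitting one action, standing in for action$.
const action$ = new ToyObservable(observer => {
  observer.next({ type: 'CANCEL_RSVP' });
});

// Hypothetical inner observable, standing in for the websocket wrapper.
const makeInner = () =>
  new ToyObservable(observer => observer.next('message text'));

const mapped = [];
action$.map(makeInner).subscribe(v => mapped.push(v));

const merged = [];
action$.mergeMap(makeInner).subscribe(v => merged.push(v));

console.log(mapped[0] instanceof ToyObservable); // true
console.log(merged); // [ 'message text' ]
```

With map, the downstream callback receives the inner observable object itself; with mergeMap, it receives the text that the inner observable emits.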


If you're adventurous, RxJS v5 does have WebSocket support out of the box, which you can try with Observable.webSocket(). It's not documented very well, but you could also take a look at the unit tests, and for simple read-only unidirectional streaming it's pretty self-explanatory: provide the URL and subscribe. It's actually incredibly powerful, if you can figure out how to use it, that is. It supports bi-directional and multiplexed communication, i.e. multiple input/output channels through a single socket. We use it at Netflix for several internal tools with thousands of rps.

Upvotes: 3
