rharder
rharder

Reputation: 348

How to split one event into many with reactive extensions?

How do you take a single event in a reactive extensions stream and split it into multiple events in the same stream?

I have a sequence that retrieves json data which is an array at the top level. At the point where the json data is decoded, I would like to then take each element in that array and continue passing those elements along the stream.

Here's an example with an imaginary function that I wish existed (but with a shorter name!). It's in Python, but I think it's straightforward enough that it should be legible to other Rx programmers.

# Smallest possible example
from rx import Observable
import requests
stream = Observable.just('https://api.github.com/users')
stream.map(requests.get) \
      .map(lambda raw: raw.json()) \
      .SPLIT_ARRAY_INTO_SEPARATE_EVENTS() \
      .subscribe(print)

Put in other words, I want to make a transition like so:

From:
# --[a,b,c]--[d,e]--|->
To:
# --a-b-c-----d-e---|->

Upvotes: 3

Views: 349

Answers (2)

rharder
rharder

Reputation: 348

Bingo. Thanks. SelectMany does it:

from rx import Observable
import requests
stream = Observable.just('https://api.github.com/users')
stream.map(requests.get) \
    .map(lambda raw: raw.json()) \
    .select_many(lambda x: x) \
    .subscribe(print)

Upvotes: 1

Martin Liversage
Martin Liversage

Reputation: 106826

You can use the SelectMany operator:

stream.SelectMany(arr => arr)

This will "flatten" you stream of events just as the C# LINQ SelectMany operator can be used to flatten a sequence of sequences.

Upvotes: 5

Related Questions