Reputation: 348
How do you take a single event in a reactive extensions stream and split it into multiple events in the same stream?
I have a sequence that retrieves json data which is an array at the top level. At the point where the json data is decoded, I would like to then take each element in that array and continue passing those elements along the stream.
Here's an example with an imaginary function that I wish existed (but with a shorter name!). It's in Python, but I think it's straightforward enough that it should be legible to other Rx programmers.
# Smallest possible example
from rx import Observable
import requests
stream = Observable.just('https://api.github.com/users')
stream.map(requests.get) \
.map(lambda raw: raw.json()) \
.SPLIT_ARRAY_INTO_SEPARATE_EVENTS() \
.subscribe(print)
Put in other words, I want to make a transition like so:
From:
# --[a,b,c]--[d,e]--|->
To:
# --a-b-c-----d-e---|->
Upvotes: 3
Views: 349
Reputation: 348
Bingo. Thanks. SelectMany does it:
from rx import Observable
import requests
stream = Observable.just('https://api.github.com/users')
stream.map(requests.get) \
.map(lambda raw: raw.json()) \
.select_many(lambda x: x) \
.subscribe(print)
Upvotes: 1
Reputation: 106826
You can use the SelectMany
operator:
stream.SelectMany(arr => arr)
This will "flatten" you stream of events just as the C# LINQ SelectMany
operator can be used to flatten a sequence of sequences.
Upvotes: 5