Fábio Beranizo
Fábio Beranizo

Reputation: 31

RxPY - flat_map emissions waits for next generator value

Hi! I'm trying to get my first RxPY project done, but I'm having some problems
undestanding the behavior of flat_map in Python.

In this project there is an Observable created from a generator (a Kafka consumer). It emits values when a message is received, then performs a query based on the message, and emits a value for each result.

I made some changes to the code in order to make it easier to reproduce. Kafka consumer was replaced by a generator that takes a lot of time between emissions, and query results were replaced by an Observable that emits 3 values. Behavior is still the same.

from rx import Observable

generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) \
    .flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) \
    .subscribe(on_next=lambda i: print(i))

Output:

a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c

I was expecting something like this:

a
b
c
(...waits a long time...)
a
b
c

What is the reason for this behavior? What should I do to get the expected result?

Thank you! :)

Upvotes: 3

Views: 1071

Answers (1)

Dmytro Severnyuk
Dmytro Severnyuk

Reputation: 1

Recentlty came across same issue with flat_map operator and ImmediateScheduler helped here.

Initial code updated a little bit for RxPy 3:

import rx
from rx.operators import flat_map


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'])
    )
).subscribe(on_next=lambda i: print(i))

Output differs a little bit but issue is the same:

(... waits a long time ...)
a
b
c
a
b
c

Applied ImmediateScheduler for the observable inside flat_map:

import rx
from rx.operators import flat_map
from rx.scheduler import ImmediateScheduler


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'], scheduler=ImmediateScheduler())
    )
).subscribe(on_next=lambda i: print(i))

And got the expected result:

a
b
c
(...waits a long time...)
a
b
c

Upvotes: 0

Related Questions