Douglas Wegscheid

Reputation: 73

summarizing Observables in reactivex python

With ReactiveX in Python, how can I summarize a stream of Observables?

I have a stream of dictionaries of the form {"user": "...", "date": ...}. I want a function I can apply that accumulates, for each user, the dictionary with the latest date, then emits the accumulated dictionaries when the end of the stream is reached (it's like max, but it has to look at the user field and will emit multiple values).

Example - input stream:

{ "user": "a", "date": "2017-02-14" }
{ "user": "b", "date": "2016-01-01" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "a", "date": "2017-01-01" }
{ "user": "b", "date": "2017-01-01" }

Expected output (order does not matter):

{ "user": "a", "date": "2017-02-14" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "b", "date": "2017-01-01" }

I read "Filtering Observables", "Transforming Observables", "Combining Observables", and "Decision Tree of Observable Operators" at https://ninmesara.github.io/RxPY/api/operators/index.html, and looked at reduce/aggregate (only emits a single value at the end) and flat_map (I don't know how to detect the end of the stream). many_select and window (especially) look promising, but I'm having a hard time understanding them.

How can I do this with rx (either by using one of the existing operators, or by making a custom operator [which I don't know how to do yet]?)

Upvotes: 1

Views: 564

Answers (2)

Douglas Wegscheid

Reputation: 73

Hans' answer is close, just needs a tweak.

My observers expect to get the { 'user': ..., 'date': ... } dictionaries themselves, as in this plain pass-through:

import rx

def pp1(x):
    print type(x), x

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .subscribe(pp1)

yields

<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2016-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
<type 'dict'> {'date': '2017-01-01', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}

Adding the .group_by and .flat_map from Hans' answer results in the observers at the end getting lists of length 1 containing the summaries, instead of the summaries themselves.

import rx

def pp1(x):
    print type(x), x

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .group_by(lambda x: x['user']) \
        .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
        .subscribe(pp1)

yields

<type 'list'> [{'date': '2017-02-14', 'user': 'a'}]
<type 'list'> [{'date': '2017-01-01', 'user': 'b'}]
<type 'list'> [{'date': '2015-01-01', 'user': 'c'}]
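The list wrapper appears because max_by emits a list of every element tied for the maximum key, rather than a single element. Here is a minimal plain-Python sketch of that result shape, with no rx involved. max_by_sketch is a hypothetical helper written only for illustration; it is not RxPY's implementation.

```python
def max_by_sketch(items, key):
    """Return a list of all items whose key equals the maximum key.

    Hypothetical stand-in that mimics the shape of max_by's result;
    it is not RxPY's implementation.
    """
    best = []
    best_key = None
    for item in items:
        k = key(item)
        if not best or k > best_key:
            best, best_key = [item], k   # new maximum: start a fresh list
        elif k == best_key:
            best.append(item)            # tie: keep every tied item
    return best


# The records that group_by would route to the group for user "a".
group_a = [
    {"user": "a", "date": "2017-02-14"},
    {"user": "a", "date": "2017-01-01"},
]

# A list with one element here, so latest[0] is the summary dict.
latest = max_by_sketch(group_a, key=lambda y: y["date"])
```

If two records for the same user share the latest date, the list has two entries, and taking element 0 keeps only the first of them.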

I needed to add a map that unwraps each list:

import rx

def pp1(x):
    print type(x), x

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .group_by(lambda x: x['user']) \
        .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
        .map(lambda x: x[0]) \
        .subscribe(pp1)

which yields the expected

<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
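For checking the pipeline's output, the same summary can be computed without rx. The sketch below is a plain-Python reference, not an rx operator; latest_per_user is a hypothetical name. Like the comparer above, it assumes the dates are ISO "YYYY-MM-DD" strings, which sort chronologically when compared as text.

```python
def latest_per_user(records):
    """Keep, for each user, the record with the greatest date.

    Hypothetical reference helper; assumes ISO "YYYY-MM-DD" date
    strings, so plain string comparison orders them chronologically.
    """
    latest = {}
    for rec in records:
        seen = latest.get(rec["user"])
        if seen is None or rec["date"] > seen["date"]:
            latest[rec["user"]] = rec
    # Results are only available once the whole input has been consumed,
    # which corresponds to emitting at end of stream.
    return list(latest.values())


records = [
    {"user": "a", "date": "2017-02-14"},
    {"user": "b", "date": "2016-01-01"},
    {"user": "c", "date": "2015-01-01"},
    {"user": "a", "date": "2017-01-01"},
    {"user": "b", "date": "2017-01-01"},
]

summary = latest_per_user(records)
```

Sorting both this summary and the values collected from the observable by user gives two lists that can be compared directly.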

Upvotes: 0

Hans Peter Hagblom

Reputation: 88

I think the following might do what you want.

import rx

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .group_by(lambda x: x['user']) \
        .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
        .subscribe(print)

Upvotes: 1
