Eddie K

Reputation: 513

How to combine two sequences cumulatively in RxSwift?

I have two sequences and I'd like to combine them so that any results coming into the second sequence would be cumulatively combined with the latest result from the first sequence.

A---------------B----------------------C------------- ...
-------1-2-----------3-------------------------------- ...

So that the result would be:

A-----A+1--A+1+2---B----B+3--------------C-------------

How might I do that in Rx? (I'm using RxSwift)

Upvotes: 2

Views: 654

Answers (2)

Daniel T.

Reputation: 33967

Here you go. Hopefully you can use this as a template: first write a test that establishes what you want, then write the code that produces it.

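import RxSwift

// Tags each element with the sequence it came from so both can share one stream.
enum Action<A, B> {
    case a(A)
    case b(B)
}

// Emits the latest `A` together with every `B` received since that `A`.
func example<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, [B])> {
    return Observable.merge(a.map(Action.a), b.map(Action.b))
        .scan((A?.none, [B]())) { current, next in
            switch next {
            case .a(let a):
                // A new `A` resets the accumulated `B` values.
                return (a, [])
            case .b(let b):
                // A new `B` is appended to the values collected so far.
                return (current.0, current.1 + [b])
            }
        }
}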

And here is a test to prove it works:

import XCTest
import RxSwift
import RxTest

class RxSandboxTests: XCTestCase {

    func testExample() {
        let scheduler = TestScheduler(initialClock: 0)
        let a = scheduler.createColdObservable([.next(0, "A"), .next(16, "B"), .next(39, "C")])
        let b = scheduler.createColdObservable([.next(7, 1), .next(9, 2), .next(21, 3)])

        let result = scheduler.start {
            example(a.asObservable(), b.asObservable())
                .map { Result(a: $0.0, b: $0.1) }
        }

        XCTAssertEqual(
            result.events,
            [
                .next(200, Result(a: "A", b: [])),
                .next(207, Result(a: "A", b: [1])),
                .next(209, Result(a: "A", b: [1, 2])),
                .next(216, Result(a: "B", b: [])),
                .next(221, Result(a: "B", b: [3])),
                .next(239, Result(a: "C", b: []))
            ]
        )
    }
}

struct Result: Equatable {
    let a: String?
    let b: [Int]
}

Upvotes: 0

Fan Cheung

Reputation: 11345

You can use combineLatest + bufferWhen. Here is an RxJS example: https://stackblitz.com/edit/typescript-s1pemu

import { bufferWhen } from 'rxjs/operators';
import { interval, combineLatest } from 'rxjs';

// timerOne$ emits 0, 1, 2, ... once every 4s (first value after 4s)
const timerOne$ = interval(4000);
// timerTwo$ emits 0, 1, 2, ... once every 1s (first value after 1s)
const timerTwo$ = interval(1000);

// when one timer emits, emit the latest values from each timer as an array
combineLatest(
  timerOne$, 
  timerTwo$.pipe(bufferWhen(()=>timerOne$)), 
)
.subscribe(
  ([timerValOne, timerValTwo]) => {
    console.log(
    `Timer One Latest: ${timerValOne},
     Timer Two Latest: ${timerValTwo}`,
    );
    // Seed reduce with 0 so an empty buffer does not throw
    console.log('Total:', timerValOne + timerValTwo.reduce((acc, curr) => acc + curr, 0))
  }
);
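
For comparison, the accumulate-and-reset behaviour drawn in the question can be modelled without any Rx library, as a fold over the two sequences merged into arrival order. This is only a minimal sketch: the `Ev` type and the `step` and `run` names are hypothetical, and it ignores timing entirely. It just shows that "reset on a value from the first sequence, append on a value from the second" yields the output in the diagram.

```typescript
// Hypothetical event type: "a" comes from the first sequence, "b" from the second.
type Ev =
  | { kind: "a"; value: string }
  | { kind: "b"; value: number };

// Accumulated state: the latest first-sequence value plus every
// second-sequence value seen since it arrived.
interface State {
  latest: string | null;
  since: number[];
}

// One scan step: an "a" event resets the list, a "b" event appends to it.
function step(state: State, ev: Ev): State {
  return ev.kind === "a"
    ? { latest: ev.value, since: [] }
    : { latest: state.latest, since: [...state.since, ev.value] };
}

// Fold a time-ordered list of events and record the state after each one,
// which is what a scan operator would emit.
function run(events: Ev[]): string[] {
  const out: string[] = [];
  let state: State = { latest: null, since: [] };
  for (const ev of events) {
    state = step(state, ev);
    out.push([state.latest ?? "", ...state.since].join("+"));
  }
  return out;
}

// The marble diagram from the question, flattened into arrival order.
const timeline: Ev[] = [
  { kind: "a", value: "A" },
  { kind: "b", value: 1 },
  { kind: "b", value: 2 },
  { kind: "a", value: "B" },
  { kind: "b", value: 3 },
  { kind: "a", value: "C" },
];

console.log(run(timeline).join(" "));
```

In an Rx pipeline, `step` would be the function passed to a scan-style operator after merging the two tagged sequences.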

Upvotes: 1
