DegH
DegH

Reputation: 53

How to zip observables with repeating the shorter sequence

I'm trying to figure out how to achieve the following result:

A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
 : --1-2--3-1--2-3

where A, B are the input streams and '=' represents the output stream (as a tuple A,B)

Plus the vice-versa:

A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
 : --1--2-3-4--5--6-7

So, in plain text: I'm looking for something that behaves like the zip operator, but with the ability to 'replay' the shorter of the two sequences to match the longer one.

Any idea how to tackle this problem?


Solution 1

Solution provided by @DanielT (with some problems)

extension ObservableType {
    /// Zips `a` and `b` pairwise and, once one source has completed, replays its
    /// elements cyclically so that every element of the longer source gets a partner.
    ///
    /// Pair `k` is made of the `k`-th element of each source. A source that has
    /// completed supplies `buffer[k % buffer.count]` instead, i.e. it wraps around.
    /// A pair is forwarded as soon as both of its halves are available, so the
    /// sources do not have to emit in lockstep.
    ///
    /// The result completes when both sources have completed. If a source completes
    /// without emitting anything there is nothing to pair with, so no further
    /// elements are produced. An error from either source is forwarded as is.
    ///
    /// - Parameters:
    ///   - a: Source of the first tuple component.
    ///   - b: Source of the second tuple component.
    /// - Returns: An observable of `(A, B)` pairs.
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            // Number of pairs forwarded so far; also the index of the next pair to build.
            var emitted = 0
            let lock = NSRecursiveLock()

            // Returns the element for pair `index`, wrapping around only when the
            // source is complete (`cycling`). An empty buffer never yields a value,
            // which also keeps the remainder operator away from a zero divisor.
            func element<T>(at index: Int, of buffer: [T], cycling: Bool) -> T? {
                if index < buffer.count { return buffer[index] }
                guard cycling, !buffer.isEmpty else { return nil }
                return buffer[index % buffer.count]
            }

            // Forwards every pair that can be built right now, then completes once
            // both sources are done. The `max` bound stops the cycling as soon as the
            // longer source is exhausted.
            func drain() {
                while emitted < max(aa.count, bb.count),
                      let ae = element(at: emitted, of: aa, cycling: aComplete),
                      let be = element(at: emitted, of: bb, cycling: bComplete) {
                    // Advance before calling out so a re-entrant event sees a consistent index.
                    emitted += 1
                    observer.onNext((ae, be))
                }
                if aComplete && bComplete {
                    observer.onCompleted()
                }
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    // Completion may unlock cycling for elements `b` already buffered.
                    drain()
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    // Completion may unlock cycling for elements `a` already buffered.
                    drain()
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

Solution 2

My own solution inspired by answers below - own operator (HT @DanielT) but with more imperative approach (HT @iamtimmo):

extension ObservableType {
    /// Collects all elements of `a` and `b` and, once both have completed, emits
    /// them zipped pairwise with the shorter collection repeated cyclically.
    ///
    /// Nothing is emitted before both sources have completed. If exactly one source
    /// produced no elements, the elements of the other are emitted with `nil` in the
    /// missing tuple position. If neither produced elements, the result just completes.
    /// An error from either source is forwarded as is.
    ///
    /// - Parameters:
    ///   - a: Source of the first tuple component.
    ///   - b: Source of the second tuple component.
    /// - Returns: An observable of `(A?, B?)` pairs, emitted in one burst on completion.
    public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
        return Observable.create { observer in

            var bufferA: [A] = []
            let aComplete = PublishSubject<Bool>()

            var bufferB: [B] = []
            let bComplete = PublishSubject<Bool>()

            // Listen for the completion signals BEFORE subscribing to the sources.
            // A PublishSubject does not replay, so a source that completes
            // synchronously while being subscribed to (e.g. `Observable.just`)
            // would otherwise signal "done" to nobody and the result would
            // complete without emitting anything.
            let disposableZip = Observable.zip(aComplete, bComplete)
                .filter { $0 && $1 }
                .subscribe { event in
                    switch event {
                    case .next:
                        var zippedList: [(A?, B?)] = []

                        let lengthA = bufferA.count
                        let lengthB = bufferB.count

                        if lengthA > 0 && lengthB > 0 {
                            // Both sides have data: wrap the shorter one around.
                            for i in 0 ..< max(lengthA, lengthB) {
                                let aVal = bufferA[i % lengthA]
                                let bVal = bufferB[i % lengthB]
                                zippedList.append((aVal, bVal))
                            }
                        } else if lengthA == 0 {
                            zippedList = bufferB.map { (nil, $0) }
                        } else {
                            zippedList = bufferA.map { ($0, nil) }
                        }

                        zippedList.forEach { observer.onNext($0) }
                    case .completed:
                        observer.onCompleted()
                    case .error(let e):
                        observer.onError(e)
                    }
            }

            let disposableA = a.subscribe { event in
                switch event {
                case .next(let valueA):
                    bufferA.append(valueA)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete.onNext(true)
                    aComplete.onCompleted()
                }
            }

            let disposableB = b.subscribe { event in
                switch event {
                case .next(let value):
                    bufferB.append(value)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete.onNext(true)
                    bComplete.onCompleted()
                }
            }

            return Disposables.create(disposableA, disposableB, disposableZip)
        }
    }
}

/// Marble-style tests for `zipRepeatCollected`.
///
/// Inputs are written as marble strings: every character is one time step,
/// `-` is an idle step, `|` is completion and any other character is an element.
/// The expected output is a comma-separated list of emitted strings, with `|`
/// standing for completion.
class ZipRepeatTests: XCTestCase {
    /// `a` is longer than `b`, so `b` must wrap around.
    func testALongerThanB() {
        assertAopBEqualsE(
            a: "-a--b--c-d--e-f-|",
            b: "--1-2-3-|",
            e: "a1,b2,c3,d1,e2,f3,|")
    }

    /// `a` is shorter than `b`, so `a` must wrap around.
    func testAShorterThanB() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "--1-2-3-|",
            e: "a1,b2,a3,|")
    }

    /// `b` emits its first element only after `a` has emitted everything.
    func testBStartsLater() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "----1---2|",
            e: "a1,b2,|")
    }

    /// Same length, but `b` lags behind `a` by a constant offset.
    func testABWithConstOffset() {
        assertAopBEqualsE(
            a: "-a--b--c|",
            b: "----1--2--3--|",
            e: "a1,b2,c3,|")
    }

    /// `a` has completed before `b` emits anything.
    func testAEndsBeforeBStarts() {
        assertAopBEqualsE(
            a: "ab|",
            b: "---1-2-3-4-|",
            e: "a1,b2,a3,b4,|")
    }

    /// `a` is empty: the elements of `b` are expected on their own.
    func testACompletesWithoutValue() {
        assertAopBEqualsE(
            a: "-|",
            b: "--1-2-3-|",
            e: "1,2,3,|")
    }

    /// `b` is empty: the elements of `a` are expected on their own.
    func testBCompletesWithoutValue() {
        assertAopBEqualsE(
            a: "-a--b|",
            b: "|",
            e: "a,b,|")
    }

    /// Neither source emits: only completion is expected.
    func testNoData() {
        assertAopBEqualsE(
            a: "-|",
            b: "|",
            e: "|")
    }

    /// Runs `zipRepeatCollected` over the marble inputs `a` and `b` and asserts
    /// that the emitted events (ignoring their timestamps) match `e`.
    func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {
        let sourceA = scheduler.createColdObservable(events(a)).asObservable()
        let sourceB = scheduler.createColdObservable(events(b)).asObservable()

        let recording = scheduler.start {
            // A missing half of a pair is rendered as an empty string.
            Observable<(String)>.zipRepeatCollected(sourceA, sourceB)
                .map { pair in "\(pair.0 ?? "")\(pair.1 ?? "")" }
        }

        let actual = recording.events.map { $0.value }
        XCTAssertEqual(expected(e), actual, file: file, line: line)
    }

    /// Parses a comma-separated expectation such as `"a1,b2,|"` into events.
    func expected(_ stream: String) -> [Event<String>] {
        var parsed: [Event<String>] = []
        for token in stream.split(separator: ",") {
            let text = String(token)
            parsed.append(text == "|" ? .completed : .next(text))
        }
        return parsed
    }

    /// Parses a marble string into recorded events, `step` ticks per character.
    func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
        var recorded = [Recorded<Event<String>>]()
        for (offset, symbol) in stream.enumerated() {
            // Every character, including "-", advances virtual time by one step.
            let time = offset * step
            switch symbol {
            case "|":
                recorded.append(.completed(time))
            case "-":
                break
            default:
                recorded.append(.next(time, String(symbol)))
            }
        }
        return recorded
    }
}

Upvotes: 0

Views: 539

Answers (3)

Alexander
Alexander

Reputation: 63167

There are two orthogonal components at play here:

  1. Given a finite list of elements, convert them into an infinite sequence of cycling elements. This comes up pretty often, so I've implemented a CycleSequence type, which wraps an array of elements and dispenses its elements in order, repeating forever.
  2. A way of combining that infinitely cycled sequence with an observable of streaming elements, pairing each new element with the appropriate member from the cycling sequence.

Conceptually speaking, it's important to realize that the cycling sequence isn't an observable. We need to act in a "pull" manner. When we want the next element in the cycling sequence, we ask it (by invoking its implementation of IteratorProtocol.next()). This is unlike Observables, which have their own chronology, and "push" elements through their chain of operators.

Thus, this is a matter of mapping each element of the observable with the next() element from the cycling sequence, like so:

// `CycleSequence` is the answer author's own helper type (not shown here); its
// iterator is pulled once per observable element, so it must be a mutable `var`.
var cyclingIterator = CycleSequence(cycling: ["a", "b", "c"]).makeIterator()

// `Observable.from` already returns an `Observable`, so no `asObservable()` is needed.
Observable.from(1...100)
    .map { ($0, cyclingIterator.next()) }
    .subscribe(onNext: { print($0) })

prints:

(1, Optional("a"))
(2, Optional("b"))
(3, Optional("c"))
(4, Optional("a"))
(5, Optional("b"))
(6, Optional("c"))
(7, Optional("a"))
(8, Optional("b"))
(9, Optional("c"))
(10, Optional("a"))
...

Upvotes: 0

Daniel T.
Daniel T.

Reputation: 33967

When in doubt, you can always make your own operator:

extension ObservableType {
    /// Zips `a` and `b` pairwise and, once one source has completed, replays its
    /// elements cyclically so that every element of the longer source gets a partner.
    ///
    /// Pair `k` is made of the `k`-th element of each source. A source that has
    /// completed supplies `buffer[k % buffer.count]` instead, i.e. it wraps around.
    /// A pair is forwarded as soon as both of its halves are available, so the
    /// sources do not have to emit in lockstep.
    ///
    /// The result completes when both sources have completed. If a source completes
    /// without emitting anything there is nothing to pair with, so no further
    /// elements are produced. An error from either source is forwarded as is.
    ///
    /// - Parameters:
    ///   - a: Source of the first tuple component.
    ///   - b: Source of the second tuple component.
    /// - Returns: An observable of `(A, B)` pairs.
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            // Number of pairs forwarded so far; also the index of the next pair to build.
            var emitted = 0
            let lock = NSRecursiveLock()

            // Returns the element for pair `index`, wrapping around only when the
            // source is complete (`cycling`). An empty buffer never yields a value,
            // which also keeps the remainder operator away from a zero divisor.
            func element<T>(at index: Int, of buffer: [T], cycling: Bool) -> T? {
                if index < buffer.count { return buffer[index] }
                guard cycling, !buffer.isEmpty else { return nil }
                return buffer[index % buffer.count]
            }

            // Forwards every pair that can be built right now, then completes once
            // both sources are done. The `max` bound stops the cycling as soon as the
            // longer source is exhausted.
            func drain() {
                while emitted < max(aa.count, bb.count),
                      let ae = element(at: emitted, of: aa, cycling: aComplete),
                      let be = element(at: emitted, of: bb, cycling: bComplete) {
                    // Advance before calling out so a re-entrant event sees a consistent index.
                    emitted += 1
                    observer.onNext((ae, be))
                }
                if aComplete && bComplete {
                    observer.onCompleted()
                }
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let ae):
                    aa.append(ae)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    // Completion may unlock cycling for elements `b` already buffered.
                    drain()
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let be):
                    bb.append(be)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    // Completion may unlock cycling for elements `a` already buffered.
                    drain()
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}

And a test showing the functionality:

/// Demonstrates `zipRepeat` with sources that emit in lockstep but differ in length.
class RxSandboxTests: XCTestCase {

    /// `a` emits six elements and `b` three, so the values of `b` must repeat.
    func testLongA() {
        let testScheduler = TestScheduler(initialClock: 0)
        let letters = testScheduler.createColdObservable([
            .next(10, "a"), .next(20, "b"), .next(30, "c"),
            .next(40, "d"), .next(50, "e"), .next(60, "f"),
            .completed(60)
        ])
        let numbers = testScheduler.createColdObservable([
            .next(10, 1), .next(20, 2), .next(30, 3),
            .completed(30)
        ])

        // Only the `b` half of every pair is recorded.
        let recordedNumbers = testScheduler.start {
            Observable<(String, Int)>.zipRepeat(letters.asObservable(), numbers.asObservable())
                .map { pair in pair.1 }
        }

        XCTAssertEqual(recordedNumbers.events, [
            .next(210, 1), .next(220, 2), .next(230, 3),
            .next(240, 1), .next(250, 2), .next(260, 3),
            .completed(260)
        ])
    }

    /// `b` emits six elements and `a` three, so the values of `a` must repeat.
    func testLongB() {
        let testScheduler = TestScheduler(initialClock: 0)
        let letters = testScheduler.createColdObservable([
            .next(10, "a"), .next(20, "b"), .next(30, "c"),
            .completed(30)
        ])
        let numbers = testScheduler.createColdObservable([
            .next(10, 1), .next(20, 2), .next(30, 3),
            .next(40, 4), .next(50, 5), .next(60, 6),
            .completed(60)
        ])

        // Only the `a` half of every pair is recorded.
        let recordedLetters = testScheduler.start {
            Observable<(String, Int)>.zipRepeat(letters.asObservable(), numbers.asObservable())
                .map { pair in pair.0 }
        }

        XCTAssertEqual(recordedLetters.events, [
            .next(210, "a"), .next(220, "b"), .next(230, "c"),
            .next(240, "a"), .next(250, "b"), .next(260, "c"),
            .completed(260)
        ])
    }
}

Upvotes: 1

iamtimmo
iamtimmo

Reputation: 414

  • Swift 5.1 / Combine *

Hi @DegH,

I don't know RxSwift, but here's something that might help you. It uses the new Combine framework, but you may be able to map to RxSwift. Just paste it into a playground.

In the example, pubA and pubB collect the strings from the A and B streams, respectively, and each emits a list of those strings. Once both of the streams have completed, publisher combines the two lists (via zip()) into a pair of lists.

Then the lists are split back apart and fed into the zipUnalignedPublishersLists function, which converts them to a single list of tuples. This is where the values from the shorter stream are repeated to align with those of the longer. So, maybe this function will help you the most.

Finally, that list of tuples is flatMapped into a sequence publisher, to which subscriber is subscribed.

The DispatchQueue.main.asyncAfter() call at the end is just to ensure that execution doesn't end before subscriber has time to complete in the playground. You don't want it in your application.

import Foundation
import Combine

/// Pairs the elements of `listA` and `listB` by position, repeating the shorter
/// list cyclically until every element of the longer list has a partner.
///
/// - Parameters:
///   - listA: Values for the first tuple component.
///   - listB: Values for the second tuple component.
/// - Returns: `max(listA.count, listB.count)` pairs, or an empty array when either
///   list is empty (there is nothing to pair the other side with).
func zipUnalignedPublishersLists(_ listA: [String], _ listB: [String]) -> [(String, String)] {
    // The remainder operator traps on a zero divisor, so bail out before indexing.
    guard !listA.isEmpty, !listB.isEmpty else { return [] }

    let lengthA = listA.count
    let lengthB = listB.count
    return (0 ..< max(lengthA, lengthB)).map { i in
        (listA[i % lengthA], listB[i % lengthB])
    }
}

// Playground driver: two subjects stand in for the A and B streams.
let ptsA = PassthroughSubject<String, Never>()
let ptsB = PassthroughSubject<String, Never>()

// Each stream is gathered into a single list of its values.
let pubA = ptsA.collect().eraseToAnyPublisher()
let pubB = ptsB.collect().eraseToAnyPublisher()

// Pair the two lists, align them (repeating the shorter one) and
// flatten the resulting list of tuples back into individual values.
let publisher = pubA
    .zip(pubB)
    .map({ listA, listB in
        zipUnalignedPublishersLists(listA, listB)
    })
    .flatMap { $0.publisher }
    .eraseToAnyPublisher()

// Print every aligned tuple.
let subscriber = publisher.sink(receiveValue: { print($0) })

// Feed the streams: B finishes after three values, A after six.
ptsA.send("a")
ptsB.send("1")
ptsA.send("b")
ptsB.send("2")
ptsA.send("c")
ptsB.send("3")
ptsB.send(completion: .finished)
ptsA.send("d")
ptsA.send("e")
ptsA.send("f")
ptsA.send(completion: .finished)

// Playground-only: schedules an empty block half a second out so execution
// does not end before the subscriber has finished. Not needed in an app.
DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {}

Upvotes: 1

Related Questions