Reputation: 53
I'm trying to figure out how to achieve the following result:
A: -a--b--c-d--e-f-|
B: --1-2-3-|
=: --a-b--c-d--e-f-|
: --1-2--3-1--2-3
where A, B are the input streams and '=' represents the output stream (as a tuple A,B)
Plus the vice-versa:
A: -a-b-|
B: --1--2-3-4--5--6-7-|
=: --a--b-a-b--a--b-a-|
: --1--2-3-4--5--6-7
So, in plain text: I'm looking for something that behaves like a zip operator, but with the ability to 'replay' the shorter of the two sequences so that it matches the length of the longer one.
Any idea how to tackle this problem?
Solution provided by @DanielT (with some problems)
extension ObservableType {
    /// Zips `a` and `b` pairwise; once one source has completed, its elements are replayed
    /// cyclically so that every element of the longer source still gets a partner.
    ///
    /// The n-th emitted pair is `(a[n % a.count], b[n % b.count])`. A pair is emitted as soon
    /// as both of its halves are known, regardless of how far one source runs ahead of the other.
    ///
    /// - Parameters:
    ///   - a: Source providing the first element of each pair.
    ///   - b: Source providing the second element of each pair.
    /// - Returns: An observable of pairs. It completes once both sources have completed and it
    ///   forwards the first error of either source. If a source completes without emitting
    ///   anything, no pairs can be formed and the result just completes when both are done.
    /// - Note: Every element of both sources is kept in memory so that it can be replayed.
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            // Number of pairs handed to the observer so far, i.e. the index of the next pair.
            var emitted = 0
            let lock = NSRecursiveLock()

            // Emits every pair that can be formed from the current state, then completes
            // the output if both sources are done. Must be called with `lock` held.
            func drain() {
                while true {
                    let index = emitted
                    let aIsFresh = index < aa.count
                    let bIsFresh = index < bb.count
                    // At least one side has to contribute a not-yet-used element,
                    // otherwise two completed sources would be replayed forever.
                    guard aIsFresh || bIsFresh else { break }
                    // A side may only be replayed after it completed with at least one
                    // element; this also keeps the modulo below away from a zero divisor.
                    guard aIsFresh || (aComplete && !aa.isEmpty),
                          bIsFresh || (bComplete && !bb.isEmpty) else { break }
                    // Advance before emitting so a re-entrant event (the lock is recursive)
                    // continues with the next pair instead of repeating this one.
                    emitted += 1
                    observer.onNext((aa[index % aa.count], bb[index % bb.count]))
                }
                if aComplete && bComplete {
                    observer.onCompleted()
                }
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let element):
                    aa.append(element)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    drain()
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let element):
                    bb.append(element)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    drain()
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}
My own solution inspired by answers below - own operator (HT @DanielT) but with more imperative approach (HT @iamtimmo):
extension ObservableType {
    /// Collects every element of `a` and `b`, and once both have completed emits them
    /// zipped pairwise, cycling through the shorter buffer to match the longer one.
    ///
    /// - Parameters:
    ///   - a: Source providing the first element of each pair.
    ///   - b: Source providing the second element of each pair.
    /// - Returns: An observable that stays silent until both sources have completed, then
    ///   emits `max(a.count, b.count)` pairs followed by completion. If one source produced
    ///   no elements, its half of every pair is `nil`. The first error of either source is
    ///   forwarded.
    /// - Note: The buffers are not synchronized; this assumes both sources deliver their
    ///   events serially with respect to each other — confirm for your schedulers.
    public static func zipRepeatCollected<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A?, B?)> {
        return Observable.create { observer in
            var bufferA: [A] = []
            var bufferB: [B] = []
            // Plain flags instead of subjects: a subject subscribed after the sources would
            // miss the completion of a source that finishes during its own subscription.
            var aComplete = false
            var bComplete = false

            // Emits the zipped buffers and completes, but only once both sources are done.
            func flushIfBothComplete() {
                guard aComplete && bComplete else { return }
                let zippedList: [(A?, B?)]
                if bufferA.isEmpty {
                    zippedList = bufferB.map { (nil, $0) }
                } else if bufferB.isEmpty {
                    zippedList = bufferA.map { ($0, nil) }
                } else {
                    let lengthA = bufferA.count
                    let lengthB = bufferB.count
                    zippedList = (0 ..< max(lengthA, lengthB)).map { i -> (A?, B?) in
                        (bufferA[i % lengthA], bufferB[i % lengthB])
                    }
                }
                zippedList.forEach { observer.onNext($0) }
                observer.onCompleted()
            }

            let disposableA = a.subscribe { event in
                switch event {
                case .next(let valueA):
                    bufferA.append(valueA)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    flushIfBothComplete()
                }
            }
            let disposableB = b.subscribe { event in
                switch event {
                case .next(let valueB):
                    bufferB.append(valueB)
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    flushIfBothComplete()
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}
/// Marble-string driven tests for `zipRepeatCollected`.
///
/// In the input strings every character is one time step: `-` is an idle step, `|` is
/// completion and any other character is emitted as a one-character element. The expected
/// string is a comma-separated list of emitted values, with `|` standing for completion.
class ZipRepeatTests: XCTestCase {

    // MARK: - Test cases

    func testALongerThanB() {
        assertAopBEqualsE(a: "-a--b--c-d--e-f-|", b: "--1-2-3-|", e: "a1,b2,c3,d1,e2,f3,|")
    }

    func testAShorterThanB() {
        assertAopBEqualsE(a: "-a--b|", b: "--1-2-3-|", e: "a1,b2,a3,|")
    }

    func testBStartsLater() {
        assertAopBEqualsE(a: "-a--b|", b: "----1---2|", e: "a1,b2,|")
    }

    func testABWithConstOffset() {
        assertAopBEqualsE(a: "-a--b--c|", b: "----1--2--3--|", e: "a1,b2,c3,|")
    }

    func testAEndsBeforeBStarts() {
        assertAopBEqualsE(a: "ab|", b: "---1-2-3-4-|", e: "a1,b2,a3,b4,|")
    }

    func testACompletesWithoutValue() {
        assertAopBEqualsE(a: "-|", b: "--1-2-3-|", e: "1,2,3,|")
    }

    func testBCompletesWithoutValue() {
        assertAopBEqualsE(a: "-a--b|", b: "|", e: "a,b,|")
    }

    func testNoData() {
        assertAopBEqualsE(a: "-|", b: "|", e: "|")
    }

    // MARK: - Helpers

    /// Runs `zipRepeatCollected` over the cold streams described by `a` and `b` and asserts
    /// that the recorded events, ignoring their timestamps, equal the ones described by `e`.
    func assertAopBEqualsE(_ scheduler: TestScheduler = TestScheduler(initialClock: 0), a: String, b: String, e: String, file: StaticString = #file, line: UInt = #line) {
        let sourceA = scheduler.createColdObservable(events(a)).asObservable()
        let sourceB = scheduler.createColdObservable(events(b)).asObservable()
        let wanted = expected(e)
        let recording = scheduler.start {
            Observable<(String)>.zipRepeatCollected(sourceA, sourceB).map { "\($0 ?? "")\($1 ?? "")" }
        }
        // Only the event payloads are compared; the virtual times are dropped.
        let actual = recording.events.map { $0.value }
        XCTAssertEqual(wanted, actual, file: file, line: line)
    }

    /// Parses a comma-separated expectation string into events (`|` means completion).
    func expected(_ stream: String) -> [Event<String>] {
        return stream.split(separator: ",").map { token -> Event<String> in
            let text = String(token)
            return text == "|" ? .completed : .next(text)
        }
    }

    /// Parses a marble string into recorded events, one character per `step` ticks of
    /// virtual time, starting at time 0.
    func events(_ stream: String, step: Int = 10) -> [Recorded<Event<String>>] {
        var recorded = [Recorded<Event<String>>]()
        for (offset, symbol) in stream.enumerated() {
            let time = offset * step
            switch symbol {
            case "|":
                recorded.append(.completed(time))
            case "-":
                continue
            default:
                recorded.append(.next(time, String(symbol)))
            }
        }
        return recorded
    }
}
Upvotes: 0
Views: 539
Reputation: 63167
There are two orthogonal components at play here: the observable itself, and a `CycleSequence` type, which wraps an array of elements and dispenses those elements in order, repeating forever.
Conceptually speaking, it's important to realize that the cycling sequence isn't an observable. We need to act in a "pull" manner: when we want the next element in the cycling sequence, we ask for it (by invoking its implementation of `IteratorProtocol.next()`). This is unlike `Observable`s, which have their own chronology and "push" elements through their chain of operators.
Thus, this is a matter of `map`ping each element of the observable to a pair made of that element and the `next()` element from the cycling sequence, like so:
// `CycleSequence` is not shown in this snippet; its iterator is used here as an endless,
// pull-based supplier of "a", "b", "c", "a", ...
var cyclingIterator = CycleSequence(cycling: ["a", "b", "c"]).makeIterator()

// `Observable.from` already yields an `Observable`, so no `asObservable()` call is needed.
// The source is finite, so the subscription ends on its own; the returned `Disposable` is
// discarded explicitly to avoid the unused-result warning.
_ = Observable.from(1...100)
    .map { ($0, cyclingIterator.next()) }
    .subscribe(onNext: { print($0) })
prints:
(1, Optional("a"))
(2, Optional("b"))
(3, Optional("c"))
(4, Optional("a"))
(5, Optional("b"))
(6, Optional("c"))
(7, Optional("a"))
(8, Optional("b"))
(9, Optional("c"))
(10, Optional("a"))
...
Upvotes: 0
Reputation: 33967
When in doubt, you can always make your own operator:
extension ObservableType {
    /// Zips `a` and `b` pairwise; once one source has completed, its elements are replayed
    /// cyclically so that every element of the longer source still gets a partner.
    ///
    /// The n-th emitted pair is `(a[n % a.count], b[n % b.count])`. A pair is emitted as soon
    /// as both of its halves are known, regardless of how far one source runs ahead of the other.
    ///
    /// - Parameters:
    ///   - a: Source providing the first element of each pair.
    ///   - b: Source providing the second element of each pair.
    /// - Returns: An observable of pairs. It completes once both sources have completed and it
    ///   forwards the first error of either source. If a source completes without emitting
    ///   anything, no pairs can be formed and the result just completes when both are done.
    /// - Note: Every element of both sources is kept in memory so that it can be replayed.
    public static func zipRepeat<A, B>(_ a: Observable<A>, _ b: Observable<B>) -> Observable<(A, B)> {
        return Observable.create { observer in
            var aa: [A] = []
            var aComplete = false
            var bb: [B] = []
            var bComplete = false
            // Number of pairs handed to the observer so far, i.e. the index of the next pair.
            var emitted = 0
            let lock = NSRecursiveLock()

            // Emits every pair that can be formed from the current state, then completes
            // the output if both sources are done. Must be called with `lock` held.
            func drain() {
                while true {
                    let index = emitted
                    let aIsFresh = index < aa.count
                    let bIsFresh = index < bb.count
                    // At least one side has to contribute a not-yet-used element,
                    // otherwise two completed sources would be replayed forever.
                    guard aIsFresh || bIsFresh else { break }
                    // A side may only be replayed after it completed with at least one
                    // element; this also keeps the modulo below away from a zero divisor.
                    guard aIsFresh || (aComplete && !aa.isEmpty),
                          bIsFresh || (bComplete && !bb.isEmpty) else { break }
                    // Advance before emitting so a re-entrant event (the lock is recursive)
                    // continues with the next pair instead of repeating this one.
                    emitted += 1
                    observer.onNext((aa[index % aa.count], bb[index % bb.count]))
                }
                if aComplete && bComplete {
                    observer.onCompleted()
                }
            }

            let disposableA = a.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let element):
                    aa.append(element)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    aComplete = true
                    drain()
                }
            }
            let disposableB = b.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next(let element):
                    bb.append(element)
                    drain()
                case .error(let error):
                    observer.onError(error)
                case .completed:
                    bComplete = true
                    drain()
                }
            }
            return Disposables.create(disposableA, disposableB)
        }
    }
}
And a test showing the functionality:
/// Demonstrates `zipRepeat` with one source twice as long as the other.
///
/// `scheduler.start` subscribes at virtual time 200, so an element recorded at cold time
/// `t` is observed at `200 + t`.
class RxSandboxTests: XCTestCase {

    /// `a` has six elements and `b` three: the `b` values must cycle 1, 2, 3, 1, 2, 3.
    func testLongA() {
        let scheduler = TestScheduler(initialClock: 0)
        let letters = scheduler.createColdObservable([
            .next(10, "a"), .next(20, "b"), .next(30, "c"),
            .next(40, "d"), .next(50, "e"), .next(60, "f"),
            .completed(60)
        ])
        let digits = scheduler.createColdObservable([
            .next(10, 1), .next(20, 2), .next(30, 3),
            .completed(30)
        ])
        let expected: [Recorded<Event<Int>>] = [
            .next(210, 1), .next(220, 2), .next(230, 3),
            .next(240, 1), .next(250, 2), .next(260, 3),
            .completed(260)
        ]

        let observed = scheduler.start {
            Observable<(String, Int)>.zipRepeat(letters.asObservable(), digits.asObservable()).map { $0.1 }
        }

        XCTAssertEqual(observed.events, expected)
    }

    /// `b` has six elements and `a` three: the `a` values must cycle a, b, c, a, b, c.
    func testLongB() {
        let scheduler = TestScheduler(initialClock: 0)
        let letters = scheduler.createColdObservable([
            .next(10, "a"), .next(20, "b"), .next(30, "c"),
            .completed(30)
        ])
        let digits = scheduler.createColdObservable([
            .next(10, 1), .next(20, 2), .next(30, 3),
            .next(40, 4), .next(50, 5), .next(60, 6),
            .completed(60)
        ])
        let expected: [Recorded<Event<String>>] = [
            .next(210, "a"), .next(220, "b"), .next(230, "c"),
            .next(240, "a"), .next(250, "b"), .next(260, "c"),
            .completed(260)
        ]

        let observed = scheduler.start {
            Observable<(String, Int)>.zipRepeat(letters.asObservable(), digits.asObservable()).map { $0.0 }
        }

        XCTAssertEqual(observed.events, expected)
    }
}
Upvotes: 1
Reputation: 414
Hi @DegeH,
I don't know RxSwift, but here's something that might help you. It uses the new Combine framework, but you may be able to map to RxSwift. Just paste it into a playground.
In the example, `pubA` and `pubB` collect the strings from the `A` and `B` streams, respectively, and each emits a list of those strings. Once both of the streams have completed, `publisher` combines the two lists (via `zip()`) into a single pair of lists.
Then the lists are split back apart and fed into the `zipUnalignedPublishersLists` function, which converts them to a single list of tuples. This is where the values from the shorter stream are repeated to align with those of the longer one. So, maybe this function will help you the most.
Finally, that list of tuples is `flatMap`ped into a sequence publisher, to which `subscriber` is subscribed.
The `DispatchQueue.main.asyncAfter()` call at the end is just to ensure that execution doesn't end before `subscriber` has had time to complete in the playground. You don't want it in your application.
import Foundation
import Combine
/// Zips two lists pairwise, cycling through the shorter one so that every element of the
/// longer one gets a partner.
///
/// - Parameters:
///   - listA: Values for the first element of each tuple.
///   - listB: Values for the second element of each tuple.
/// - Returns: `max(listA.count, listB.count)` tuples, where tuple `i` is
///   `(listA[i % listA.count], listB[i % listB.count])`. Returns an empty list when either
///   input is empty, because no tuple can be formed without a value for both halves.
func zipUnalignedPublishersLists(_ listA: [String], _ listB: [String]) -> [(String, String)] {
    // The remainder operator traps on a zero divisor, so an empty list must never reach
    // the index computation below.
    guard !listA.isEmpty, !listB.isEmpty else { return [] }
    return (0 ..< max(listA.count, listB.count)).map { i in
        (listA[i % listA.count], listB[i % listB.count])
    }
}
// Subjects standing in for the two input streams A and B.
let ptsA = PassthroughSubject<String, Never>()
let ptsB = PassthroughSubject<String, Never>()

// Each stream is gathered into one array, delivered when that stream finishes.
let pubA = ptsA.collect().eraseToAnyPublisher()
let pubB = ptsB.collect().eraseToAnyPublisher()

// Once both arrays exist, align them and re-publish the tuples one by one.
let publisher = pubA
    .zip(pubB)
    .map { lists in zipUnalignedPublishersLists(lists.0, lists.1) }
    .flatMap { tuples in tuples.publisher }
    .eraseToAnyPublisher()

let subscriber = publisher.sink { print($0) }

// Interleaved sends: a, 1, b, 2, c, 3.
for (letter, digit) in zip(["a", "b", "c"], ["1", "2", "3"]) {
    ptsA.send(letter)
    ptsB.send(digit)
}
ptsB.send(completion: .finished)

// A keeps going after B has finished.
["d", "e", "f"].forEach { ptsA.send($0) }
ptsA.send(completion: .finished)

// Playground only: schedules an empty block half a second from now.
DispatchQueue.main.asyncAfter(deadline: .now() + 0.5, execute: {})
Upvotes: 1