Reputation: 2398
I am currently trying to implement the merging of two publishers. But I can't find a solution for my use case.
I want to merge two publishers that both emit an array of structs of the same type. The combined publisher should emit a value whenever either of the merged publishers emits a new value.
Basically this would be a use case for Publishers.CombineLatest, but since my underlying publishers both emit values of the same type, a merge would be more fitting here. But Publishers.Merge will not remember the last values of the merged publishers.
Therefore I would like to have Publishers.CombineLatest behaviour with a Publishers.Merge operation. Is there something inside the Combine framework that can accomplish this kind of behaviour?
Rough example of what should happen:
Definitions:
PublisherA: emits -> [Value]
PublisherB: emits -> [Value]
CombinedAB: -> [Value]
PublisherA changes: CombinedAB -> [NewA, OldB]
PublisherB changes: CombinedAB -> [OldA, NewB]
let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
// Publisher.AnyThing is a placeholder for the operator I am looking for.
let combined = Publisher.AnyThing(a, b)
combined.sink {
    print($0)
}
b.send(["g", "h", "i"])
Outputs:
["a", "b", "c", "d", "e", "f"]
["a", "b", "c", "g", "h", "i"]
So it's basically a Publishers.CombineLatest, but instead of emitting a tuple of (NewA, OldB) it emits the already merged value, because both values have the same type.
Any help is much appreciated.
Upvotes: 7
Views: 6586
Reputation: 9935
The docs explain the differences between three approaches for combining Publishers:
Use combineLatest(_:) when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To pair elements from multiple publishers, use zip(_:) instead. To receive just the most-recent element from multiple publishers rather than tuples, use merge(with:).
Zip and CombineLatest are the only appropriate solutions for consuming events from both publishers together. For your particular example (the order in which events are sent), CombineLatest is the best solution, as explained below.
CombineLatest
You are right that CombineLatest listens to events from both upstream Publishers, but it delivers their elements as a tuple. This is easily fixed with map, or compactMap, depending on the Array's generic parameter. CombineLatest emits nothing until both Publishers have published at least one element. From then on, every new element from either Publisher is published together with the latest element of the other one.
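import Combine

// Holds the subscriptions so they stay alive; reused by the examples below.
var cancellableSet = Set<AnyCancellable>()

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
a
    .combineLatest(b, +)
    .sink { print("\($0)") }
    .store(in: &cancellableSet)
b.send(["g", "h", "i"])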
// ["a", "b", "c", "d", "e", "f"]
// ["a", "b", "c", "g", "h", "i"]
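The point that CombineLatest stays silent until both Publishers have published is hidden in the example above, because CurrentValueSubject always has an initial value. A minimal sketch with PassthroughSubject makes it visible. The names first, second and received are made up for this illustration.

```swift
import Combine

// PassthroughSubject has no initial value, unlike CurrentValueSubject.
let first = PassthroughSubject<[String], Never>()
let second = PassthroughSubject<[String], Never>()

// Collects everything the combined publisher emits (illustrative only).
var received: [[String]] = []

let cancellable = first
    .combineLatest(second) { $0 + $1 }
    .sink { received.append($0) }

first.send(["a"])   // nothing emitted yet: `second` has not published
first.send(["b"])   // still nothing; ["b"] replaces ["a"] as the latest value
second.send(["c"])  // emits ["b", "c"]
first.send(["d"])   // emits ["d", "c"]

print(received)     // [["b", "c"], ["d", "c"]]
```

Note that ["a"] never reaches the subscriber: only the latest value of each side is kept.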
Merge
Publishers.Merge may seem more appropriate because both Publishers have the same Output type, but it isn't. Merge forwards each element from either Publisher on its own and does not remember what the other Publisher last sent. So we can't print the combined arrays even though we've "merged" the Publishers. The documentation describes Merge as creating an interleaved stream rather than a combined Publisher.
let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
let combined = Publishers.Merge(a, b)
combined.sink {
    print($0)
}
.store(in: &cancellableSet)
b.send(["g", "h", "i"])
// ["a", "b", "c"]
// ["d", "e", "f"]
// ["g", "h", "i"]
Zip
Zip is a viable alternative to CombineLatest, and both are valid options for printing combined Publisher events. The difference is that Zip pairs elements in order: it holds the oldest unconsumed element of one Publisher until the other Publisher delivers its matching element.
let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
a
    .zip(b).map(+)
    .sink { print("\($0)") }
    .store(in: &cancellableSet)
b.send(["g", "h", "i"])
// ["a", "b", "c", "d", "e", "f"]
// To print "g","h","i", we need `a` to send an event.
a.send(["a", "b", "c"])
// ["a", "b", "c", "g", "h", "i"]
Use combineLatest for printing combined publisher events. Use zip for printing combined publisher events when both publishers have to be in sync. Use merge when you want to listen to both Publishers' events individually. Merge works well for creating a single interleaved event stream. If either upstream publisher fails with an error, the zipped/combined/merged publisher fails too. On successful completion the operators differ: zip finishes as soon as either upstream finishes, whereas combineLatest and merge finish only after all upstreams have finished. Carefully consider which operator to use for your particular application, not only by type signature but by actual behaviour.
If you want to combine more than 4 Publishers (why?), you can nest a Publishers.CombineLatest4 inside another Publishers.CombineLatest. A similar nesting technique is used in SwiftUI to fit more than 10 Views into a single ViewBuilder.
let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
let c = CurrentValueSubject<[String], Never>(["g", "h", "i"])
let d = CurrentValueSubject<[String], Never>(["j", "k", "l"])
let e = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let f = CurrentValueSubject<[String], Never>(["d", "e", "f"])
let g = CurrentValueSubject<[String], Never>(["g", "h", "i"])
let h = CurrentValueSubject<[String], Never>(["j", "k", "l"])
let combinedOne = Publishers.CombineLatest4(a, b, c, d)
let combinedTwo = Publishers.CombineLatest4(e, f, g, h)
let combined = Publishers.CombineLatest(combinedOne, combinedTwo)
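When all Publishers share the same Output type, as they do here, an alternative to nesting fixed-arity operators is to fold over an array of Publishers. The following is only a sketch under that assumption; subjects, combinedAll and received are hypothetical names, and the type erasure at every step has some overhead.

```swift
import Combine

// Hypothetical input: any number of subjects with the same Output and Failure.
let subjects = ["a", "b", "c", "d", "e"].map {
    CurrentValueSubject<[String], Never>([$0])
}

// Fold pairwise with combineLatest. Erasing to AnyPublisher keeps the
// accumulator type identical at every step of the reduce.
let combinedAll = subjects.dropFirst().reduce(
    subjects[0].eraseToAnyPublisher()
) { partial, next in
    partial.combineLatest(next) { $0 + $1 }.eraseToAnyPublisher()
}

// Collects everything the combined publisher emits (illustrative only).
var received: [[String]] = []
let cancellable = combinedAll.sink { received.append($0) }

subjects[2].send(["x"])

print(received)
// [["a", "b", "c", "d", "e"], ["a", "b", "x", "d", "e"]]
```

The result already has the concatenated array as its Output, so no nested tuple has to be unpacked afterwards.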
Upvotes: 2
Reputation: 2916
Assuming that your combine operation is just a concatenation of the two arrays, you can do:
let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
let combined = Publishers.CombineLatest(a, b).map(+)
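// Keep the returned AnyCancellable alive, otherwise the subscription is cancelled right away.
let cancellable = combined.sink {
    print($0) // ["a", "b", "c", "d", "e", "f"] and ["a", "b", "c", "g", "h", "i"]
}
b.send(["g", "h", "i"])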
I am not completely sure what you mean by "already merged".
If you want the most recently emitted array to always come last in the combined array, you might need a scan operator before the map(+) so that you can compare with previous emissions and swap the two arrays.
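One possible reading of the scan idea, as a rough sketch: remember the previous value of a, and if it differs from the new one, a was the side that changed and goes last. The Ordered struct and the received array are hypothetical helpers for this illustration. A limitation of comparing values is that a re-sending an identical array is not detected as a change.

```swift
import Combine

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])

// Hypothetical scan state: the last value seen from `a`, plus the two arrays
// ordered so that the most recently changed one comes second.
struct Ordered {
    var lastA: [String]? = nil
    var first: [String] = []
    var second: [String] = []
}

// Collects everything the combined publisher emits (illustrative only).
var received: [[String]] = []

let cancellable = Publishers.CombineLatest(a, b)
    .scan(Ordered()) { state, pair in
        let (newA, newB) = pair
        // `a` counts as changed only if a previous value exists and differs.
        let aChanged = state.lastA != nil && state.lastA != newA
        return aChanged
            ? Ordered(lastA: newA, first: newB, second: newA)
            : Ordered(lastA: newA, first: newA, second: newB)
    }
    .map { $0.first + $0.second }
    .sink { received.append($0) }

b.send(["g", "h", "i"])  // b changed: ["a", "b", "c", "g", "h", "i"]
a.send(["x", "y", "z"])  // a changed: ["g", "h", "i", "x", "y", "z"]

print(received)
```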
Upvotes: 4