grahan

Reputation: 2398

Combine: Publishers.Merge but with Publishers.CombineLatest behaviour

I am trying to merge two publishers, but I can't find a solution that fits my use case.

I want to merge 2 publishers that both emit an array of structs of the same type. The combined publisher should emit a value whenever either of the underlying publishers emits a new value.

This would normally be a use case for Publishers.CombineLatest, but since both underlying publishers emit values of the same type, a merge seems more fitting. However, Publishers.Merge does not remember the last values of the merged publishers.

So I would like Publishers.CombineLatest behaviour with a Publishers.Merge-style output. Is there something in the Combine framework that can accomplish this?

Rough example of what should happen:

Definitions:

PublisherA: emits -> [Value]
PublisherB: emits -> [Value]

CombinedAB: -> [Value]


PublisherA changes: CombinedAB -> [NewA, OldB]
PublisherB changes: CombinedAB -> [OldA, NewB]

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])

let combined = Publisher.AnyThing(a, b)

combined.sink {
   print($0)
}


b.send(["g", "h", "i"])


Outputs:
["a", "b", "c", "d", "e", "f"]
["a", "b", "c", "g", "h", "i"]

So it's basically Publishers.CombineLatest, except that instead of emitting a tuple (NewA, OldB) it emits the two values already merged into one array, because both have the same type.

Any help is much appreciated.

Upvotes: 7

Views: 6586

Answers (2)

Pranav Kasetti

Reputation: 9935

Problem Overview

The docs explain the differences between three approaches for combining Publishers:

Use combineLatest(_:) when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To pair elements from multiple publishers, use zip(_:) instead. To receive just the most-recent element from multiple publishers rather than tuples, use merge(with:).

Zip and CombineLatest are the only appropriate solutions for consuming events from both publishers together. For your particular example (given the order in which events are sent), CombineLatest is the best fit, as explained below.

CombineLatest

You are right that CombineLatest listens to both publishers, but it emits their latest values as a tuple. That is easily fixed with map (or compactMap, depending on the array's element type), or by passing a transform closure directly to combineLatest. CombineLatest emits nothing until every upstream publisher has published at least one element. From then on, each new element from either publisher produces an output that pairs it with the latest element from the other.

Listening Usage Demo

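import Combine

// Keeps the subscriptions alive for the duration of the demo
var cancellableSet = Set<AnyCancellable>()

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
a
  .combineLatest(b, +) // concatenate the latest arrays instead of emitting a tuple
  .sink { print("\($0)") }
  .store(in: &cancellableSet)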
b.send(["g", "h", "i"])

// ["a", "b", "c", "d", "e", "f"]
// ["a", "b", "c", "g", "h", "i"]
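To make the "waits for both publishers" rule concrete, here is a minimal sketch that uses PassthroughSubject (which has no initial value) instead of CurrentValueSubject. The `received` array is only there for illustration, so the emissions can be inspected afterwards.

```swift
import Combine

let a = PassthroughSubject<[String], Never>()
let b = PassthroughSubject<[String], Never>()

// Illustration only: collects every value the combined publisher emits
var received: [[String]] = []

let cancellable = a
    .combineLatest(b) { (lhs: [String], rhs: [String]) -> [String] in lhs + rhs }
    .sink { received.append($0) }

a.send(["a"])   // nothing emitted yet: b has not published anything
a.send(["b"])   // still nothing; ["b"] replaces ["a"] as a's latest value
b.send(["c"])   // both have a value now: emits ["b", "c"]
a.send(["d"])   // emits ["d", "c"]

print(received)
```

With CurrentValueSubject, as in the demo above, both publishers already have a value at subscription time, which is why the first combined array is printed immediately.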

Merge

Publishers.Merge may seem more appropriate because both publishers have the same Output type, but it isn't. Merge simply forwards each element from either publisher as it arrives and keeps no memory of the other publisher's latest value. So we can't print the combined array even though we've "merged" the publishers. The documentation describes Merge as creating an interleaved stream rather than a combined one.

Merge Listening Usage Demo:

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])

let combined = Publishers.Merge(a, b)

combined.sink {
  print($0)
}
.store(in: &cancellableSet)

b.send(["g", "h", "i"])

// ["a", "b", "c"]
// ["d", "e", "f"]
// ["g", "h", "i"]

Zip

Zip is a viable alternative to CombineLatest, and both are valid options for printing combined publisher events. The difference is that Zip pairs elements in order: it holds the oldest unconsumed element from one publisher until the other publisher provides a matching element, and only then emits the pair.

Listening Usage Demo

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
a 
  .zip(b).map(+)
  .sink { print("\($0)") }
  .store(in: &cancellableSet)
b.send(["g", "h", "i"])

// ["a", "b", "c", "d", "e", "f"]

// To print "g","h","i", we need `a` to send an event.

a.send(["a", "b", "c"])
// ["a", "b", "c", "g", "h", "i"]

Summary

Use combineLatest for printing combined publisher events. Use zip for printing combined publisher events when both publishers have to stay in sync. Use merge when you want to listen to each publisher's events individually; it works well for creating a single interleaved event stream. If either upstream publisher fails with an error, the zipped, combined, or merged publisher fails too. Normal completion differs: zip finishes once either upstream finishes, whereas combineLatest and merge finish only after all upstreams have finished. Choose the operator for your application by its actual behaviour, not only by its type signature.
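The completion behaviour of the three operators can be checked with a small sketch like the one below. It assumes two PassthroughSubjects and uses three hypothetical flags that record whether each downstream has received a completion.

```swift
import Combine

let a = PassthroughSubject<Int, Never>()
let b = PassthroughSubject<Int, Never>()

// Illustration only: set to true when the corresponding pipeline completes
var combineLatestFinished = false
var zipFinished = false
var mergeFinished = false

let c1 = a.combineLatest(b)
    .sink(receiveCompletion: { _ in combineLatestFinished = true }, receiveValue: { _ in })
let c2 = a.zip(b)
    .sink(receiveCompletion: { _ in zipFinished = true }, receiveValue: { _ in })
let c3 = a.merge(with: b)
    .sink(receiveCompletion: { _ in mergeFinished = true }, receiveValue: { _ in })

a.send(1)
b.send(2)

// Only a finishes: zip can never form another pair, so it completes.
a.send(completion: .finished)
let afterA = [combineLatestFinished, zipFinished, mergeFinished]

// Now b finishes as well: combineLatest and merge complete too.
b.send(completion: .finished)
let afterB = [combineLatestFinished, zipFinished, mergeFinished]

print(afterA, afterB)
```

In this sketch only zip should be finished after `a` completes; combineLatest and merge keep running because `b` can still publish.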

Bonus Notes

If you want to combine more than 4 Publishers (why?), you can nest Publishers.CombineLatest4 inside another Publishers.CombineLatest. The same nesting idea is used in SwiftUI to put more than 10 Views in a single ViewBuilder.

Code Example

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])
let c = CurrentValueSubject<[String], Never>(["g", "h", "i"])
let d = CurrentValueSubject<[String], Never>(["j", "k", "l"])

let e = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let f = CurrentValueSubject<[String], Never>(["d", "e", "f"])
let g = CurrentValueSubject<[String], Never>(["g", "h", "i"])
let h = CurrentValueSubject<[String], Never>(["j", "k", "l"])

let combinedOne = Publishers.CombineLatest4(a, b, c, d)
let combinedTwo = Publishers.CombineLatest4(e, f, g, h)
let combined = Publishers.CombineLatest(combinedOne, combinedTwo)
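The nested publisher emits a tuple of two 4-tuples, so to get a single array back you still need a map step. A minimal sketch, assuming the goal is plain concatenation in declaration order; the `subject` helper and `received` array are hypothetical names used only to keep the example short:

```swift
import Combine

// Hypothetical helper: a subject holding a one-element array
func subject(_ value: String) -> CurrentValueSubject<[String], Never> {
    CurrentValueSubject<[String], Never>([value])
}

let (a, b, c, d) = (subject("a"), subject("b"), subject("c"), subject("d"))
let (e, f, g, h) = (subject("e"), subject("f"), subject("g"), subject("h"))

// Illustration only: collects the flattened arrays
var received: [[String]] = []

let cancellable = Publishers.CombineLatest(
    Publishers.CombineLatest4(a, b, c, d),
    Publishers.CombineLatest4(e, f, g, h)
)
.map { first, second -> [String] in
    // Flatten ((a, b, c, d), (e, f, g, h)) into one array
    [first.0, first.1, first.2, first.3,
     second.0, second.1, second.2, second.3].flatMap { $0 }
}
.sink { received.append($0) }

h.send(["z"])   // the last slot is replaced, the other seven keep their latest value

print(received.last ?? [])
```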

Upvotes: 2

Fabio Felici

Reputation: 2916

Assuming that your combine operation is just a concatenation of the two arrays, you can do:

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])

let combined = Publishers.CombineLatest(a, b).map(+)

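// Keep the returned AnyCancellable; if it is discarded, the subscription is cancelled right away
let cancellable = combined.sink {
   print($0)     //["a", "b", "c", "d", "e", "f"] and ["a", "b", "c", "g", "h", "i"]
}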


b.send(["g", "h", "i"])

I am not completely sure what you mean by "already merged". If you want the most recently emitted array to always come last in the combined array, you might need a scan operator before the map(+) so that you can compare with the previous emission and swap the two arrays.
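One possible way to sketch that idea is a variation on the scan suggestion: tag each value with its source, merge the tagged streams, and let scan remember the latest array from each side. The `Source` enum, `Latest` struct, and `received` array are hypothetical names introduced only for this example.

```swift
import Combine

enum Source { case a, b }

// State carried through scan: latest array per source, plus who emitted last
struct Latest {
    var a: [String]?
    var b: [String]?
    var newest: Source = .a
}

let a = CurrentValueSubject<[String], Never>(["a", "b", "c"])
let b = CurrentValueSubject<[String], Never>(["d", "e", "f"])

// Illustration only: collects the combined arrays
var received: [[String]] = []

let cancellable = a.map { (Source.a, $0) }
    .merge(with: b.map { (Source.b, $0) })
    .scan(Latest()) { state, event in
        var next = state
        switch event.0 {
        case .a: next.a = event.1
        case .b: next.b = event.1
        }
        next.newest = event.0
        return next
    }
    .compactMap { state -> [String]? in
        // Emit only once both sources have produced a value
        guard let latestA = state.a, let latestB = state.b else { return nil }
        // The most recently emitted array goes last
        return state.newest == .a ? latestB + latestA : latestA + latestB
    }
    .sink { received.append($0) }

b.send(["g", "h", "i"])   // b is newest: a's array first, then ["g", "h", "i"]
a.send(["x"])             // a is newest: ["g", "h", "i"] first, then ["x"]

print(received)
```

If the order of the two halves does not matter, the plain CombineLatest(a, b).map(+) above is simpler and should be preferred.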

Upvotes: 4
