Reputation: 578
I'm wondering if there is a way to chain an array of publishers, similar to how we chain publishers with a regular flatMap.
Let's say I have three publishers: publisher1, publisher2, publisher3. All of them have the same Output and Failure types. For example, each of them is an AnyPublisher<String, Never>
and emits a single String value. The only role of each publisher is to fetch its own value and emit the previous value joined with its own.
I'm looking for the same effect as the following pseudocode:
let pipe = publisher1(value: "")
    .flatMap { publisher2(value: $0) }
    .flatMap { publisher3(value: $0) }
Execution flow:
publisher1 (fetches "A") -> publisher2 (fetches "B") -> publisher3 (fetches "C") -> "ABC"
I would like to reproduce the same flow for an array with an unknown number of publishers n ([AnyPublisher<String, Never>]):
1 -> 2 -> 3 -> ... -> n
I'll appreciate any tips, thanks! :)
Upvotes: 1
Views: 1149
Reputation: 49590
First of all, let's clarify your question. Based on your description of wanting to create a chain of flatMap-ed publishers, what you must have is an array of closures (not publishers), each returning an AnyPublisher<String, Never> given a String parameter.
let pubs: [(String) -> AnyPublisher<String, Never>] = [
    publisher1, publisher2, publisher3
]
To chain them, you could use the array's reduce method, starting with a Just publisher that emits the initial parameter:
let pipe = pubs.reduce(Just("").eraseToAnyPublisher()) { acc, next in
    acc.flatMap { next($0) }.eraseToAnyPublisher()
}
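To see the reduce chain run end to end, here is a minimal sketch. The `makeStep` helper and the letters it appends are hypothetical stand-ins for the asker's publisher1 ... publisherN; real steps would presumably fetch their values asynchronously.

```swift
import Combine

// Hypothetical stand-in for publisher1, publisher2, ...: returns a closure
// that appends `letter` to the value it receives and emits the result once.
func makeStep(_ letter: String) -> (String) -> AnyPublisher<String, Never> {
    return { previous in Just(previous + letter).eraseToAnyPublisher() }
}

// An array of any length works; three steps are used here for illustration.
let steps = ["A", "B", "C"].map(makeStep)

// Start from Just("") and wrap each step in a flatMap, so step n only runs
// once step n-1 has emitted its value.
let chained = steps.reduce(Just("").eraseToAnyPublisher()) { acc, next in
    acc.flatMap { next($0) }.eraseToAnyPublisher()
}

var chainedResult = ""
let chainedToken = chained.sink { chainedResult = $0 }
print(chainedResult) // prints "ABC" because Just emits synchronously
```

Erasing to AnyPublisher at every step keeps the accumulator type stable, which is what lets reduce work; without it, each flatMap would produce a differently nested type.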
Upvotes: 4
Reputation: 32904
Another approach is to zip the publishers together, and then combine the values they emit:
let publisher1 = ["A"].publisher
let publisher2 = ["B"].publisher
let publisher3 = ["C"].publisher
_ = publisher1.zip(publisher2, publisher3)
    .map { $0 + $1 + $2 }
    .sink(receiveValue: { print("Combined: \($0)") })
// prints "Combined: ABC"
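Or, if you have a variable number of publishers, you can use MergeMany and reduce. Note that MergeMany subscribes to all publishers at once, so the order of the result is only predictable when they emit synchronously, as they do here: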
// same result: ABC
_ = Publishers.MergeMany([publisher1, publisher2, publisher3])
    .reduce("") { $0 + $1 }
    .sink(receiveValue: { print("Combined: \($0)") })
You can go even further, and write your own publisher, if you think you'll be needing this functionality in multiple places:
extension Publishers {
    /// works also with arrays, or any other range replaceable collection
    struct ConcatenateOutputs<Upstream: Publisher>: Publisher where Upstream.Output: RangeReplaceableCollection {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        private let reducer: AnyPublisher<Upstream.Output, Failure>

        init(_ publishers: Upstream...) {
            self.init(publishers)
        }

        init<S: Swift.Sequence>(_ publishers: S) where S.Element == Upstream {
            reducer = MergeMany(publishers)
                .reduce(Output.init()) { $0 + $1 }
                .eraseToAnyPublisher()
        }

        func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            reducer.receive(subscriber: subscriber)
        }
    }
}

extension Sequence where Element: Publisher, Element.Output: RangeReplaceableCollection {
    var concatenateOutputs: Publishers.ConcatenateOutputs<Element> { .init(self) }
}
// same output
_ = Publishers.ConcatenateOutputs([publisher1, publisher2, publisher3])
    .sink(receiveValue: { print("Combined: \($0)") })
// the variadic initializer works the same
_ = Publishers.ConcatenateOutputs(publisher1, publisher2, publisher3)
    .sink(receiveValue: { print("Combined: \($0)") })
// the less verbose construct
_ = [publisher1, publisher2, publisher3].concatenateOutputs
    .sink(receiveValue: { print("Combined: \($0)") })
Upvotes: 0
Reputation: 52053
If I understand you correctly, you should be able to use append on your publishers:
let pub1: AnyPublisher<String, Never> = ["A1", "B1", "C1"].publisher.eraseToAnyPublisher()
let pub2: AnyPublisher<String, Never> = ["A2", "B2", "C2"].publisher.eraseToAnyPublisher()
let pub3: AnyPublisher<String, Never> = ["A3", "B3", "C3"].publisher.eraseToAnyPublisher()
_ = pub1.append(pub2.append(pub3))
    .sink(receiveValue: { value in
        print(value)
    })
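For an array of unknown length, the same append idea can be folded over the array. The following is a rough sketch, not part of the original answer: the `sources` array is a hypothetical input, and an Empty publisher is assumed as the starting point of the fold. Note that append only sequences the streams (each publisher is subscribed after the previous one finishes); it does not feed the previous value into the next publisher, so the values are joined afterwards with Combine's reduce operator.

```swift
import Combine

// Hypothetical inputs: an array of any length with matching Output/Failure.
let sources: [AnyPublisher<String, Never>] = ["A", "B", "C"].map {
    Just($0).eraseToAnyPublisher()
}

// Fold the array into one stream. `append` subscribes to the next publisher
// only after the previous one has finished, so the order is preserved.
let sequential = sources.reduce(Empty<String, Never>().eraseToAnyPublisher()) { acc, next in
    acc.append(next).eraseToAnyPublisher()
}

var joined = ""
let joinedToken = sequential
    .reduce("") { $0 + $1 } // Combine's reduce: emits once, when upstream completes
    .sink { joined = $0 }
print(joined) // prints "ABC" because Just emits synchronously
```

Because reduce only emits on completion, this variant requires every publisher in the array to finish; a publisher that never completes would stall the whole chain.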
Upvotes: 2