Reputation: 1139
I have an object called ProcessorStack
which contains zero or more child Processor
items. The ProcessorStack
and the individual Processor
objects each have only a single method:
process(input: Value) -> Future<Value, Never>
I would like the return value to be a Future
rather than AnyPublisher
to clearly indicate that the caller should only expect a single result to be emitted. Other objects only have access to the ProcessorStack
, not its Processor
children. Here's what I want to happen:
ProcessorStack
:
stack.process(value: someValue).sink { result in
// Do something with the result
}.store(in: &subscriptions)
ProcessorStack
chains all of its child Processor
objects together using a reduce operation and returns the final result via the Future
:
func process(value: Value) -> Future<Value, Never> {
guard !childProcessors.isEmpty else {
return Future { $0(.success(value)) }
}
let just = Just(value).eraseToAnyPublisher()
childProcessors.reduce(just) { (publisher, processor) -> AnyPublisher<Value, Never> in
publisher.flatMap { processor.process(value: $0).eraseToAnyPublisher() }
}
// Here's where I'm lost.
}
I cannot for the life of me figure out how to execute the asynchronous reduce chain and then return the result as a Future
. If I wrap the whole reduce operation in a Future
initializer, I'm left holding an AnyPublisher<Value, Never>
which I somehow have to get to execute and then the result of that get passed to the Future
's completion closure. I can't sink
it inside the Future
's closure, because I have to hold on to the cancellable returned from that or the whole process immediately halts. I can't FlatMap the result into a Future, because that has the type FlatMap<AnyPublisher<Value, Never>, Future<Value, Never>>
. I can accomplish all of this if I just make the outer return type AnyPublisher<Value, Never>
, but I was really hoping to have the Future
semantics for subscribers.
Upvotes: 0
Views: 989
Reputation: 49590
The premise of what you're trying to do is wrong. Future
, though it can only return at most a single result, doesn't semantically stand for that. It's just a specific type of a publisher.
You should be returning an AnyPublisher
at the function boundary, and not try to avoid it, which would make your code more robust to changes (e.g. what if you needed to wrap the Future
in a Deferred
? - a common practice)
process(value: Value) -> AnyPublisher<Value, Never> {
...
}
And if a subscriber can only handle a single result, they could simply ensure that with first()
:
process(value)
.first()
.sink {...}
.store(in: &storage)
But if you insist, you could use a .sink
inside Future
's closure, if the closure captured the reference to the AnyCancellable
and released it on completion:
process(value: Value) -> Future<Value, Never> {
guard !childProcessors.isEmpty else {
return Future { $0(.success(value)) }
}
let just = Just(value).eraseToAnyPublisher()
let combined = childProcessors.reduce(just) { (publisher, processor) -> AnyPublisher<Value, Never> in
publisher.flatMap { processor.process(value: $0).eraseToAnyPublisher() }
}
var c: AnyCancellable? = nil
return Future { promise in
c = combined.sink(receiveCompletion: {
withExtendedLifetime(c){}; c = nil
}) {
promise(.success($0))
}
}
}
Upvotes: 2