Reputation: 189
I'm relatively new to the Functional Reactive programming world, and still trying to wrap my head around the concepts. I'm utilizing an SDK to make some network requests - specifically to query a remote database. The SDK returns a publisher, and I have a working pipeline that transforms that result into model objects. Here's that working pipeline:
let existingClaimQuery = "SELECT Id, Subject, CaseNumber FROM Case WHERE Status != 'Closed' ORDER BY CaseNumber DESC"
let requestForOpenCases = RestClient.shared.request(forQuery: existingClaimQuery, apiVersion: RestClient.apiVersion)
caseCancellable = RestClient.shared
    .publisher(for: requestForOpenCases)
    .receive(on: RunLoop.main)
    .tryMap({restresponse -> [String:Any] in
        let json = try restresponse.asJson() as? [String:Any]
        return json ?? RestClient.JSONKeyValuePairs()
    })
    .map({json -> [[String:Any]] in
        let records = json["records"] as? [[String:Any]]
        return records ?? [[:]]
    })
    .map({
        $0.map{(item) -> Claim in
            return Claim(
                id: item["Id"] as? String ?? "None Listed",
                subject: item["Subject"] as? String ?? "None Listed",
                caseNumber: item["CaseNumber"] as? String ?? "0"
            )
        }
    })
    .mapError{error -> Error in
        print(error)
        return error
    }
    .catch{ error in
        return Just([])
    }
    .assign(to: \.claims, on: self)
I went to work on another section of the code, and realized I often need to do this same process - write a query, create a request for that query, and process it through a pipeline that ultimately returns a [[String:Any]].
So here's the million dollar question. What's the right way to encapsulate this pipeline such that I can re-use it without having to copy/pasta the entire pipeline all over the code base? This is my ... attempt at it, but it feels ...wrong?
class QueryStream: ObservableObject {
    var query: String = ""
    private var queryCancellable: AnyCancellable?
    @Published var records: [[String:Any]] = [[String:Any]]()

    func execute(){
        let queryRequest = RestClient.shared.request(forQuery: query, apiVersion: RestClient.apiVersion)
        queryCancellable = RestClient.shared.publisher(for: queryRequest)
            .receive(on: RunLoop.main)
            .tryMap({restresponse -> [String:Any] in
                let json = try restresponse.asJson() as? [String:Any]
                return json ?? [String:Any]()
            })
            .map({json -> [[String:Any]] in
                let records = json["records"] as? [[String:Any]]
                return records ?? [[:]]
            })
            .mapError{error -> Error in
                print(error)
                return error
            }
            .catch{ error in
                return Just([])
            }
            .assign(to: \.records, on: self)
    }
}
This still requires a pipeline to be written for each use. I feel like there should be some way to have a one-off, promise-like pipeline that would allow for
let SomeRecords = QueryStream("Query here").execute()
Am I too n00b? overthinking it? What's the stack's wisdom?
Upvotes: 0
Views: 223
Reputation: 385650
Let me first note that I think you are dispatching to main too early in your pipeline. As far as I can tell, all of your map transforms are pure functions (no side effects or references to mutable state), so they can just as well run on the background thread and thus not block the UI.
Second, as Matt said, a Publisher is generally reusable. Your pipeline builds up a big complex Publisher, and then subscribes to it, which produces an AnyCancellable. So factor out the big complex Publisher but not the subscribing.
You can factor it out into an extension method on your RestClient for convenience:
extension RestClient {
    func records<Record>(
        forQuery query: String,
        makeRecord: @escaping ([String: Any]) throws -> Record)
        -> AnyPublisher<[Record], Never>
    {
        let request = self.request(forQuery: query, apiVersion: RestClient.apiVersion)
        return self.publisher(for: request)
            .tryMap { try $0.asJson() as? [String: Any] ?? [:] }
            .map { $0["records"] as? [[String: Any]] ?? [] }
            .tryMap { try $0.map { try makeRecord($0) } }
            .mapError { dump($0) } // dump is a Swift standard function
            .replaceError(with: []) // simpler than .catch
            .eraseToAnyPublisher()
    }
}
Then you can use it like this:
struct Claim {
    var id: String
    var subject: String
    var caseNumber: String
}

extension Claim {
    static func from(json: [String: Any]) -> Claim {
        return .init(
            id: json["Id"] as? String ?? "None Listed",
            subject: json["Subject"] as? String ?? "None Listed",
            caseNumber: json["CaseNumber"] as? String ?? "0")
    }
}

class MyController {
    var claims: [Claim] = []
    var caseCancellable: AnyCancellable?

    func run() {
        let existingClaimQuery = "SELECT Id, Subject, CaseNumber FROM Case WHERE Status != 'Closed' ORDER BY CaseNumber DESC"
        caseCancellable = RestClient.shared.records(forQuery: existingClaimQuery, makeRecord: Claim.from(json:))
            .receive(on: RunLoop.main)
            .assign(to: \.claims, on: self)
    }
}
Note that I've put the receive(on: RunLoop.main) operator in the method that subscribes to the publisher, rather than building it in to the publisher. This makes it easy to add additional operators that run on a background scheduler before dispatching to the main thread.
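As a rough sketch of that last point, here is one way extra processing could be composed onto the reusable publisher before the hop to the main thread. The sampleClaims() function is a hypothetical stand-in for RestClient.shared.records(forQuery:makeRecord:) so the example runs without the SDK, and the "URGENT" filter is invented purely for illustration.

```swift
import Combine
import Foundation

struct Claim {
    var id: String
    var subject: String
    var caseNumber: String
}

// Hypothetical stand-in for RestClient.shared.records(forQuery:makeRecord:),
// so this sketch does not depend on the SDK.
func sampleClaims() -> AnyPublisher<[Claim], Never> {
    Just([
        Claim(id: "1", subject: "URGENT: water damage", caseNumber: "00002"),
        Claim(id: "2", subject: "Windshield chip", caseNumber: "00001"),
    ]).eraseToAnyPublisher()
}

// Extra processing is attached before any receive(on:), so it runs on
// whatever thread the upstream publisher delivers on.
func urgentClaims() -> AnyPublisher<[Claim], Never> {
    sampleClaims()
        .map { claims in claims.filter { $0.subject.hasPrefix("URGENT") } }
        .eraseToAnyPublisher()
}

final class ClaimsController {
    var claims: [Claim] = []
    var caseCancellable: AnyCancellable?

    func run() {
        caseCancellable = urgentClaims()
            .receive(on: RunLoop.main)   // hop to the main thread only at the end
            .assign(to: \.claims, on: self)
    }
}
```

Because the filtering lives in its own publisher-returning function, other subscribers can reuse it and still choose their own scheduler.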
UPDATE
From your comment:
In promise syntax, I could say execute run() as defined above, and .then(doSomethingWithThatData()) knowing that the doSomethingWithThatData wouldn't run until the initial work had completed successfully. I'm trying to develop a setup where I need to use this records(fromQuery:) method runs, and then (and only then) do something with that data. I'm struggling with how to bolt that on to the end.
I don't know what promise implementation you're using, so it's difficult to know what your .then(doSomethingWithThatData()) does. What you've written doesn't really make much sense in Swift. Perhaps you meant:
.then { data in doSomething(with: data) }
In which case, the doSomething(with:) method cannot possibly be called until the data is available, because doSomething(with:) takes the data as an argument!
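For comparison, a minimal Combine sketch of the same idea might look like the following. It assumes the follow-up work is attached with sink(receiveValue:); fakeRecords() and doSomething(with:) are hypothetical names used only here, with fakeRecords() standing in for the records(forQuery:makeRecord:) publisher.

```swift
import Combine

// Hypothetical stand-in for the records publisher: emits one array, then finishes.
func fakeRecords() -> AnyPublisher<[String], Never> {
    Just(["00001", "00002"]).eraseToAnyPublisher()
}

var received: [String] = []

// Hypothetical follow-up step. It takes the data as an argument,
// so it cannot run before the publisher has emitted that data.
func doSomething(with data: [String]) {
    received = data
}

// The closure passed to sink plays the role of the promise's `then` block.
let cancellable = fakeRecords()
    .sink { data in doSomething(with: data) }
```

The closure only runs when a value arrives, which gives the same ordering guarantee a promise's then block would.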
Upvotes: 0
Reputation: 535231
Entire pipelines are not reusable. Publishers are reusable. When I say "publisher" I mean an initial publisher plus operators attached to it. (Remember, an operator is itself a publisher.) A publisher can exist as a property of something, so you can subscribe to it, or it can be generated for a particular case (like a particular query request) by a function.
To illustrate, here's a one-off pipeline:
let s = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let url = URL(string:s)!
let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
session.dataTaskPublisher(for: url)
    .map {$0.data}
    .replaceError(with: Data())
    .compactMap { UIImage(data:$0) }
    .receive(on: DispatchQueue.main)
    .assign(to: \.image, on: self.iv)
    .store(in:&self.storage)
That pipeline tries to download the data from a URL, tests to see whether it's image data, and if it is, turns the image data into an image and displays it in an image view in the interface.
Let's say I want to do this for various different remote images. Obviously it would be ridiculous to repeat the whole pipeline everywhere. What differs up front is the URL, so let's encapsulate the first part of the pipeline as a publisher that can be generated on demand based on the URL:
func image(fromURL url:URL) -> AnyPublisher<UIImage,Never> {
    let eph = URLSessionConfiguration.ephemeral
    let session = URLSession(configuration: eph)
    return session.dataTaskPublisher(for: url)
        .map {$0.data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:$0) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}
Now the only thing that needs to be repeated in various places in our code is the subscriber to that publisher:
let s = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let url = URL(string:s)!
image(fromURL:url)
    .map{Optional($0)}
    .assign(to: \.image, on: self.iv)
    .store(in:&self.storage)
You see? Elsewhere, we might have a different URL, and we might do something different with the UIImage that comes popping out of the call to image(fromURL:), and that's just fine; the bulk of the pipeline has been encapsulated and doesn't need to be repeated.
Your example pipeline's publisher is susceptible of that same sort of encapsulation and reuse.
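The other arrangement mentioned at the top, a publisher that lives as a property of some object, could be sketched roughly like this. TemperatureFeed and its members are hypothetical names invented for this illustration and have nothing to do with the asker's SDK.

```swift
import Combine

// Hypothetical type, used only for this sketch.
final class TemperatureFeed {
    private let readings = PassthroughSubject<Double, Never>()

    // The publisher plus its operators is exposed as a property;
    // any number of subscribers can attach to it.
    var fahrenheit: AnyPublisher<Double, Never> {
        readings
            .map { $0 * 9 / 5 + 32 }
            .eraseToAnyPublisher()
    }

    func record(celsius: Double) {
        readings.send(celsius)
    }
}

let feed = TemperatureFeed()
var shown: [Double] = []

// Only the subscriber is written at the point of use.
let subscription = feed.fahrenheit.sink { shown.append($0) }
feed.record(celsius: 100)   // shown is now [212.0]
```

As with image(fromURL:), the operators are written once and each caller supplies only its own subscriber.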
Upvotes: 2