Kevin P

Reputation: 189

What's the proper method for creating re-usable pipelines with Combine?

I'm relatively new to the Functional Reactive programming world, and still trying to wrap my head around the concepts. I'm utilizing an SDK to make some network requests - specifically to query a remote database. The SDK returns a publisher, and I have a working pipeline that transforms that result into model objects. Here's that working pipeline:

let existingClaimQuery = "SELECT Id, Subject, CaseNumber FROM Case WHERE Status != 'Closed' ORDER BY CaseNumber DESC"
let requestForOpenCases = RestClient.shared.request(forQuery: existingClaimQuery, apiVersion: RestClient.apiVersion)
caseCancellable = RestClient.shared
  .publisher(for: requestForOpenCases)
  .receive(on: RunLoop.main)
  .tryMap({restresponse -> [String:Any] in
    let json = try restresponse.asJson() as? [String:Any]
    return json ?? RestClient.JSONKeyValuePairs()
  })
  .map({json -> [[String:Any]] in
    let records = json["records"] as? [[String:Any]]
    return records ?? [[:]]
  })
  .map({
    $0.map{(item) -> Claim in
      return Claim(
        id: item["Id"] as? String ?? "None Listed",
        subject: item["Subject"] as? String ?? "None Listed",
        caseNumber: item["CaseNumber"] as? String ?? "0"
      )
    }
  })
  .mapError{error -> Error in
    print(error)
    return error
  }
  .catch{ error in
    return Just([])
  }
  .assign(to: \.claims, on: self)

I went to work on another section of the code, and realized I often need to do this same process - write a query, create a request for that query, and process it through a pipeline that ultimately returns a [[String:Any]].

So here's the million-dollar question: what's the right way to encapsulate this pipeline so that I can reuse it without having to copy/pasta the entire pipeline all over the code base? This is my attempt at it, but it feels... wrong?

class QueryStream: ObservableObject {

  var query: String = ""
  private var queryCancellable: AnyCancellable?

  @Published var records: [[String:Any]] = [[String:Any]]()

  func execute(){
    let queryRequest = RestClient.shared.request(forQuery: query, apiVersion: RestClient.apiVersion)

    queryCancellable = RestClient.shared.publisher(for: queryRequest)
      .receive(on: RunLoop.main)
      .tryMap({restresponse -> [String:Any] in
        let json = try restresponse.asJson() as? [String:Any]
        return json ?? [String:Any]()
      })
      .map({json -> [[String:Any]] in
        let records = json["records"] as? [[String:Any]]
        return records ?? [[:]]
      })
      .mapError{error -> Error in
        print(error)
        return error
      }
      .catch{ error in
        return Just([])
      }
      .assign(to: \.records, on: self)
  }

}

This still requires a pipeline to be written for each use. I feel like there should be some way to have a one-off, promise-like pipeline that would allow for something like:

let SomeRecords = QueryStream("Query here").execute()

Am I too much of a n00b? Overthinking it? What's the stack's wisdom?

Upvotes: 0

Views: 223

Answers (2)

rob mayoff

Reputation: 385650

Let me first note that I think you are dispatching to main too early in your pipeline. As far as I can tell, all of your map transforms are pure functions (no side effects or references to mutable state), so they can just as well run on the background thread and thus not block the UI.

Second, as Matt said, a Publisher is generally reusable. Your pipeline builds up a big complex Publisher, and then subscribes to it, which produces an AnyCancellable. So factor out the big complex Publisher but not the subscribing.
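To make the "publisher vs. subscription" split concrete, here is a minimal sketch in which `Just` stands in for the SDK's network publisher (the names and values are illustrative only). The publisher is built once and stored; each `sink` call is a separate subscription with its own `AnyCancellable`, and a counter shows that the upstream is subscribed to once per subscriber.

```swift
import Combine

var upstreamRuns = 0

// Built once; nothing runs yet. Just stands in for the SDK's network publisher.
let titlesPublisher: AnyPublisher<[String], Never> = Just(["a", "b", "c"])
    .handleEvents(receiveSubscription: { _ in upstreamRuns += 1 })  // count subscriptions
    .map { $0.map { $0.uppercased() } }
    .eraseToAnyPublisher()

// Subscribing is what runs the pipeline. Each sink is an independent
// subscription that returns its own AnyCancellable.
var firstResult: [String] = []
var secondResult: [String] = []
let first = titlesPublisher.sink { firstResult = $0 }
let second = titlesPublisher.sink { secondResult = $0 }
```

With a publisher such as `URLSession`'s `dataTaskPublisher`, each subscription starts its own request; Combine's `share()` operator is the usual tool when several subscribers should see a single execution.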

You can factor it out into an extension method on your RestClient for convenience:

extension RestClient {
    func records<Record>(
        forQuery query: String,
        makeRecord: @escaping ([String: Any]) throws -> Record)
        -> AnyPublisher<[Record], Never>
    {
        let request = self.request(forQuery: query, apiVersion: RestClient.apiVersion)

        return self.publisher(for: request)
            .tryMap { try $0.asJson() as? [String: Any] ?? [:] }
            .map { $0["records"] as? [[String: Any]] ?? [] }
            .tryMap { try $0.map { try makeRecord($0) } }
            .mapError { dump($0) } // dump (Swift standard library) prints its argument and returns it
            .replaceError(with: []) // simpler than .catch
            .eraseToAnyPublisher()
    }
}

Then you can use it like this:

struct Claim {
    var id: String
    var subject: String
    var caseNumber: String
}

extension Claim {
    static func from(json: [String: Any]) -> Claim {
        return .init(
            id: json["Id"] as? String ?? "None Listed",
            subject: json["Subject"] as? String ?? "None Listed",
            caseNumber: json["CaseNumber"] as? String ?? "0")
    }
}

class MyController {
    var claims: [Claim] = []
    var caseCancellable: AnyCancellable?

    func run() {
        let existingClaimQuery = "SELECT Id, Subject, CaseNumber FROM Case WHERE Status != 'Closed' ORDER BY CaseNumber DESC"
        caseCancellable = RestClient.shared.records(forQuery: existingClaimQuery, makeRecord: Claim.from(json:))
            .receive(on: RunLoop.main)
            .assign(to: \.claims, on: self)
    }
}
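`RestClient` comes from the SDK, so the code above can't run on its own. Here is a minimal sketch of the same record-extraction stages written against an injected publisher of already-decoded JSON (the role `asJson()` plays above), so it can be exercised with `Just`. The function `records(from:makeRecord:)`, the `MissingField` error, and the throwing `strictClaim(from:)` factory are hypothetical. The sketch also makes one consequence of the design visible: because `makeRecord` may throw and the chain ends in `replaceError(with: [])`, a single row that fails to convert empties the whole result, not just that row.

```swift
import Combine

struct Claim: Equatable {
    var id: String
    var subject: String
    var caseNumber: String
}

// Hypothetical error and stricter factory: rows without an "Id" are rejected.
struct MissingField: Error { let name: String }

func strictClaim(from json: [String: Any]) throws -> Claim {
    guard let id = json["Id"] as? String else { throw MissingField(name: "Id") }
    return Claim(id: id,
                 subject: json["Subject"] as? String ?? "None Listed",
                 caseNumber: json["CaseNumber"] as? String ?? "0")
}

// Hypothetical helper mirroring the RestClient extension's stages, minus the
// network call: the decoded JSON publisher is passed in.
func records<Record>(
    from json: AnyPublisher<[String: Any], Error>,
    makeRecord: @escaping ([String: Any]) throws -> Record
) -> AnyPublisher<[Record], Never> {
    json
        .map { $0["records"] as? [[String: Any]] ?? [] }  // missing key -> no rows
        .tryMap { try $0.map(makeRecord) }                 // one throw fails the whole batch
        .replaceError(with: [])                            // ...and the failure becomes []
        .eraseToAnyPublisher()
}

// Wraps rows in a publisher shaped like a decoded query response.
func response(_ rows: [[String: Any]]) -> AnyPublisher<[String: Any], Error> {
    let json: [String: Any] = ["records": rows]
    return Just(json).setFailureType(to: Error.self).eraseToAnyPublisher()
}
```

If keeping the valid rows is preferable, converting each row with `compactMap { try? makeRecord($0) }` instead of `try $0.map(makeRecord)` is one alternative; whether a partial list is acceptable is an app-level decision.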

Note that I've put the receive(on: RunLoop.main) operator in the method that subscribes to the publisher, rather than building it into the publisher. This makes it easy to add additional operators that run on a background scheduler before dispatching to the main thread.
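One way to keep that flexibility and also make the subscribing code testable is to pass the delivery scheduler in. This is a sketch, not part of the SDK: `ClaimsModel`, its `load(from:on:)` method, and the extra `filter` step are hypothetical. In the app you would pass `RunLoop.main`; a test can pass `ImmediateScheduler.shared` so results arrive synchronously.

```swift
import Combine

final class ClaimsModel {
    var claims: [String] = []
    private var cancellable: AnyCancellable?

    // The publisher arrives already built; this method only adds
    // subscriber-specific steps and chooses where results are delivered.
    func load<S: Scheduler>(from publisher: AnyPublisher<[String], Never>, on scheduler: S) {
        cancellable = publisher
            .map { $0.filter { !$0.isEmpty } }  // runs wherever the upstream emits
            .receive(on: scheduler)             // hop to the delivery scheduler last
            .sink { [weak self] in self?.claims = $0 }
    }
}
```

The sketch uses `sink` with `[weak self]` rather than `assign(to:on:)`, because `assign(to:on:)` keeps a strong reference to the object, which forms a retain cycle when the resulting cancellable is stored on that same object.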


UPDATE

From your comment:

In promise syntax, I could say execute run() as defined above, and .then(doSomethingWithThatData()), knowing that doSomethingWithThatData wouldn't run until the initial work had completed successfully. I'm trying to develop a setup where this records(forQuery:) method runs, and then (and only then) I do something with that data. I'm struggling with how to bolt that onto the end.

I don't know what promise implementation you're using, so it's difficult to know what your .then(doSomethingWithThatData()) does. What you've written doesn't really make much sense in Swift. Perhaps you meant:

.then { data in doSomething(with: data) }

In which case, the doSomething(with:) method cannot possibly be called until the data is available, because doSomething(with:) takes the data as an argument!
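In Combine terms, the closure passed to `sink` (or the key path given to `assign`) already plays the role of `.then`: it only runs once the upstream has produced a value. When the follow-up step is itself asynchronous, `flatMap` chains it so the second publisher is only created after the first one emits. A minimal sketch, with hypothetical `fetchClaimIDs()` and `fetchDetails(for:)` built from `Just` in place of real queries:

```swift
import Combine

// Hypothetical stand-ins for two queries; real code would hit the network.
func fetchClaimIDs() -> AnyPublisher<[String], Never> {
    Just(["C-1", "C-2"]).eraseToAnyPublisher()
}

func fetchDetails(for ids: [String]) -> AnyPublisher<[String], Never> {
    Just(ids.map { "details for \($0)" }).eraseToAnyPublisher()
}

var events: [String] = []

// flatMap is the rough analogue of a promise's .then: fetchDetails(for:)
// is only called after fetchClaimIDs() has emitted its value.
let chain = fetchClaimIDs()
    .handleEvents(receiveOutput: { _ in events.append("ids arrived") })
    .flatMap { ids -> AnyPublisher<[String], Never> in
        events.append("starting details")
        return fetchDetails(for: ids)
    }
    .sink { details in
        events.append("got \(details.count) details")  // the doSomething(with:) step
    }
```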

Upvotes: 0

matt

Reputation: 535231

Entire pipelines are not reusable. Publishers are reusable. When I say "publisher" I mean an initial publisher plus operators attached to it. (Remember, an operator is itself a publisher.) A publisher can exist as a property of something, so you can subscribe to it, or it can be generated for a particular case (like a particular query request) by a function.
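Both shapes described above can be sketched side by side. `ClaimSource`, `allTitles`, and `titles(withPrefix:)` are hypothetical names, and an in-memory array stands in for remote data:

```swift
import Combine

struct ClaimSource {
    let rows = ["Broken window", "Flooded basement", "Broken fence"]

    // A publisher exposed as a property: anyone can subscribe to it.
    var allTitles: AnyPublisher<[String], Never> {
        Just(rows).eraseToAnyPublisher()
    }

    // A publisher generated per request by a function, like one per query.
    func titles(withPrefix prefix: String) -> AnyPublisher<[String], Never> {
        Just(rows.filter { $0.hasPrefix(prefix) }).eraseToAnyPublisher()
    }
}
```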

To illustrate, here's a one-off pipeline:

let s = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let url = URL(string:s)!
let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
session.dataTaskPublisher(for: url)
    .map {$0.data}
    .replaceError(with: Data())
    .compactMap { UIImage(data:$0) }
    .map { Optional($0) } // UIImageView.image is UIImage?, so wrap before assigning
    .receive(on: DispatchQueue.main)
    .assign(to: \.image, on: self.iv)
    .store(in:&self.storage)

That pipeline tries to download the data from a URL, tests to see whether it's image data, and if it is, turns the image data into an image and displays it in an image view in the interface.

Let's say I want to do this for various different remote images. Obviously it would be ridiculous to repeat the whole pipeline everywhere. What differs up front is the URL, so let's encapsulate the first part of the pipeline as a publisher that can be generated on demand based on the URL:

func image(fromURL url:URL) -> AnyPublisher<UIImage,Never> {
    let eph = URLSessionConfiguration.ephemeral
    let session = URLSession(configuration: eph)
    return session.dataTaskPublisher(for: url)
        .map {$0.data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:$0) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}

Now the only thing that needs to be repeated in various places in our code is the subscriber to that publisher:

let s = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let url = URL(string:s)!
image(fromURL:url)
    .map{Optional($0)}
    .assign(to: \.image, on: self.iv)
    .store(in:&self.storage)

You see? Elsewhere, we might have a different URL, and we might do something different with the UIImage that comes popping out of the call to image(fromURL:), and that's just fine; the bulk of the pipeline has been encapsulated and doesn't need to be repeated.

Your example pipeline's publisher is amenable to that same sort of encapsulation and reuse.

Upvotes: 2
