tson2095
tson2095

Reputation: 33

Swift CloudKit and CKQuery: how to iteratively retrieve records when queryResultBlock returns a query cursor

I am using CloudKit to retrieve records from a private database using CKQuery, using the CKQueryOperation.queryResultBlock in an async function. I've found several examples of this using queryCompletionBlock but that has been deprecated and replaced by queryResultBlock, with precious little documentation available as to how to implement it. My function works great as long as a query completion cursor is not returned (<=100 records), but I'm unable to figure out how to iterate it.

Here's the code I'm using:

public func queryRecords(recordType: CKRecord.RecordType, predicate: NSPredicate) async throws -> [CKRecord] {
    var resultRecords: [CKRecord] = []
    let db = container.privateCloudDatabase
    let query = CKQuery(recordType: recordType, predicate: predicate)
    let operation = CKQueryOperation(query: query)
    let operationQueue = OperationQueue() // for > 100 records
    operationQueue.maxConcurrentOperationCount = 1 // for > 100 records
    operation.zoneID = zoneID
    debugPrint("query for recordType=\(recordType) in zone \(zoneID.zoneName) with predicate \(predicate)")
    return try await withCheckedThrowingContinuation { continuation in
        operation.queryResultBlock = { result in
            switch result {
            case .failure(let error):
                debugPrint(error)
                continuation.resume(throwing: error)
            case .success(let ckquerycursor):
                debugPrint("successful query completion after \(resultRecords.count) record(s) returned")
                if let ckquerycursor = ckquerycursor {
                    debugPrint("***** received a query cursor, need to fetch another batch! *****")
                    let newOperation = CKQueryOperation(cursor: ckquerycursor)  // for > 100 records
                    newOperation.queryResultBlock = operation.queryResultBlock // for > 100 records
                    newOperation.database = db // for > 100 records
                    operationQueue.addOperation(newOperation) // for > 100 records
                }
                continuation.resume(returning: resultRecords)
            }
        }
        operation.recordMatchedBlock = { (recordID, result1) in
            switch result1 {
            case .failure(let error):
                debugPrint(error)
            case .success(let ckrecord):
                resultRecords.append(ckrecord)
            }
        }
        db.add(operation)
    }
}

I've attempted to implement code from similar examples but with no success: the code above results in a fatal error "SWIFT TASK CONTINUATION MISUSE" as the line

continuation.resume(returning: resultRecords)

is apparently called multiple times (illegal). The lines commented with "// for > 100 records" represent the code I've added to iterate; everything else works fine for records sets of 100 or less.

Do I need to iteratively call the queryRecords function itself, passing the query cursor if it exists, or is it possible to add the iterative operations to the queue as I've attempted to do here?

If anyone has done this before using queryResultBlock (not deprecated queryCompletionBlock) please help! Thanks!

Upvotes: 2

Views: 1669

Answers (3)

Markv07
Markv07

Reputation: 276

I found the solution below from tson2095 to work, but for large numbers of records (300+) it takes excessive time, particularly when looping for each zone. For example if I upload 300 records stored in 20 - 30 zones, it takes about 15-18 seconds. Using a different loop method that fetches all records in batches of 100 takes about 2-3 seconds to fetch the same 300+ records.

var startTime = 0 // test
var chkDone: [cabStruct] = []
for _ in 0 ..< 50 { // max will be 50 batches of 100 = 5000 records
  // Only retrieve records after last time device was synched
  let chkA = try await fetchgetItems(xcloudSyncTime: startTime) // cloudSyncTime)
  chkDone.append(contentsOf: chkA)
  for i in 0 ..< chkA.count {
    // Determine new startTime
    let modTime = anyDateToInt(dateIn: chkA[i].modificationDate!)
    if modTime > startTime {
      startTime = modTime
    }
  }
  if chkA.count < 100 { // max record fetch has been reached
    break // all records have been fetched, end loop
    }
  } // fetch loop
  // Need to resort by recordName to avoid duplicates
  if chkDone.count > 0 {
    let chkSorted = chkDone.sorted {$0.recID.recordName < $1.recID.recordName}
    var chkNoDup: [cabStruct] = [chkSorted[0]]
    for i in 1 ..< chkSorted.count {
      if chkSorted[i].recID.recordName != chkSorted[i-1].recID.recordName {
      chkNoDup.append(chkSorted[i]) // keep only unique records
      }
    }  
  } 
private func fetchgetItems(xcloudSyncTime: Int) async throws ->[cabStruct] {
// Download only newly created records after last sync
// Ref: https://stackoverflow.com/questions/68292170/swift-continuation-doesnt-make-await-continue
nAStruct: [cabStruct] = []
let date = NSDate(timeIntervalSince1970: Double(xcloudSyncTime)) //
let pred = NSPredicate(format: "modificationDate > %@", date) // only new records
let sort = NSSortDescriptor(key: "modificationDate", ascending: true)
let query = CKQuery(recordType: "cabRecord", predicate: pred)
query.sortDescriptors = [sort]
let items: [cabStruct] = await withCheckedContinuation { continuation in
let op = CKQueryOperation(query: query)
op.desiredKeys = ["recNo", "recTime", "recString", "modificationDate"]
op.resultsLimit = 100 // limit of 400 ok also
op.recordMatchedBlock = { (recordId, result) in
switch result {
  case let .success(record):
    var cabRec = cabStruct()
    cabRec.recID = record.recordID
    cabRec.recTime = record["recTime"]
    cabRec.recNos = record["recNo"]
    cabRec.recString = record["recString"]
    cabRec.recordZone = record.recordID.zoneID.zoneName
    cabRec.modificationDate = record.modificationDate
    nAStruct.append(cabRec) // global Struct
  case let .failure(error):
    actLog(newPrint: "getItems - something went wrong, error:" + error.localizedDescription)
  } // switch
 } // matchblock
 op.queryResultBlock = { result in
   continuation.resume(returning: [])
   } // result
 database.add(op)
 } // continuation
 return nAStruct
 } // end func

Upvotes: 0

user652038
user652038

Reputation:

No need for queryResultBlock in Swift 5.5.

I use this because my CKRecord types are always named the same as their Swift counterparts. You can replace recordType: "\(Record.self)" with your recordType if you want, instead.

public extension CKDatabase {
  /// Request `CKRecord`s that correspond to a Swift type.
  ///
  /// - Parameters:
  ///   - recordType: Its name has to be the same in your code, and in CloudKit.
  ///   - predicate: for the `CKQuery`
  func records<Record>(
    type _: Record.Type,
    zoneID: CKRecordZone.ID? = nil,
    predicate: NSPredicate = .init(value: true)
  ) async throws -> [CKRecord] {
    try await withThrowingTaskGroup(of: [CKRecord].self) { group in
      func process(
        _ records: (
          matchResults: [(CKRecord.ID, Result<CKRecord, Error>)],
          queryCursor: CKQueryOperation.Cursor?
        )
      ) async throws {
        group.addTask {
          try records.matchResults.map { try $1.get() }
        }
        
        if let cursor = records.queryCursor {
          try await process(self.records(continuingMatchFrom: cursor))
        }
      }

      try await process(
        records(
          matching: .init(
            recordType: "\(Record.self)",
            predicate: predicate
          ),
          inZoneWith: zoneID
        )
      )
      
      return try await group.reduce(into: [], +=)
    }
  }
}

Upvotes: 2

tson2095
tson2095

Reputation: 33

This is my modified code from Jessy's response where I've added the CKRecordZone.ID parameter in order to limit the query to a specific record zone. The function also allows query from either the public or private DB, where the public DB must use only default zone.

public func queryRecords(recordType: CKRecord.RecordType, predicate: NSPredicate, publicDB: Bool) async throws -> [CKRecord] {
            let db = publicDB ? container.publicCloudDatabase : container.privateCloudDatabase
            let zoneID = publicDB ? CKRecordZone.default().zoneID : zoneID
            return try await db.records(type: recordType, predicate: predicate, zoneID: zoneID)
        }

public extension CKDatabase {
  /// Request `CKRecord`s that correspond to a Swift type.
  ///
  /// - Parameters:
  ///   - recordType: Its name has to be the same in your code, and in CloudKit.
  ///   - predicate: for the `CKQuery`
  func records(
    type: CKRecord.RecordType,
    predicate: NSPredicate = .init(value: true),
    zoneID: CKRecordZone.ID
  ) async throws -> [CKRecord] {
    try await withThrowingTaskGroup(of: [CKRecord].self) { group in
      func process(
        _ records: (
          matchResults: [(CKRecord.ID, Result<CKRecord, Error>)],
          queryCursor: CKQueryOperation.Cursor?
        )
      ) async throws {
        group.addTask {
          try records.matchResults.map { try $1.get() }
        }
        if let cursor = records.queryCursor {
          try await process(self.records(continuingMatchFrom: cursor))
        }
      }
      try await process(
        records(
          matching: .init(
            recordType: type,
            predicate: predicate
          ),
          inZoneWith: zoneID
        )
      )
        
        return try await group.reduce(into: [], +=)
      }
    }
}

Upvotes: 0

Related Questions