chinna
chinna

Reputation: 51

RxSwift: How to Run Multiple Observables parallelly

I am Chinna. I am new to rxswift. I have a task like I have a few functions which are acts like observables. Currently these all observables executing sequentially. due to this my application performance is going down. I want to execute all these functions parallelly. These functions are not depending on each other. I have tried with zip function but these are running sequentially.

    return Observable.create({ (observer) -> Disposable in
_ = Observable.zip(
self.createOrAlterTable(call).subscribeOn(ConcurrentMainScheduler.instance),
self.formSchemaToSQLite(call,lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.menuTableRecord(call, lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.recordsTOSqlite(call,lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.imagerecordsTOSqlite(call,lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.syncGraphDBNode(call).subscribeOn(ConcurrentMainScheduler.instance),
self.syncSchemasToSQLServer(call).subscribeOn(ConcurrentMainScheduler.instance),
self.syncRecordsToSQLServer(call).subscribeOn(ConcurrentMainScheduler.instance)
).observeOn(MainScheduler.asyncInstance)
.subscribe(onNext: {
xdata,ydata,adata,bdata,cdata,ddata,edata,fdata in
print("All Operations Done");
})
return Disposables.create()

all above functions will looks like below

func syncSchemasToSQLServer(_ call: CAPPluginCall) -> Observable<String>{
    return Observable.just("SYNC_SCHEMAS_TO_SQLSERVER")
}

but still these functions executing sequntially. please help me to solve this issue.

Updated

    func example() -> Observable<String>{
      var res=  Observable.zip(
            doJob(1).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
            doJob(2).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
            doJob(3).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default))
        )
        .debug("done")
        .subscribe()

   return Observable.just("Success");
    }
    
    func doJob(_ value: Int) -> Observable<Void> {
        return Observable.create { observer in
            print("starting job", value)
            sleep(3)
            print("done with job", value)
            observer.onNext(())
            observer.onCompleted()
            return Disposables.create()
        }
    }

Upvotes: 2

Views: 1010

Answers (1)

Daniel T.
Daniel T.

Reputation: 33967

The ConcurrentMainScheduler.instance doesn't do things in parallel. Use ConcurrentDispatchQueueScheduler instead. The code below works:

func example() {
    doAllJobs()
        .subscribe(onNext: { print($0) })
}

func doAllJobs() -> Observable<String>{
    Observable.zip(
        doJob(1).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
        doJob(2).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
        doJob(3).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default))
    )
    .map { _ in "Success" }
}

func doJob(_ value: Int) -> Observable<Void> {
    Observable.create { observer in
        print("starting job", value)
        sleep(3)
        print("done with job", value)
        observer.onNext(())
        observer.onCompleted()
        return Disposables.create()
    }
}

This would work as well:

func doAllJobs() -> Observable<String> {
    let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
    return Observable.zip(
        doJob(1).subscribeOn(scheduler),
        doJob(2).subscribeOn(scheduler),
        doJob(3).subscribeOn(scheduler)
    )
    .map { _ in "Success" }
}

Upvotes: 3

Related Questions