Reputation: 870
I have 2 separate collections of data in my service: Featured and Standard content.
I have two API calls that return these items. They can be consumed separately, but I also have a use case where I would like to take both sets of data, apply some enrichment based on a condition, and then return them to a consumer.
I was hoping I could do something like this:
class ContentService: ContentServiceType {
    let featured = PublishSubject<[Content]>()
    let standard = PublishSubject<[Content]>()
    let content: Observable<(featured: [Content], standard: [Content])>

    private let client: Client<ContentAPI>
    private let disposeBag = DisposeBag()

    init(client: Client<ContentAPI>) {
        self.client = client
        content = Observable
            .combineLatest(featured, standard)
            .map { (featured, standard) -> (featured: [Content], standard: [Content]) in
                /*
                 Do some enrichment and create then return new, updated versions
                 */
                return (featured: updatedFeatured, standard: updatedStandard)
            }.share()
    }

    func fetchStandardContent(page: Int = 0, size: Int = 100) -> Single<Void> {
        let params = ["page": page, "size": size]
        let request: Single<Content> = client.request(.getStandardContent(params))
        // The closure parameter must be named `content`; otherwise `content`
        // would refer to the `content` property rather than the response.
        return request.map { [unowned self] content in
            self.standard.onNext(content.props)
            return ()
        }
    }

    func fetchFeaturedContent(page: Int = 0, size: Int = 100) -> Single<Void> {
        let params = ["page": page, "size": size]
        let request: Single<Content> = client.request(.getFeaturedContent(params))
        return request.map { [unowned self] content in
            self.featured.onNext(content.props)
            return ()
        }
    }
}
Elsewhere in my app, I was hoping I could then do something like:
contentSvc.content
    .observeOn(MainScheduler.instance)
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .subscribe(onNext: { content in
        /* do something w/ content */
    }).disposed(by: disposeBag)
And then whenever contentSvc.fetchFeaturedContent or contentSvc.fetchStandardContent is called, the contentSvc.content subscriber above gets new data.
Instead, content does not appear to be emitting any values.
Upvotes: 1
Views: 2389
Reputation: 31
I'm using BehaviorRelay instead of PublishSubject because, when several streams are bound to one PublishSubject (which is shared across the app), a completed event from any one of those streams can terminate the PublishSubject. Relay classes never emit an error and never complete.
let featured = BehaviorRelay(value: [Content]())
let standard = BehaviorRelay(value: [Content]())

func getContent() -> Observable<(featured: [Content], standard: [Content])> {
    return Observable
        .combineLatest(
            featured.asObservable(),
            standard.asObservable(),
            resultSelector: { (featured, standard) -> (featured: [Content], standard: [Content]) in
                return (featured: featured, standard: standard)
            }
        )
}

func addElemToFeatured() {
    featured.accept([Content(name: "abc")])
}

func addElemToStandard() {
    standard.accept([Content(name: "xyz")])
}
Call getContent() from the initializer of whichever classes need the data, and call addElemToFeatured and addElemToStandard from wherever the data changes, such as a button action.
listener!.getContent()
    .subscribe(onNext: { (featured, standard) in
        print(featured)
        print(standard)
    }).disposed(by: disposeBag)
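The point about terminal events can be illustrated with a minimal sketch. It assumes RxSwift 5 or later, where BehaviorRelay lives in the RxRelay module (in RxSwift 4 it ships with RxCocoa); the Int payloads and the log arrays are made up for the illustration and are not part of the code above.

```swift
import RxSwift
import RxRelay

let disposeBag = DisposeBag()

// A PublishSubject used as an observer forwards every event it receives,
// including the completed event of the upstream sequence.
let subject = PublishSubject<Int>()
var subjectLog: [String] = []
subject.subscribe(
    onNext: { subjectLog.append("next \($0)") },
    onCompleted: { subjectLog.append("completed") }
).disposed(by: disposeBag)

// `just(1)` emits 1 and then completes, which terminates the subject.
Observable.just(1).subscribe(subject).disposed(by: disposeBag)
subject.onNext(2) // ignored: the subject has already completed

// A BehaviorRelay only exposes `accept`, so there is no way to push a
// terminal event into it. Here only the elements are forwarded.
let relay = BehaviorRelay<Int>(value: 0)
var relayLog: [String] = []
relay.subscribe(onNext: { relayLog.append("next \($0)") })
    .disposed(by: disposeBag)

Observable.just(1)
    .subscribe(onNext: { relay.accept($0) })
    .disposed(by: disposeBag)
relay.accept(2) // still delivered: the relay never terminates

print(subjectLog)
print(relayLog)
```

The subject stops delivering after the first upstream completes, while the relay keeps accepting values from any number of sources.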
Upvotes: 0
Reputation: 2478
I believe combineLatest requires both sources to emit at least once before it emits anything itself.
I would perhaps look at using a BehaviorSubject or BehaviorRelay instead of a PublishSubject.
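As a rough illustration of that difference, the sketch below feeds the same two updates through combineLatest, first with PublishSubject sources and then with BehaviorSubject sources. It assumes RxSwift is available; the [String] payloads and the log arrays are hypothetical stand-ins for the [Content] arrays in the question.

```swift
import RxSwift

let disposeBag = DisposeBag()

// Case 1: PublishSubject sources have no initial value.
let featuredPublish = PublishSubject<[String]>()
let standardPublish = PublishSubject<[String]>()
var publishLog: [String] = []

Observable.combineLatest(featuredPublish, standardPublish)
    .subscribe(onNext: { pair in
        publishLog.append("featured=\(pair.0.count) standard=\(pair.1.count)")
    })
    .disposed(by: disposeBag)

featuredPublish.onNext(["f1"]) // nothing emitted: standard has no value yet
standardPublish.onNext(["s1"]) // first combined emission happens here

// Case 2: BehaviorSubject sources replay their current value on subscribe.
let featuredBehavior = BehaviorSubject<[String]>(value: [])
let standardBehavior = BehaviorSubject<[String]>(value: [])
var behaviorLog: [String] = []

Observable.combineLatest(featuredBehavior, standardBehavior)
    .subscribe(onNext: { pair in
        behaviorLog.append("featured=\(pair.0.count) standard=\(pair.1.count)")
    })
    .disposed(by: disposeBag)

featuredBehavior.onNext(["f1"]) // emits immediately, paired with the empty standard list
standardBehavior.onNext(["s1"])

print(publishLog)
print(behaviorLog)
```

With PublishSubject sources, a subscriber sees nothing until both fetches have delivered a value after it subscribed. With BehaviorSubject sources, it gets an initial pair of empty arrays and then an update for every change on either side.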
Upvotes: 3