Reputation: 53
I implemented a pseudo-operator called "FilterByLatestFrom" as an extension function in Kotlin.
I wrote the following code using this operator:
fun testFilterByLatestFromOperator() {
    val observableA: Observable<Int> = Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val observableC: PublishSubject<Int> = PublishSubject.create()
    val observableB: Observable<Int> = Observable.just(2).mergeWith(observableC)

    observableB.subscribe { println("observableB onNext: $it") }

    observableA
        .subscribe({ println("Original : $it") })

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal % bVal == 0 })
        .subscribe({ println("Result A : $it") })

    observableC.onNext(3)

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal % bVal == 0 })
        .subscribe({ println("Result AC : $it") })
}
The output is:
observableB onNext: 2
Original : 1
Original : 2
Original : 3
Original : 4
Original : 5
Original : 6
Original : 7
Original : 8
Original : 9
Original : 10
Result A : 2
Result A : 4
Result A : 6
Result A : 8
Result A : 10
observableB onNext: 3
Result AC : 2
Result AC : 4
Result AC : 6
Result AC : 8
Result AC : 10
I want the operator to filter observableA according to the latest value of observableB. It works for the first block, but when I call onNext with a new value, the result does not change (it still uses the last value from the original observable, 2).
This is the FilterByLatestFrom implementation (it was designed to be usable from Java as well, via compose):
class FilterByLatestFrom<T, U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>) {
    fun filter(): ObservableTransformer<U, U> = ObservableTransformer {
        it
            .withLatestFrom(
                observable,
                BiFunction<U, T, Pair<U, Boolean>> {
                    u, t -> Pair(u, biFunction.apply(u, t))
                })
            .filter { it.second }
            .map { it.first }
    }
}

fun <T, U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>): Observable<U> =
    this.compose(FilterByLatestFrom(observable, biFunction).filter())
What am I missing?
EDIT: I think I found the problem: the PublishSubject should be a BehaviorSubject instead, and the merge should be a concat to guarantee that obsC emits after the initial value of obsB.
Upvotes: 2
Views: 1512
Reputation: 10267
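Your pseudo-operator filterByLatestFrom is just fine; the problem lies in the test. A PublishSubject only emits items that arrive after a subscriber has subscribed. So in your last subscription ('Result AC'), observableB emits only 2, because observableC already emitted 3 before that subscription existed and will not replay it to observableB through the merge.
Alternatively, move observableC.onNext(3) to after the last subscription (the last line) so the 3 is not emitted before anyone is listening. Keep in mind that this only changes the filtering of items observableA emits after that point, and a synchronous source such as Observable.fromArray emits everything during subscribe.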
EDIT:
Changing the PublishSubject to a BehaviorSubject, as you did, also solves the issue, because a BehaviorSubject replays its latest value to each new subscription.
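The replay difference can be checked in isolation with a minimal sketch like the one below. It assumes RxJava 2 (the io.reactivex package; RxJava 3 uses io.reactivex.rxjava3 instead), and lateSubscriberSees is a hypothetical helper written for this illustration, not part of the question's code. It uses concatWith, as suggested in the question's edit, so the 2 is always delivered before anything from the subject.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
import io.reactivex.subjects.Subject

// Hypothetical helper: returns what a NEW subscriber to just(2).concatWith(subject)
// receives synchronously at subscription time.
fun lateSubscriberSees(subject: Subject<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    Observable.just(2).concatWith(subject).subscribe { seen.add(it) }
    return seen
}

fun main() {
    val publish = PublishSubject.create<Int>()
    publish.onNext(3) // emitted before anyone subscribed, so it is lost
    println(lateSubscriberSees(publish)) // [2]

    val behavior = BehaviorSubject.create<Int>()
    behavior.onNext(3) // cached as the latest value
    println(lateSubscriberSees(behavior)) // [2, 3]
}
```

With the BehaviorSubject, a late withLatestFrom subscription therefore starts with 3 as its latest value, which is why 'Result AC' changes once the subject type is swapped.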
Upvotes: 2