Yaniv Bouhadana
Yaniv Bouhadana

Reputation: 53

Observable withLatestFrom value

I Implemented a pseudo-operator called "FilterByLatestFrom" as an extension function for kotlin.

I wrote the next code using this operator:

    fun testFilterByLatestFromOperator(){
    val observableA : Observable<Int> = Observable.fromArray(1,2,3,4,5,6,7,8,9,10)
    val observableC : PublishSubject<Int> = PublishSubject.create()
    val observableB : Observable<Int> = Observable.just(2).mergeWith(observableC)

    observableB.subscribe { println("observableB onNext: $it") }

    observableA
            .subscribe({ println("Original : $it")})

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
            .subscribe({ println("Result A : $it") })

    observableC.onNext(3)

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
            .subscribe({ println("Result AC : $it") })
}

the output is :

observableB onNext: 2
Original : 1
Original : 2
Original : 3
Original : 4
Original : 5
Original : 6
Original : 7
Original : 8
Original : 9
Original : 10
Result A : 2
Result A : 4
Result A : 6
Result A : 8
Result A : 10
observableB onNext: 3
Result AC : 2
Result AC : 4
Result AC : 6
Result AC : 8
Result AC : 10

I want the filter operator will filter obsA according last value of observable B. It works for the first block but when i add On-next with new value it doesn't change the result (use same last value from the original observable).

this is the FilterByLatestFrom impl (it was design to be used from Java also (with compose):

class FilterByLatestFrom<T,U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>){
fun filter() : ObservableTransformer<U,U> = ObservableTransformer {
    it
            .withLatestFrom(
                    observable,
                    BiFunction<U,T,Pair<U,Boolean>> {
                        u, t -> Pair(u,biFunction.apply(u,t))
                    })
            .filter { it.second }
            .map { it.first }
    }
}
fun <T,U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>) : Observable<U> =
        this.compose(FilterByLatestFrom(observable,biFunction).filter())

What am I missing?

EDIT : I think I found the problem : the PublishSubject should be BehaviorSubject instead. and merge function should be conacat to promise obsC will emit after obsB.

Upvotes: 2

Views: 1512

Answers (1)

yosriz
yosriz

Reputation: 10267

Your pseudo-operator filterByLatestFrom is just fine, the problem lies within the testing, PublishSubject will emit just the subsequent items, so when in your last subscription ('result AC'), observableB will emit only 2, as observableC already emitted 3 and will not replay it to observableB (using the merge).

Just move the observableC.onNext(3) to after the last subscription (last line) and you should see the expected behavior.

EDIT: also changing to PublishSubject like you did solve the same issue (the subject will replay the last value to the new subscription)

Upvotes: 2

Related Questions