Loki
Loki

Reputation: 77

Updating RxJava 1.0 code to RxJava 2.0

I'm updating some old android app code and can't figure out how to convert code which worked in RxJava/RxAndroid 1.0 to RxJava/RxAndroid 2.0. Code which I'm trying to convert was responsible for getting signal strength indicator and updating a chart with it's value every second. What I managed to hack up in RxJava 2.0 doesn't work at all and after few seconds app crashes with SIGABORT. I can't figure out what I'm missing?

My RxJava 1 code:

rx.Observer myObserver = new rx.Observer<Float>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Float aFloat)
            {
                addEntry(aFloat);
            }
        };

        subscription = rx.Observable.interval(1, TimeUnit.SECONDS)
                            .flatMap(new Func1<Long, rx.Observable<Float>>() {
                                @Override
                                public rx.Observable<Float> call(Long aLong) {
                                    return getSignalStrength();
                                }
                            })
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(myObserver);


private rx.Observable<Float> getSignalStrength()
{
    return rx.Observable.create(subscriber ->
    {
        WifiManager wifiManager = (WifiManager) getContext().getSystemService(Context.WIFI_SERVICE);
        Integer value = wifiManager.getConnectionInfo().getRssi();
        subscriber.onNext(value.floatValue());
        subscriber.onCompleted();
    });
}

And what I came up with trying to do RxJava 2.0:

        DisposableObserver<Float> disposableObserver = new DisposableObserver<Float>()
        {
            @Override
            public void onNext(Float value)
            {
                addEntry(value);
            }

            @Override
            public void onError(Throwable e)
            {
            }

            @Override
            public void onComplete()
            {
            }
        };

        Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
                .flatMap(new Function<Long, ObservableSource<Float>>()
                {
                    @Override
                    public ObservableSource<Float> apply(Long aLong) throws Exception
                    {
                        return signalStrengthInfo.getSignalStrength();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(disposableObserver);


private Observable<Float> getSignalStrength()
{
    return Observable.create(subscriber ->
    {
        wifiManager = (WifiManager) context.getSystemService(Context.WIFI_SERVICE);
        Integer signalStrength = wifiManager.getConnectionInfo().getRssi();
        subscriber.onNext(Float.valueOf(signalStrength));
        subscriber.onComplete();
    });
}

Upvotes: 0

Views: 136

Answers (1)

Master Disaster
Master Disaster

Reputation: 769

Your code in Rx2 is really embroiled.

Just see simple solution (sorry for kotlin) but if you have some problem with making it with java just ping me in comment.

val disposable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap { getSignalStrength() }
                .doOnNext { addEntry(it) }
                .subscribe()


private fun getSignalStrength(): Observable <Float> {
            val wifiManager = this.getSystemService(Context.WIFI_SERVICE) as WifiManager
            val signalStrength = wifiManager.connectionInfo.rssi
            return Observable.just(signalStrength.toFloat())
    }

Upvotes: 1

Related Questions