Nodir Rashidov

Reputation: 732

Asynchronous testing in RxJava not working

I am trying to unit test my use case the following way:

@RunWith(MockitoJUnitRunner.class)
public class DeviceCheckInUseCaseTest extends InstrumentationTestCase {

    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    ShopRepository shopRepository;

    @Mock
    UserRepository userRepository;

    @Mock
    SettingRepository settingRepository;

    private DeviceCheckInUseCase deviceCheckInUseCase;

    private PublishSubject<CheckInResult> checkinSubject;

    private static final String SHOPID = "shop id";
    private static final String MEMBERID = "member id";

    private TestSubscriber<CheckInResult> testSubscriber;

    @Mock
    Activity activity;

    @Before
    public void setUp() {

        testSubscriber = new TestSubscriber<>();
        checkinSubject = PublishSubject.create();

        when(shopRepository
            .checkIn(
                activity,
                SHOPID,
                MEMBERID,
                -60
            )
        ).thenReturn(checkinSubject.asObservable());

        when(settingRepository.getCheckinRssiThreshold()).thenReturn(-60);

        deviceCheckInUseCase = new DeviceCheckInUseCase(userRepository, shopRepository, settingRepository);
    }

    @Test
    public void testCheckInSucceeded() {
        final ArrayList<CheckInResult> succeeded = new ArrayList<CheckInResult>();
        final ArrayList<Throwable> failed = new ArrayList<Throwable>();

        this.deviceCheckInUseCase.checkIn(activity, SHOPID, MEMBERID)
            .doOnNext(new Action1<CheckInResult>() {
                @Override
                public void call(CheckInResult checkInResult) {
                    succeeded.add(checkInResult);
                }
            })
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    failed.add(throwable);
                }
            })
            .subscribe(testSubscriber)
        ;

        testSubscriber.assertValueCount(0);
        assertEquals(0, succeeded.size());
        assertEquals(0, failed.size());

        checkinSubject.onNext(new CheckInSuccess());

        this.testSubscriber.assertValueCount(1);
        assertEquals(1, succeeded.size());
        assertEquals(0, failed.size());
    }
}

And below is my DeviceCheckInUseCase.java:

public class DeviceCheckInUseCase {

    private UserRepository userRepository;
    private ShopRepository shopRepository;
    private SettingRepository settingRepository;
//    private Subscription


    @Inject
    public DeviceCheckInUseCase(UserRepository userRepository, ShopRepository shopRepository, SettingRepository settingRepository) {
        this.userRepository = userRepository;
        this.shopRepository = shopRepository;
        this.settingRepository = settingRepository;
    }

    public Observable<CheckInResult> checkIn(final Activity context, final String shopId, final String memberId) {

        return Observable.create(new Observable.OnSubscribe<CheckInResult>() {
            @Override
            public void call(final Subscriber<? super CheckInResult> subscriber) {

                final Subscription subscription = new BooleanSubscription();

                shopRepository
                    .checkIn(context, shopId, memberId, settingRepository.getCheckinRssiThreshold())
                    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Integer>>() {

                        @Override
                        public Observable<Integer> call(Observable<? extends Throwable> failure) {
                            return failure.flatMap(new Func1<Throwable, Observable<Integer>>() {

                                @Override
                                public Observable<Integer> call(Throwable result) {

                                    if (result instanceof BluetoothFailure) {
                                        return Observable.error(result);
                                    } else if (!subscription.isUnsubscribed()) {
                                        return Observable.just(0);
                                    } else {
                                        return Observable.error(result);
                                    }
                                }

                            });
                        }
                    })
                    .flatMap(new Func1<CheckInResult, Observable<CheckInResult>>() {
                        @Override
                        public Observable<CheckInResult> call(final CheckInResult checkInResult) {
                            return userRepository.checkin()
                                    .map(new Func1<Void, CheckInResult>() {
                                        @Override
                                        public CheckInResult call(Void aVoid) {
                                            return checkInResult;
                                        }
                                    });
                        }
                    })
                    .subscribe(subscriber);
            }
        });
    }

    public boolean enableCheckin() {
        return userRepository.enableCheckin();
    }
}

As you can see, I expect the succeeded list to grow by one element, but it remains empty.

I have been trying to figure out why this is happening. I came across asynchronous testing with TestScheduler and TestSubscriber, but I don't think I am using them correctly.

In @Before I added:

subscriber = new TestSubscriber<>();
checkinSubject.subscribe(subscriber);

and changed the assertions in the @Test method to:

this.subscriber.assertValueCount(0);
assertEquals(0, succeeded.size());
assertEquals(0, failed.size());

this.checkinSubject.onNext(new CheckInSuccess());

this.subscriber.assertValueCount(1);
assertEquals(1, succeeded.size());
assertEquals(0, failed.size());

The assertValueCount check passes, but succeeded still remains empty. How do I get the assertion on succeeded to pass?

Upvotes: 2

Views: 1125

Answers (2)

Dean Xu

Reputation: 4691

Your subscription chain is broken. You should subscribe to the downstream end of the chain, not to the upstream source. In other words, subscribe after doOnError instead of subscribing directly to the Subject:

this.deviceCheckInUseCase.checkIn()
    .doOnNext(succeeded::add)
    .doOnError(failed::add)
    .subscribe(subscriber);

The complete test code follows:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;

import org.junit.Before;
import org.junit.Test;

import rx.Observable;
import rx.Subscription;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.BooleanSubscription;

public class Q47049714 {

  public static class BluetoothFailure extends Exception {
  }

  public static class CheckInSuccess implements CheckInResult {
  }

  public static interface CheckInResult {
  }

  public class ShopRepository {
    public Observable<CheckInResult> checkIn() {
      return checkinSubject;
    }
  }

  public class UserRepository {
    public Observable<Void> checkIn() {
      return Observable.just(null);
    }
  }

  public class DeviceCheckInUseCase {
    ShopRepository shopRepository = new ShopRepository();
    UserRepository userRepository = new UserRepository();

    public Observable<CheckInResult> checkIn() {
      return Observable.create(subscriber -> {
        Subscription subscription = new BooleanSubscription();
        shopRepository
            .checkIn()
            .retryWhen(failure -> failure.flatMap(result -> {
              if (result instanceof BluetoothFailure) {
                return Observable.error(result);
              } else if (!subscription.isUnsubscribed()) {
                return Observable.just(0);
              } else {
                return Observable.error(result);
              }
            }))
            .flatMap(checkInResult -> userRepository.checkIn().map(v -> checkInResult))
            .subscribe(subscriber);
      });
    }
  }

  PublishSubject<CheckInResult> checkinSubject;
  DeviceCheckInUseCase deviceCheckInUseCase;
  TestSubscriber<CheckInResult> subscriber;

  @Before
  public void setUp() {
    checkinSubject = PublishSubject.create();
    subscriber = new TestSubscriber<>();
    deviceCheckInUseCase = new DeviceCheckInUseCase();
  }

  @Test
  public void testCheckInSucceeded() {
    final ArrayList<CheckInResult> succeeded = new ArrayList<>();
    final ArrayList<Throwable> failed = new ArrayList<>();

    deviceCheckInUseCase.checkIn()
        .doOnNext(succeeded::add)
        .doOnError(failed::add)
        .subscribe(subscriber);

    subscriber.assertValueCount(0);
    assertEquals(0, succeeded.size());
    assertEquals(0, failed.size());

    checkinSubject.onNext(new CheckInSuccess());

    subscriber.assertValueCount(1);
    assertEquals(1, succeeded.size());
    assertEquals(0, failed.size());
  }
}
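
To see why the subscription point matters without pulling in RxJava, here is a toy model in plain Java. It is only a sketch: `Source`, `Subject`, and the two `hookCalls...` helpers are hypothetical names invented for this illustration, not RxJava classes, and the model assumes (as RxJava operators do) that `doOnNext` returns a new stream that wraps the original one rather than modifying it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy push-based stream. NOT RxJava; just enough to show where side effects live. */
public class DownstreamDemo {

    /** Anything that can be subscribed to with a callback (hypothetical type). */
    interface Source<T> {
        void subscribe(Consumer<T> observer);

        /** Returns a NEW source that runs the hook before passing each value on. */
        default Source<T> doOnNext(Consumer<T> hook) {
            Source<T> upstream = this;
            return observer -> upstream.subscribe(value -> {
                hook.accept(value);
                observer.accept(value);
            });
        }
    }

    /** Minimal stand-in for a PublishSubject: fans each value out to its observers. */
    static class Subject<T> implements Source<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        @Override
        public void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void onNext(T value) {
            for (Consumer<T> observer : new ArrayList<>(observers)) {
                observer.accept(value);
            }
        }
    }

    /** Observer attached directly to the subject: the hooked chain is never subscribed. */
    public static int hookCallsWhenSubscribedUpstream() {
        Subject<String> subject = new Subject<>();
        List<String> succeeded = new ArrayList<>();
        List<String> received = new ArrayList<>();
        Source<String> chain = subject.doOnNext(succeeded::add); // built, but unused
        subject.subscribe(received::add);                        // bypasses the hook
        subject.onNext("check-in");
        return succeeded.size();
    }

    /** Observer attached at the end of the chain: values pass through the hook. */
    public static int hookCallsWhenSubscribedDownstream() {
        Subject<String> subject = new Subject<>();
        List<String> succeeded = new ArrayList<>();
        List<String> received = new ArrayList<>();
        subject.doOnNext(succeeded::add).subscribe(received::add);
        subject.onNext("check-in");
        return succeeded.size();
    }

    public static void main(String[] args) {
        System.out.println("upstream: " + hookCallsWhenSubscribedUpstream());     // upstream: 0
        System.out.println("downstream: " + hookCallsWhenSubscribedDownstream()); // downstream: 1
    }
}
```

In both cases the observer itself receives the value, which matches a passing assertValueCount(1) alongside an empty succeeded list: only a subscription made at the end of the chain routes values through the doOnNext hook.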

Upvotes: 1

Boris Kozyrev

Reputation: 152

Maybe you can try adding

.subscribeOn(Schedulers.immediate())
.observeOn(Schedulers.immediate())

before your doOnNext. I'm not sure whether it will help, though.

Upvotes: 0
