WesternGun
WesternGun

Reputation: 12728

Reactive pipeline subscribed but in unit test Mockito.verify() fails (no invocation of mocks is recorded)

I have this class:

@Slf4j
@RequiredArgsConstructor
@Service
public class SyncTransactionService {
    private final SyncProducerService syncProducerService; // kafka producer
    private final CouponService couponService; // db persistence service
    private final CouponUpdateMessageMapper mapper; // simple mapper to generate message dto for Kafka

    public void processChanges(List<Change> changes) {
        Flux.fromIterable(changes)
                    .map(this::processAndSend)
                    .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
                    .subscribeOn(Schedulers.elastic())
                    .subscribe();
    }

    private Mono<CouponUpdateMessage> processAndSend(Change change) {
        return Mono.fromCallable(() -> change)
                .doFirst(() -> log.info("saving or deleting the coupon: {}", change.getChanged()))
                .map(this::saveOrDelete)
                .thenReturn(mapper.map(change))
                .doOnSuccess(message -> log.info("sending message: {}", message))
                .doOnSuccess(syncProducerService::send);
    }

    private Mono<Void> saveOrDelete(Change change) {
        if (change.getType() == DELETE) return couponService.deleteCoupon(change.getChanged());
        else return couponService.saveCoupon(change.getChanged()).then();
    }

}

And this test:

@ExtendWith(MockitoExtension.class)
class SyncTransactionServiceTest {
    @Mock
    private SyncProducerService syncProducerService;

    @Mock
    private CouponService couponService;

    @Mock
    private CouponUpdateMessageMapper mapper;

    @InjectMocks
    private SyncTransactionService syncTransactionService;


    private static Coupon insertId1;
    private static Coupon updateId2;
    private static Coupon deleteId3;

    private static Change change1;
    private static Change change2;
    private static Change change3;


    @BeforeAll
    private static void prepareData() {
        insertId1 = DataHelper.coupon();
        updateId2 = DataHelper.coupon();
        updateId2.setId(2);
        deleteId3 = DataHelper.coupon();
        deleteId3.setId(3);

        change1 = Change.builder().changed(insertId1).type(CouponUpdateType.INSERT).build();
        change2 = Change.builder().changed(updateId2).type(CouponUpdateType.UPDATE).build();
        change3 = Change.builder().changed(deleteId3).type(CouponUpdateType.DELETE).build();
    }

    @Test
    void shouldProcessChanges() {
        // given
        List<Change> changes = List.of(change1, change2, change3);
        when(couponService.saveCoupon(insertId1)).thenReturn(Mono.just(insertId1));
        when(couponService.saveCoupon(updateId2)).thenReturn(Mono.just(updateId2));
        when(couponService.deleteCoupon(deleteId3)).thenReturn(Mono.empty());
        doNothing().when(syncProducerService).send(any());
        doCallRealMethod().when(mapper).map(any());

        // when
        syncTransactionService.processChanges(changes);

        // then
        verify(couponService, times(2)).saveCoupon(any());
        verify(mapper, times(3)).map(any());
        verify(couponService).deleteCoupon(any());
        verify(syncProducerService, times(3)).send(any());
    }
}

When I run the test, Mockito.verify() does not detect any interaction with the mocks, although I have subscribe() in the code.

So what could be the problem in my pipeline?

Upvotes: 0

Views: 2919

Answers (2)

WesternGun
WesternGun

Reputation: 12728

Exactly as @Martin Tarjányi said, if a method of reactive is to be tested and it is with Schedulers.elastic(), it will launch async jobs which you cannot immediately finish and thus I see no interactions.

If I stick to it, I can:

  • wait until it finishes;(use https://github.com/awaitility/awaitility lib or just Thread.sleep(), like: Awaitility.waitAtMost(Duration.ofMillis(2000)).untilAsserted(() -> {verify(...);});)
  • or, return the pipeline and test it with StepVerifier or block(). Remember, for a flux, use blockLast() to get all; blockFirst() only emits the first element.

So it is like this now:

...
    public Flux<CouponUpdateMessage> processChanges(List<Change> changes) {
        return Flux.fromIterable(changes)
                .flatMap(this::processAndSend)
                .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
                .subscribeOn(Schedulers.elastic()); // don't subscribe() here, but return it
    }
...

And test:

...
   // when
   syncTransactionService.processChanges(changes).blockLast(); // process all elements

...

And I see the logs and all the interactions are recorded as I wish.


If I am not obliged to use Schedulers.elastic(), I can just simply subscribe(), and the test in the question will work.

Upvotes: 0

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9947

The problem is that your method under test runs asynchronously because of the specified scheduler. You should return the Flux from your method under test and then use StepVerifier or call collectList() and block() methods on the Flux to trigger and wait execution.

Upvotes: 1

Related Questions