Reputation: 12728
I have this class:
@Slf4j
@RequiredArgsConstructor
@Service
public class SyncTransactionService {
    private final SyncProducerService syncProducerService; // kafka producer
    private final CouponService couponService; // db persistence service
    private final CouponUpdateMessageMapper mapper; // simple mapper to generate message dto for Kafka

    public void processChanges(List<Change> changes) {
        Flux.fromIterable(changes)
            .map(this::processAndSend)
            .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
            .subscribeOn(Schedulers.elastic())
            .subscribe();
    }

    private Mono<CouponUpdateMessage> processAndSend(Change change) {
        return Mono.fromCallable(() -> change)
            .doFirst(() -> log.info("saving or deleting the coupon: {}", change.getChanged()))
            .map(this::saveOrDelete)
            .thenReturn(mapper.map(change))
            .doOnSuccess(message -> log.info("sending message: {}", message))
            .doOnSuccess(syncProducerService::send);
    }

    private Mono<Void> saveOrDelete(Change change) {
        if (change.getType() == DELETE) return couponService.deleteCoupon(change.getChanged());
        else return couponService.saveCoupon(change.getChanged()).then();
    }
}
And this test:
@ExtendWith(MockitoExtension.class)
class SyncTransactionServiceTest {
    @Mock
    private SyncProducerService syncProducerService;
    @Mock
    private CouponService couponService;
    @Mock
    private CouponUpdateMessageMapper mapper;
    @InjectMocks
    private SyncTransactionService syncTransactionService;

    private static Coupon insertId1;
    private static Coupon updateId2;
    private static Coupon deleteId3;
    private static Change change1;
    private static Change change2;
    private static Change change3;

    @BeforeAll
    private static void prepareData() {
        insertId1 = DataHelper.coupon();
        updateId2 = DataHelper.coupon();
        updateId2.setId(2);
        deleteId3 = DataHelper.coupon();
        deleteId3.setId(3);
        change1 = Change.builder().changed(insertId1).type(CouponUpdateType.INSERT).build();
        change2 = Change.builder().changed(updateId2).type(CouponUpdateType.UPDATE).build();
        change3 = Change.builder().changed(deleteId3).type(CouponUpdateType.DELETE).build();
    }

    @Test
    void shouldProcessChanges() {
        // given
        List<Change> changes = List.of(change1, change2, change3);
        when(couponService.saveCoupon(insertId1)).thenReturn(Mono.just(insertId1));
        when(couponService.saveCoupon(updateId2)).thenReturn(Mono.just(updateId2));
        when(couponService.deleteCoupon(deleteId3)).thenReturn(Mono.empty());
        doNothing().when(syncProducerService).send(any());
        doCallRealMethod().when(mapper).map(any());
        // when
        syncTransactionService.processChanges(changes);
        // then
        verify(couponService, times(2)).saveCoupon(any());
        verify(mapper, times(3)).map(any());
        verify(couponService).deleteCoupon(any());
        verify(syncProducerService, times(3)).send(any());
    }
}
When I run the test, Mockito.verify() does not detect any interaction with the mocks, although I call subscribe() in the code.
So what could be the problem in my pipeline?
Upvotes: 0
Views: 2919
Reputation: 12728
Exactly as @Martin Tarjányi said: if the reactive method under test uses Schedulers.elastic(), it launches asynchronous work that has not finished by the time the test reaches verify(), so no interactions are recorded.
If I stick to Schedulers.elastic(), I can do one of two things:
1. Wait in the test, using the Awaitility lib (https://github.com/awaitility/awaitility) or just Thread.sleep(), like: Awaitility.waitAtMost(Duration.ofMillis(2000)).untilAsserted(() -> {verify(...);});
2. Return the Flux from the method and consume it in the test with StepVerifier or block(). Remember, for a Flux, use blockLast() to wait for all elements; blockFirst() returns as soon as the first element arrives.
So it is like this now:
...
public Flux<CouponUpdateMessage> processChanges(List<Change> changes) {
    return Flux.fromIterable(changes)
        .flatMap(this::processAndSend) // flatMap subscribes to each inner Mono; map would only wrap it
        .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
        .subscribeOn(Schedulers.elastic()); // don't subscribe() here, but return it
}
...
And test:
...
// when
syncTransactionService.processChanges(changes).blockLast(); // process all elements
...
And I see the logs and all the interactions are recorded as I wish.
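If I am not obliged to use Schedulers.elastic(), I can drop subscribeOn() and simply subscribe(): the pipeline then runs on the calling thread, so the test in the question will work, as long as flatMap is used instead of map so that each inner Mono is actually subscribed.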
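For the "wait in the test" option, the underlying idea is simply to retry the assertion until it passes or a deadline expires. Below is a rough plain-JDK sketch of that idea. It is not Awaitility's implementation, and the names PollingWait and retryUntilPasses are hypothetical ones chosen for illustration. It assumes the assertion signals failure by throwing AssertionError, which to my knowledge is also how Mockito's verify() reports a failed verification.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingWait {

    /**
     * Re-runs the assertion every pollInterval until it stops throwing
     * AssertionError, or rethrows the last failure once the timeout has elapsed.
     */
    public static void retryUntilPasses(Duration timeout, Duration pollInterval, Runnable assertion) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // out of time: surface the last failure
                }
                try {
                    Thread.sleep(pollInterval.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e; // interrupted while waiting: give up with the last failure
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulates work that completes on another thread shortly after the test continues.
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                return;
            }
            calls.incrementAndGet();
        }).start();

        retryUntilPasses(Duration.ofSeconds(2), Duration.ofMillis(20), () -> {
            if (calls.get() != 1) {
                throw new AssertionError("expected 1 call, saw " + calls.get());
            }
        });
        System.out.println("assertion passed after waiting");
    }
}
```

Polling like this keeps the fire-and-forget method signature, but the test pays for it with a timeout to tune. Returning the Flux and blocking on it avoids that guesswork.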
Upvotes: 0
Reputation: 9947
The problem is that your method under test runs asynchronously because of the specified scheduler. You should return the Flux from your method under test and then use StepVerifier, or call collectList() and block() on the Flux, to trigger execution and wait for it to finish.
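To make the timing issue concrete without depending on Reactor, here is a plain-JDK analogy. It is only a sketch: AsyncVerifySketch and RecordingProducer are hypothetical names, the recording class stands in for a Mockito mock, and a CompletableFuture stands in for the returned Flux. The fire-and-forget method behaves like subscribe() after subscribeOn(...), while join() plays the role of block().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncVerifySketch {

    /** Stand-in for a mocked collaborator: it only records what it was asked to send. */
    public static final class RecordingProducer {
        public final List<String> sent = new CopyOnWriteArrayList<>();

        public void send(String message) {
            sent.add(message);
        }
    }

    /** Fire-and-forget: schedules the work and returns at once. */
    public static void processAndForget(List<String> changes, RecordingProducer producer, Executor executor) {
        executor.execute(() -> changes.forEach(producer::send));
    }

    /** Returns a handle to the pending work so the caller can wait for it. */
    public static CompletableFuture<Void> process(List<String> changes, RecordingProducer producer, Executor executor) {
        return CompletableFuture.runAsync(() -> changes.forEach(producer::send), executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            RecordingProducer first = new RecordingProducer();
            processAndForget(List.of("a", "b", "c"), first, pool);
            // Racy: the worker may not have run yet, so this can print anything from 0 to 3.
            System.out.println("right after fire-and-forget: " + first.sent.size());

            RecordingProducer second = new RecordingProducer();
            process(List.of("a", "b", "c"), second, pool).join(); // wait for completion
            System.out.println("after waiting: " + second.sent.size()); // always 3
        } finally {
            pool.shutdown();
        }
    }
}
```

The second call is deterministic because the caller holds a handle to the work and waits on it, which is the same reason returning the Flux makes the test reliable.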
Upvotes: 1