Reputation: 109
My app successfully sends Kafka messages, but only after Kafka is initialized. Before that I get the error "Dispatcher has no subscribers". How do I wait for subscribers to finish being registered for channels?
Here's a trace of the order of events (timing in second.ms):
I'm not sure how to approach this. Wild guesses have included:
I created a new app and made it as simple as I could:
public interface Source {
    @Output(channelName)
    MessageChannel outboundChannel();
}

@EnableBinding(Source.class)
@Component
public class Sender {
    @Autowired
    private Source source;

    public boolean send(SomeObject object) {
        return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
    }
}

@Service
public class Scheduler {
    @Autowired
    Sender sender;
    @Autowired
    ThreadPoolTaskScheduler taskScheduler;

    @PostConstruct
    public void initialize() {
        taskScheduler.schedule(new PollingTask(), nextTime);
    }

    private class PollingTask implements Runnable {
        @Override
        public void run() {
            List<SomeObject> objects = getDummyData();
            for (SomeObject object : objects) {
                sender.send(object);
            }
            // Re-schedule the next poll one second from now.
            Instant nextTime = Instant.now().plusMillis(1_000L);
            try {
                taskScheduler.schedule(new PollingTask(), nextTime);
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }
}
Edit to add Solution
It works now! In the scheduler that starts the tasks that send the messages, I switched from starting them in @PostConstruct to SmartLifecycle::start().
@Service
public class Scheduler implements SmartLifecycle {
    @Autowired
    Sender sender;
    @Autowired
    ThreadPoolTaskScheduler taskScheduler;

    // Schedule the first poll from start() instead of @PostConstruct.
    // SmartLifecycle also requires stop() and isRunning(); they are not shown here.
    @Override
    public void start() {
        taskScheduler.schedule(new PollingTask(), nextTime);
    }

    private class PollingTask implements Runnable {
        @Override
        public void run() {
            List<SomeObject> objects = getDummyData();
            for (SomeObject object : objects) {
                sender.send(object);
            }
            // Re-schedule the next poll one second from now.
            Instant nextTime = Instant.now().plusMillis(1_000L);
            try {
                taskScheduler.schedule(new PollingTask(), nextTime);
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }
}
Upvotes: 1
Views: 455
Reputation: 69
I faced a similar problem with WebFlux and Spring Cloud Stream in the functional style (Spring Cloud Function), which is the preferred way as of 2022.
My hypothesis after a lot of debugging was that the beans were not created in the right order: the bean was probably not registered with spring-cloud-stream's dispatchers before Kafka message processing started, similar to what @gary mentioned.
So I added @Order(1) to my consumer beans, hoping that each bean would be created before its dispatcher registration starts.
@Bean
@Order(1)
public Function<Flux<Message<Pojo>>, Mono<Void>> pojoConsumer() {
This seems to fix my issue for now.
Upvotes: 0
Reputation: 174504
@PostConstruct is too early to send messages; the context is still being built. Implement SmartLifecycle, put the bean in a high phase (Integer.MAX_VALUE) and do the sends in start().
Or do the sends in an ApplicationRunner.
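To illustrate why the ordering matters, here is a rough plain-Java model of the startup sequence. It is not Spring code: ToyChannel, Phased and StartupModel are hypothetical names made up for this sketch, and phase 0 for the binding is an arbitrary choice, not the phase the real binder uses. It only shows the idea that a send during an init callback fails, while a send from a high-phase start() or from a runner that executes after everything has started succeeds.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (no Spring). All names here are hypothetical.

/** Toy channel that rejects sends until someone has subscribed. */
class ToyChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    boolean send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.forEach(s -> s.accept(payload));
        return true;
    }
}

/** Stand-in for a lifecycle component: lower phases start first. */
interface Phased {
    int phase();
    void start();
}

class StartupModel {
    /** Runs the model and returns a log of what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyChannel channel = new ToyChannel();

        // 1. "Init callback" time: nothing is subscribed yet, so the send fails.
        try {
            channel.send("too early");
        } catch (IllegalStateException e) {
            log.add("init: " + e.getMessage());
        }

        // 2. Lifecycle components, deliberately registered out of order.
        List<Phased> components = new ArrayList<>();
        components.add(new Phased() { // the sender, in the highest phase
            public int phase() { return Integer.MAX_VALUE; }
            public void start() { channel.send("from start()"); }
        });
        components.add(new Phased() { // the binding that registers the subscriber
            public int phase() { return 0; } // arbitrary low phase (assumption)
            public void start() { channel.subscribe(p -> log.add("delivered: " + p)); }
        });

        // Start in ascending phase order, so the subscriber exists before the sender runs.
        components.sort(Comparator.comparingInt(Phased::phase));
        components.forEach(Phased::start);

        // 3. "Runner" step: executes only after every component has started.
        channel.send("from runner");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In a real application the container does the phase sorting and calls the runner for you; the sender only has to do its work in start() (in a high phase) or in an ApplicationRunner.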
Upvotes: 1