Reputation: 21
I am developing an application based on Spring Boot and Kafka queues. After I wrote the application's main loop, it stopped consuming from the queue, and I do not know why.
--- Main Application ---
@Service
public class ApplicationMainClass implements ApplicationListener<ApplicationReadyEvent> {

    @Autowired
    PlayerDaoRepository playerDaoRepository;
    @Autowired
    DataColectorServiceImp dataColectorServiceImp;
    @Autowired
    BattleDaoRepository battleDaoRepository;
    @Autowired
    BattleService battleService;

    private static final Logger log = LogManager.getLogger(ApplicationMainClass.class);

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        List<Playerdao> listPlayersActive;
        List<BattleDao> battle;
        List<BattleDao> battleDaoAux;
        while (true) {
            log.info("Comienza la ejecución");
            listPlayersActive = playerDaoRepository.findByActive(true);
            for (Playerdao player : listPlayersActive) {
                try {
                    String battleString = dataColectorServiceImp.apiConexion(player.getUri());
                    if (battleString.equals("")) {
                        continue;
                    }
                    battle = player.getBatallasPlayed();
                    battleDaoAux = battleService.getBattle(battleString);
                    player.setLastGamePlayed(!battle.isEmpty() ? battle.get(battle.size() - 1).getBattletlime() : LocalDateTime.MIN.toString());
                    battleDaoAux = player.kafkaHandler(battleDaoAux);
                    battleService.postBattle(battleDaoAux, player.getTag());
                    player.setBatallasPlayed(player.listBuilder(player.getBatallasPlayed(), battleDaoAux));
                    battleDaoRepository.saveAll(battle);
                    playerDaoRepository.save(player);
                } catch (Exception e) {
                    log.error("", e);
                }
            }
            try {
                log.info("Termina la ejecucion");
                Thread.sleep(60000);
            } catch (InterruptedException e) {
            }
        }
    }
}
--- StreamListener ---
    @StreamListener
    public KStream<IdBattle, textBattle> newBattle(@Input(BinderProcessor.battles) KStream<IdBattle, textBattle> battleKStream) {
        updateDatabase(battleKStream);
        return null;
    }

    private void updateDatabase(KStream<IdBattle, textBattle> battleKStream) {
        battleKStream.foreach((IdBattle, textBattle) -> {
            if (textBattle == null) {
                playerDaoRepository.deleteById(IdBattle.getIdBattle());
            } else {
                Battle battle = playerDaoService.textTreatment(textBattle.getText(), Battle);
                battle = battleDaoService.setBattleTime(textBattle.getText(), battle);
                Event event = playerDaoService.textTreatmentEvent(textBattle.getText(), Event);
                battle.setMap(event.getMap());
                battleDaoService.updateDatabase(battle, IdBattle.getIdBattle());
            }
        });
    }
}
I don't know how to fix it so that both threads of the application run at the same time, and in fact I don't even know why it has stopped consuming from the queue.
Thank you very much
Upvotes: 1
Views: 92
Reputation: 21
As we discussed in the comments, the bug came from running a while (true) loop with a Thread.sleep inside it: that loop keeps hold of its thread the whole time and never returns, so execution never moves on to anything else. I solved it by simply adding the "@Scheduled" annotation to the main method of the application.
This way, after each execution of the main method we wait "x" seconds before the next execution, which leaves resources for the StreamListener.
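For illustration only, here is a minimal plain-JDK sketch of the fixed-delay idea; it is not the actual Spring code from the application. It swaps the Spring annotation for a `ScheduledExecutorService`, so that it can run without any framework. In Spring the comparable setup would typically be a method annotated with `@Scheduled(fixedDelay = ...)` and scheduling enabled through `@EnableScheduling`. The class name `FixedDelayPollingSketch`, the method `runPolling`, the task `pollOnce` and the delay values are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one loop iteration becomes a task that a scheduler
// re-runs after a fixed delay, instead of while (true) + Thread.sleep.
class FixedDelayPollingSketch {

    // Schedules the task, waits until it has run at least expectedRuns times
    // (or 5 seconds have passed), and returns how many runs were counted.
    static int runPolling(int expectedRuns, long delayMillis) {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expectedRuns);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Stand-in for the body of the old loop (fetch players, save battles, ...).
        Runnable pollOnce = () -> {
            runs.incrementAndGet();
            done.countDown();
        };

        // The next run starts delayMillis after the previous one finishes.
        // This call returns immediately; the caller's thread is not blocked.
        scheduler.scheduleWithFixedDelay(pollOnce, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            // Only for the demo: wait so that the runs can be counted.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Runs completed: " + runPolling(3, 100));
    }
}
```

The point of the sketch is that the scheduling call returns right away and the waiting between runs is handled by the scheduler, so the thread that set things up is free to carry on with other work.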
Thank you very much and feel free to correct me if I have made a mistake in the explanation.
Upvotes: 1