Reputation: 827
Following class is my worker verticle in which i want to execute a blocking code on recieving a message from event bus on a channel named events-config.
The objective is to generate and publish json messages indefinitely until i receive stop operation message on events-config channel.
I am using executeBlocking to achieve the desired functionality. However since am running the blocking operation indefinitely , vertx blocked threadchecker dumping warnings .
Question:
- Is there a way to disable blockedthreadchecker only for specific verticle ??
- Does the code below adheres to the best practice of executing infinite loop on need basis in vertx ? If not can you please suggest best way to do this ?
public class WorkerVerticle extends AbstractVerticle {
Logger logger = LoggerFactory.getLogger(WorkerVerticle.class);
private MessageConsumer<Object> mConfigConsumer;
AtomicBoolean shouldPublish = new AtomicBoolean(true);
private JsonGenerator json = new JsonGenerator();
@Override
public void start() {
mConfigConsumer = vertx.eventBus().consumer("events-config", message -> {
String msgBody = (String) message.body();
if (msgBody.contains(PublishOperation.START_PUBLISH.getName()) && !mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to start producing data onto kafka " + msgBody);
vertx.<Void>executeBlocking(voidFutureHandler -> {
Integer numberOfMessagesToBePublished = 100000;
if (numberOfMessagesToBePublished <= 0) {
logger.info("Skipping message publish :"+numberOfMessagesToBePublished);
return; // is it best way to do it ??
}
publishData(numberOfMessagesToBePublished);
},false, voidAsyncResult -> logger.info("Blocking publish operation is terminated"));
} else if (msgBody.contains(PublishOperation.STOP_PUBLISH.getName()) && mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to terminate " + msgBody);
mJsonGenerator.terminatePublish();
}
});
}
private void publishData(){
while(shouldPublish.get()){
//code to generate json indefinitely until some one reset shouldPublish variable
}
}
}
Upvotes: 0
Views: 729
Reputation: 17701
You don't want to use busy loops in your asynchronous code.
Use vertx.setPeriodic()
or vertx.setTimer()
instead:
vertx.setTimer(20, (l) -> {
// Generate your JSON
if (shouldPublish.get()) {
// Set timer again
}
});
Upvotes: 1