次のクラスは、events-config という名前のチャネルでイベント バスからメッセージを受信するときにブロック コードを実行するワーカー バーティクルです。
目的は、events-config チャネルで停止操作メッセージを受信するまで、json メッセージを無期限に生成して公開することです。
目的の機能を実現するために executeBlocking を使用しています。ただし、ブロック操作を無期限に実行しているため、vertx はスレッドチェッカーのダンプ警告をブロックしました。
質問:
- 特定のバーティクルに対してのみ、blockedthreadchecker を無効にする方法はありますか??
- 以下のコードは、必要に応じて vertx で無限ループを実行するベスト プラクティスに従っていますか? そうでない場合は、これを行うための最良の方法を提案できますか?
public class WorkerVerticle extends AbstractVerticle {
Logger logger = LoggerFactory.getLogger(WorkerVerticle.class);
private MessageConsumer<Object> mConfigConsumer;
AtomicBoolean shouldPublish = new AtomicBoolean(true);
private JsonGenerator json = new JsonGenerator();
@Override
public void start() {
mConfigConsumer = vertx.eventBus().consumer("events-config", message -> {
String msgBody = (String) message.body();
if (msgBody.contains(PublishOperation.START_PUBLISH.getName()) && !mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to start producing data onto kafka " + msgBody);
vertx.<Void>executeBlocking(voidFutureHandler -> {
Integer numberOfMessagesToBePublished = 100000;
if (numberOfMessagesToBePublished <= 0) {
logger.info("Skipping message publish :"+numberOfMessagesToBePublished);
return; // is it best way to do it ??
}
publishData(numberOfMessagesToBePublished);
},false, voidAsyncResult -> logger.info("Blocking publish operation is terminated"));
} else if (msgBody.contains(PublishOperation.STOP_PUBLISH.getName()) && mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to terminate " + msgBody);
mJsonGenerator.terminatePublish();
}
});
}
private void publishData(){
while(shouldPublish.get()){
//code to generate json indefinitely until some one reset shouldPublish variable
}
}
}