the stream in the vert.x application is blocked

vertu.x version 3.5.4 java 8

There is a class:

public class LoaderVerticle extends AbstractVerticle {

    // some code..

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        super.start(startFuture);
        this.client = Utils.createHttpClient(vertx, config());

        vertx.eventBus().consumer(EVENT_LOAD, this::startLoad);
        vertx.eventBus().consumer(EVENT_LOAD_ALL, this::loadAll);
        vertx.eventBus().consumer(EVENT_CHUNK, this::loadChunk);
    }

    // some code..
}

it accepts events and then starts downloading the file, maybe in whole, or maybe in parts.

private void loadAll(Message<Object> event) {
    // some code..

    HttpClientRequest request = client.getAbs(ctx.getUrl(), response -> {
        // some code..

        response.endHandler(dummy -> {
            file.flush().close(evt -> {
                if (evt.succeeded()) {
                    ctx.setChecksum(downloadPump.getChecksum());
                    ctx.setCompleted(FileUtils.getFileSize(ctx.getTmpFile()));
                    loadComplete(ctx);
                } else {
                    loadError(String.format("Unable to close temporary download file (%s)", evt.cause()));
                }
            });
        });
    });
} 

and

private void loadChunk(Message<Object> event) {
    // some code..

    client.getAbs(ctx.getUrl(), resp -> {
        vertx.fileSystem().open(ctx.getTmpFile(), fileOpts, future -> {
            if (future.succeeded()) {
                AsyncFile file = future.result();

                resp.bodyHandler(buff -> {
                    ctx.accept(buff.getBytes());
                    progressJob(ctx, buff.length());

                    file.write(buff).flush(futWrite -> {
                        if (futWrite.succeeded()) {
                            file.close(futClose -> {
                                if (futClose.succeeded()) {
                                    if (ctx.getLoadedSize() < ctx.getTotalSize()) {
                                        nextChunk(ctx.resetRetry());
                                    } else {
                                        // some code..
                                    }
                                } else {
                                    // some code..
                                }
                            });
                        } else {
                            // some code..
                        }
                    });
                });

                resp.exceptionHandler(e -> {
                    // some code..
                });
                resp.resume();
            } else {
                // some code..
            }
        });
    });
}

the application works fine, but once a day, it is blocked with this warning :

02:10:10.154 [vertx-blocked-thread-checker] WARN io.vertx.core.impl.BlockedThreadChecker - Thread Thread[vert.x-internal-blocking-2,5,main] has been blocked for 60248 ms, time limit is 60000 io.vertx.core.VertxException: Thread blocked at sun.nio.ch.FileDispatcherImpl.force0(Native Method) ~[?:1.8.0_111] at sun.nio.ch.FileDispatcherImpl.force(FileDispatcherImpl.java:76) ~[?:1.8.0_111] at sun.nio.ch.SimpleAsynchronousFileChannelImpl.force(SimpleAsynchronousFileChannelImpl.java:162) ~[?:1.8.0_111] at io.vertx.core.file.impl.AsyncFileImpl.lambda$doFlush$3(AsyncFileImpl.java:363) ~[s-app.jar:?] at io.vertx.core.file.impl.AsyncFileImpl$$Lambda$256/1218269075.perform(Unknown Source) ~[?:?] at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:275) ~[s-app.jar:?] at io.vertx.core.impl.ContextImpl$$Lambda$59/1267985667.run(Unknown Source) ~[?:?] at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76) ~[scheduler-app.jar:?] at io.vertx.core.impl.TaskQueue$$Lambda$43/2067180044.run(Unknown Source) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_111] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[s-app.jar:?] at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_111]

and such a warning is repeated over and over again. Restarting the application helps.

Locally, I am unable to reproduce the issue as it occurs unpredictably, approximately once every one to three days. I have considered removing the flush, but I am unsure if this will resolve the issue. If you are unable to identify the cause of the problem, could you please provide me with information on how I can locate the block causing the issue and what I should be looking for?

Upvotes: 0

Views: 74

Answers (1)

Hung_PT
Hung_PT

Reputation: 41

I understand your problem. The matter is vertx uses event-loop to handle event default in event-bus. Your app blocks thread that is used by event-loop. Warning will be logged if the blocking is over 60s.

More details here: https://vertx.io/docs/vertx-core/java/#golden_rule

Solution:

  • For lower JDK21, keep in mind, no use event-loop to handle blocking operations. Let's use seperating threadPool. Read: https://vertx.io/docs/vertx-core/java/#blocking_code
  • If you use JDK21, let's config vertx with Virtual-Thread. JVM starts new v-threads for event-loop uses when current v-thread is blocking. I recommend this way.

Your problem will disappear.

Upvotes: 0

Related Questions