Venkat

Reputation: 23

StreamCorruptedException on Chronicle Queue making the queue unusable

We got a StreamCorruptedException from Chronicle Queue (chronicle-queue-5.20.106, Red Hat Linux release 6.10); the stack trace is pasted below. At the time, a completely different process was doing very heavy IO/disk work, which we believe caused the Chronicle Queue writer to pause for more than 15 seconds and led to this corruption.

Even after a restart the queue could not come up because it was corrupted. The only way out was to delete it and start fresh, which means losing millions of records.

Please help with a solution or any workaround. Thanks.

STACKTRACE

2020-11-18 09:55:38,905536 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Couldn't acquire write lock after 15000 ms for the lock file:/local/data/metadata.cq4t, overriding the lock. Lock was held by me
2020-11-18 09:55:38,905795 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Forced unlock for the lock file:/local/data/metadata.cq4t, unlocked: true net.openhft.chronicle.core.StackTrace: Forced unlock on Reader STRESSTEST01
        at net.openhft.chronicle.queue.impl.table.AbstractTSQueueLock.forceUnlockIfProcessIsDead(AbstractTSQueueLock.java:52)
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:70)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:349)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:325)

followed by

2020-11-18 09:55:42,364992 [] [ChronicleTxn] (ERROR) Error on commit java.lang.IllegalStateException: java.io.StreamCorruptedException: Data at 138604 overwritten? Expected: 0 was c3
        at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:842)
        at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:782)

ERROR ON RESTART

java.lang.UnsupportedOperationException: Unknown_4
        at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.cantRead(BinaryWire.java:3648)
        at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.bytes(BinaryWire.java:2591)

SIMULATION TEST CLASS

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class SimulateStreamCorruptedException {

    private static final int NO_OF_DOCUMENTS_TO_INSERT = 100_000;
    private static final int NO_OF_THREADS = 50;

    private final String textToWrite = "This is a sample text to be written and value is ";
    private final String dbFolder = System.getProperty("dbFolder", "/tmp/chroniclequeue");
    private final AtomicLong noOfDocuments = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        SimulateStreamCorruptedException simulator = new SimulateStreamCorruptedException();
        simulator.simulateError();
    }

    private void simulateError() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NO_OF_THREADS);
        ScheduledExecutorService preTouchScheduler = Executors.newScheduledThreadPool(1);
        try (ChronicleQueue queue = getQueue()) {
            // Pre-touch the queue once a second from a dedicated thread
            preTouchScheduler.scheduleAtFixedRate(() -> queue.acquireAppender().pretouch(), 0, 1, TimeUnit.SECONDS);
            // Start the concurrent writer threads and wait for all of them to finish
            IntStream.rangeClosed(1, NO_OF_THREADS).forEach(i -> startWriterThread(queue, i, latch));
            latch.await();
        } finally {
            preTouchScheduler.shutdownNow();
        }
    }

    private void startWriterThread(ChronicleQueue queue, int threadCount, CountDownLatch latch) {
        Runnable task = () -> {
            System.out.println("Starting the writing for Thread-" + threadCount);
            IntStream.rangeClosed(1, NO_OF_DOCUMENTS_TO_INSERT).forEach(i -> {
                try (DocumentContext dc = queue.acquireAppender().writingDocument()) {
                    String text = textToWrite + (threadCount + i);
                    dc.wire().write().bytes(text.getBytes());
                    // Occasionally stall while the document context is still open
                    simulatePause();
                }
            });
            System.out.println("Completed the writing for Thread-" + threadCount);
            latch.countDown();
        };
        new Thread(task).start();
    }

    private void simulatePause() {
        // Every 100th write, sleep for 20 seconds to exceed the 15 s write-lock timeout
        if (noOfDocuments.incrementAndGet() % 100 == 0) {
            try {
                Thread.sleep(20 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private ChronicleQueue getQueue() {
        File folder = new File(dbFolder);
        if (!folder.exists()) folder.mkdirs();
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .strongAppenders(true)
                .build();
    }
}

Upvotes: 1

Views: 826

Answers (1)

Dmitry Pisklov

Reputation: 1206

If there's a possibility of your application being stalled for 15 seconds, there's no solution possible on the Chronicle Queue side - you should reconsider the way your software works, as Chronicle's tools are developed with ultra-low latency in mind and we cater for microsecond latencies, not seconds.

If the lock is forcibly unlocked, which is the case here, the data WILL be corrupted irreversibly.

A workaround, however, could be to increase the timeout - the default is 15000 ms, but when creating a queue you can increase it by using builder#timeoutMS() to specify something that works in your environment.
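As an illustration, here is a minimal sketch of that workaround applied to a builder like the one in the question's getQueue() method. The class and method names and the 60_000 ms value are just examples, not from the answer; pick a timeout larger than the worst stall you expect.

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;

import java.io.File;

public class QueueWithLongerTimeout {

    // Example helper: open the queue with a write-lock timeout higher than the 15000 ms default.
    public static ChronicleQueue openQueue(String dbFolder) {
        File folder = new File(dbFolder);
        if (!folder.exists()) folder.mkdirs();
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .timeoutMS(60_000) // example value only - tune to your environment's worst-case pause
                .build();
    }
}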

Upvotes: 1
