Reputation: 23
We hit a StreamCorruptedException from Chronicle Queue (chronicle-queue-5.20.106, Red Hat Linux release 6.10); the stack trace is pasted below. At the time, a completely different process was generating very heavy disk I/O, which we believe stalled Chronicle Queue for more than 15 seconds and caused the corruption.
Because the queue was corrupted, it couldn't come back up even after a restart. The only way forward was to delete the files and start fresh, which means losing millions of records.
Please help with a solution or any workaround. Thanks.
STACK TRACE
2020-11-18 09:55:38,905536 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Couldn't acquire write lock after 15000 ms for the lock file:/local/data/metadata.cq4t, overriding the lock. Lock was held by me
2020-11-18 09:55:38,905795 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Forced unlock for the lock file:/local/data/metadata.cq4t, unlocked: true net.openhft.chronicle.core.StackTrace: Forced unlock on Reader STRESSTEST01
at net.openhft.chronicle.queue.impl.table.AbstractTSQueueLock.forceUnlockIfProcessIsDead(AbstractTSQueueLock.java:52)
at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:70)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:349)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:325)
followed by
2020-11-18 09:55:42,364992 [] [ChronicleTxn] (ERROR) Error on commit java.lang.IllegalStateException: java.io.StreamCorruptedException: Data at 138604 overwritten? Expected: 0 was c3
at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:842)
at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:782)
ERROR ON RESTART
java.lang.UnsupportedOperationException: Unknown_4
at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.cantRead(BinaryWire.java:3648)
at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.bytes(BinaryWire.java:2591)
SIMULATION TEST CLASS
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class SimulateStreamCorruptedException {

    private static final int NO_OF_DOCUMENTS_TO_INSERT = 100_000;
    private static final int NO_OF_THREADS = 50;

    private final String textToWrite = "This is a sample text to be written and value is ";
    private final String dbFolder = System.getProperty("dbFolder", "/tmp/chroniclequeue");
    private final AtomicLong noOfDocuments = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        new SimulateStreamCorruptedException().simulateError();
    }

    private void simulateError() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NO_OF_THREADS);
        ScheduledExecutorService preTouchScheduler = Executors.newScheduledThreadPool(1);
        try (ChronicleQueue queue = getQueue()) {
            // Pre-touch pages once a second so writers don't stall on page faults
            preTouchScheduler.scheduleAtFixedRate(() -> queue.acquireAppender().pretouch(), 0, 1, TimeUnit.SECONDS);
            IntStream.rangeClosed(1, NO_OF_THREADS).forEach(i -> startWriterThread(queue, i, latch));
            latch.await();
        } finally {
            preTouchScheduler.shutdownNow();
        }
    }

    private void startWriterThread(ChronicleQueue queue, int threadCount, CountDownLatch latch) {
        Runnable task = () -> {
            try {
                System.out.println("Starting the writing for Thread-" + threadCount);
                IntStream.rangeClosed(1, NO_OF_DOCUMENTS_TO_INSERT).forEach(i -> {
                    try (DocumentContext dc = queue.acquireAppender().writingDocument()) {
                        String text = textToWrite + (threadCount + i);
                        dc.wire().write().bytes(text.getBytes());
                        simulatePause(); // sleep while the write lock is still held
                    }
                });
                System.out.println("Completed the writing for Thread-" + threadCount);
            } finally {
                latch.countDown(); // count down even if a writer fails, so await() cannot hang
            }
        };
        new Thread(task).start();
    }

    // Every 100th document, sleep 20 s inside the open DocumentContext,
    // exceeding the 15 s write-lock timeout and provoking a forced unlock
    private void simulatePause() {
        if (noOfDocuments.incrementAndGet() % 100 == 0) {
            try {
                Thread.sleep(20 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private ChronicleQueue getQueue() {
        File folder = new File(dbFolder);
        if (!folder.exists()) folder.mkdirs();
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .strongAppenders(true)
                .build();
    }
}
Upvotes: 1
Views: 826
Reputation: 1206
If your application can be stalled for 15 seconds, there's no fix possible on the Chronicle Queue side - you should reconsider how your software works, as Chronicle's tools are developed with ultra-low latency in mind: we cater for microsecond latencies, not seconds.
If the lock is forcibly unlocked, which is the case here, the data WILL be corrupted irreversibly.
A workaround, however, could be to increase the timeout - the default is 15000 ms, but when creating a queue you can raise it with builder#timeoutMS() to a value that works in your environment.
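For example, a minimal sketch of raising the timeout at queue-creation time (the 60000 ms figure and the path are illustrative assumptions - pick a value that comfortably exceeds your worst-case stall):

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

public class QueueWithLongerTimeout {
    public static void main(String[] args) {
        // timeoutMS controls how long an appender waits for the write lock
        // before forcing an unlock; 60000 ms here is an assumed value
        try (ChronicleQueue queue = SingleChronicleQueueBuilder
                .binary("/tmp/chroniclequeue")
                .timeoutMS(60_000)
                .build()) {
            queue.acquireAppender().writeText("hello");
        }
    }
}
```

With the longer timeout, a 20-second stall like the one simulated above no longer triggers the forced unlock, at the cost of other appenders waiting longer if a writer genuinely dies while holding the lock.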
Upvotes: 1