MartyIX

Reputation: 28648

How to read last N records from Chronicle Queue?

I have the following code to read from a Chronicle Queue (it's written in Kotlin, but that does not matter):

val queue = ChronicleQueueBuilder.single(path).build()
val tailer = queue.createTailer()

tailer.toEnd()

// This code is wrong
//    val lastIndex = tailer.index()
//
//    val shift = lastIndex - 10
//    if (shift > 0) {
//        tailer.moveToIndex(lastIndex)
//    }

while (true) {
    val text = await(tailer)

    if (prefix == null) {
        println(text)
    } else {
        if (text.startsWith(prefix)) {
            // Would be nice without additional allocation ...
            println(text.substring(prefix.length + 1))
        }
    }
}

How can I modify the commented-out code so that it reads the previous 10 records from the queue and then continues with new records as they arrive?

Rationale: It is useful in situations where the queue is used for displaying logs. You want to see a few previous logging statements and see new logging statements as they come.

Upvotes: 1

Views: 1306

Answers (2)

Mark Price

Reputation: 326

In addition to directly using the index, you could use the direction property of the ExcerptTailer:

    final SingleChronicleQueue queue = createQueue();

    final int totalRecords = 20;
    final int tailRecords = 10;

    // Write some sample records: "0" .. "19".
    final ExcerptAppender appender = queue.acquireAppender();
    for (int i = 0; i < totalRecords; i++) {
        try (final DocumentContext ctx = appender.writingDocument()) {
            ctx.wire().writeText(Integer.toString(i));
        }
    }

    // Start at the end of the queue, reading backwards.
    final ExcerptTailer tailer = queue.createTailer();
    tailer.direction(TailerDirection.BACKWARD).toEnd();

    // Rewind: each backward read moves the tailer one record towards the start.
    int rewind = tailRecords;
    final int endCycle = tailer.cycle();
    while (--rewind != 0) {
        try (final DocumentContext ctx = tailer.readingDocument()) {
            if (!ctx.isPresent()) {
                // Fewer records available than requested.
                break;
            }

            if (endCycle != tailer.cycle()) {
                System.out.println("Rewound past beginning of cycle");
            }
        }
    }

    // Switch back to forward reading and print the tail of the queue.
    tailer.direction(TailerDirection.FORWARD);

    for (int i = 0; i < tailRecords; i++) {
        try (final DocumentContext ctx = tailer.readingDocument()) {
            if (!ctx.isPresent()) {
                break;
            }

            System.out.println(ctx.wire().readText());
        }
    }
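
The `--rewind != 0` loop above performs only `tailRecords - 1` backward reads, which can look like an off-by-one. Here is a toy model of why that is enough, using a plain list and an integer cursor instead of the library. The cursor rules are my assumptions about the tailer (`toEnd()` in backward direction sits on the last record, a backward read steps one record back), and `RewindModel` and `lastN` are hypothetical names for this sketch. Stopping on the first record when the list is shorter than N is a choice of the sketch, not necessarily what the tailer does.

```java
import java.util.ArrayList;
import java.util.List;

final class RewindModel {
    /**
     * Toy model of "N - 1 backward reads, then N forward reads" over a list.
     * The cursor rules are assumptions about the tailer, not library code.
     */
    static List<String> lastN(List<String> records, int n) {
        List<String> out = new ArrayList<>();
        if (records.isEmpty() || n <= 0) {
            return out;
        }
        // Assumed: toEnd() in BACKWARD direction sits on the last record.
        int cursor = records.size() - 1;

        // N - 1 backward reads; each one steps the cursor back by one record.
        int rewind = n;
        while (--rewind != 0) {
            if (cursor == 0) {
                break; // sketch's choice: stay on the first record
            }
            cursor--;
        }

        // Forward again: up to N reads starting at the cursor.
        for (int i = 0; i < n && cursor < records.size(); i++) {
            out.add(records.get(cursor++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            records.add(Integer.toString(i));
        }
        // Prints [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
        System.out.println(lastN(records, 10));
    }
}
```

Because the cursor already sits on the last record, nine steps back put it on record 10, and ten forward reads then cover records 10 through 19.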

Upvotes: 1

Devas

Reputation: 1694

I have written a test for you. Run it as it is and it should work.

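import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import org.junit.Test;

public class ChronicleTest {

    private String chroniclePath = "/tmp/chronicle-test";

    private int msgCount = 10;

    @Test
    public void writeToQ() {
        ChronicleQueue queue = ChronicleQueueBuilder.single(chroniclePath).build();

        // Write the integers 1..msgCount to the queue.
        ExcerptAppender appender = queue.acquireAppender();
        for (int i = 1; i <= msgCount; i++) {
            final int value = i; // lambdas can only capture effectively final locals
            appender.writeBytes(b -> {
                b.writeInt(value);
            });
        }

        // Move to the end, then step back 5 records.
        // Plain subtraction assumes those 5 records are all in the current roll cycle.
        ExcerptTailer tailer = queue.createTailer();
        tailer.toEnd();
        long lastIndex = tailer.index();
        tailer.moveToIndex(lastIndex - 5);

        // readBytes returns false once there is nothing left to read.
        while (tailer.readBytes(b -> {
            int value = b.readInt();
            System.out.println("Received:" + value);
        })) {
            // keep reading
        }
        System.out.println("Completed");
    }
}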
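
One caveat with `lastIndex - 5`: a Chronicle Queue index is not a simple counter. It packs the roll cycle number into the upper bits and the sequence number within that cycle into the lower bits, so plain subtraction only lands on a real record while the current cycle holds enough records. Below is a minimal sketch of a clamped version using plain bit arithmetic rather than the library. The 32-bit split (`SEQUENCE_BITS`) and the names `TailIndex` and `rewindWithinCycle` are hypothetical and only for illustration, since the real split depends on the configured roll cycle. In real code you would use the queue's `RollCycle` (`toCycle`, `toSequenceNumber`, `toIndex`) instead of hand-rolled shifts.

```java
final class TailIndex {
    // Hypothetical split for illustration: the low 32 bits hold the sequence number.
    static final int SEQUENCE_BITS = 32;
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    static long toIndex(int cycle, long sequence) {
        return ((long) cycle << SEQUENCE_BITS) | (sequence & SEQUENCE_MASK);
    }

    static int toCycle(long index) {
        return (int) (index >>> SEQUENCE_BITS);
    }

    static long toSequence(long index) {
        return index & SEQUENCE_MASK;
    }

    /** Index n records before endIndex, clamped to the first record of the same cycle. */
    static long rewindWithinCycle(long endIndex, long n) {
        long sequence = Math.max(0, toSequence(endIndex) - n);
        return toIndex(toCycle(endIndex), sequence);
    }

    public static void main(String[] args) {
        // Only 3 records have been written in cycle 18000.
        long end = toIndex(18000, 3);

        // Naive subtraction borrows from the cycle bits and points into cycle 17999.
        System.out.println(toCycle(end - 10));

        // The clamped version stays in cycle 18000 at sequence 0.
        long target = rewindWithinCycle(end, 10);
        System.out.println(toCycle(target) + " / " + toSequence(target));
    }
}
```

Clamping only gives you what is left in the current cycle. To reach back into earlier cycles you still need to walk backwards with the tailer.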

Upvotes: 2
