Reputation: 21
I have two topics, topic1 and topic2, and I am producing messages into my test topic.
topic1's configuration:
Topic: topic1 TopicId: <topic_ID> PartitionCount: 1 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=delete,retention.ms=24192000000,retention.bytes=-1
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
topic2's configuration:
Topic: topic2 TopicId: <topic_ID> PartitionCount: 1 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=compact,retention.ms=60000,retention.bytes=-1
Topic: topic2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
The messages in the test topic are as follows:
message1:
key:
{
"COl1": {
"string": "dummy001"
}
}
value:
{
"data": {
"Col1": "dummy001",
"Col2": "2023-01-01 01:01:01",
"Col3": "1000.100"
},
"operation": "INSERT",
"timestamp": "2023-01-01 01:01:01"
}
message2:
key:
{
"COl1": {
"string": "dummy001"
}
}
value:
{
"data": {
"Col1": "dummy001",
"Col2": "2023-02-02 02:02:02",
"Col3": "2000.100"
},
"operation": "UPDATE",
"timestamp": "2023-02-02 02:02:02"
}
I have created a ksqlDB stream, "stream1", to extract my data from topic1. Below is how the data looks in stream1 (a rough sketch of the stream's definition follows this sample output):
+---------------------------------------------------------+
|DATA                                                     |
+---------------------------------------------------------+
|{COL1=dummy001, COL2=2023-01-01 01:01:01, COL3=1000.100} |
|{COL1=dummy001, COL2=2023-02-02 02:02:02, COL3=2000.100} |
+---------------------------------------------------------+
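For reference, stream1 itself was created along these lines (a simplified sketch; the column types here are assumed to be STRING):
ksql> create stream stream1 (data struct<col1 string, col2 string, col3 string>, operation string, `TIMESTAMP` string) with (kafka_topic='topic1', value_format='AVRO');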
I have created another stream, "stream2", and defined col1 as the key column.
ksql> create stream stream2 (col1 string key, col2 string, col3 string) with(kafka_topic='topic2', value_format='AVRO');
Then I inserted values into stream2 from stream1:
ksql> insert into stream2 select data->COL1 as COL1, data->COL2 as COL2, data->COl3 as COL3 from stream1 partition by data->col1;
My data in topic2 is as follows:
message1:
key:
dummy001
value:
{
"COL2": {
"string": "2023-01-01 01:01:01"
},
"COL3": {
"string": "1000.100"
}
}
message2:
key:
dummy001
value:
{
"COL2": {
"string": "2023-02-02 02:02:02"
},
"COL3": {
"string": "2000.100"
}
}
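The contents of topic2 can be checked directly from the ksql CLI, for example:
ksql> print 'topic2' from beginning;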
The issue I am facing is with the cleanup policy "COMPACT". Even though I have set the cleanup policy to compact, my older messages aren't getting deleted. As mentioned above, I am currently processing data from topic1 to topic2 using stream1 and stream2. Even after defining col1 as the key column, compaction doesn't seem to kick in. When I set the cleanup policy to "COMPACT,DELETE", it deletes all the messages.
As per my understanding, since the messages have a key, compaction should keep only the latest message per key and clear the older ones, but that is not what happens.
Kindly let me know if I am missing something.
What I am trying to achieve is to keep only the latest message per col1 in topic2 at any given point in time.
Upvotes: 2
Views: 224
Reputation: 176
The cleanup policy you set makes no guarantee about when compaction will actually run. In particular, it does not mean that compaction runs every time you produce another record with the same key (that would be inefficient).
It's on the consumer to maintain the "up to date" state of its view of the data.
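On the ksqlDB side, one way to get that latest-value-per-key view without waiting for the broker to compact the log is to declare a table over topic2. This is only a sketch, assuming the key is the plain string written by the PARTITION BY above and the value is the AVRO record shown in the question; the table name is made up:
ksql> create table stream2_latest (col1 string primary key, col2 string, col3 string) with (kafka_topic='topic2', value_format='AVRO');
ksqlDB treats the topic as a changelog for such a table, so the table's state holds at most one row per key (the latest), regardless of whether the broker has already compacted the underlying segments. Compaction itself only rewrites closed log segments in the background, so recently produced duplicates can legitimately remain on disk for a while.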
Upvotes: 1