I want to read data from a table incrementally based on a timestamp column (no incrementing primary key is available). The Kafka JDBC connector does this with a query that fetches all rows whose timestamp is greater than the largest timestamp seen in the previous iteration (and less than an upper bound), as the following code from the connector shows:
protected void timestampWhereClause(ExpressionBuilder builder) {
    builder.append(" WHERE ");
    coalesceTimestampColumns(builder);
    builder.append(" > ? AND ");
    coalesceTimestampColumns(builder);
    builder.append(" < ? ORDER BY ");
    coalesceTimestampColumns(builder);
    builder.append(" ASC");
}
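To make the generated SQL concrete, here is a simplified re-creation of that clause builder. It is a sketch, not the connector's code: it swaps ExpressionBuilder for a plain StringBuilder, skips identifier quoting, and the class and column names are hypothetical. As far as I understand the connector, the first ? is bound to the last committed timestamp offset and the second to an upper bound derived from the database's current time.

```java
import java.util.List;

// Hypothetical, simplified re-creation of the connector's timestampWhereClause().
// Uses a plain StringBuilder instead of ExpressionBuilder and does not quote identifiers.
final class TimestampClauseSketch {

    // Roughly mirrors coalesceTimestampColumns(): a single column is used as-is,
    // several columns are wrapped in COALESCE(...) so the first non-null one wins.
    static void coalesceTimestampColumns(StringBuilder sb, List<String> columns) {
        if (columns.size() == 1) {
            sb.append(columns.get(0));
        } else {
            sb.append("COALESCE(").append(String.join(",", columns)).append(")");
        }
    }

    // Builds: WHERE <ts> > ? AND <ts> < ? ORDER BY <ts> ASC
    static String timestampWhereClause(List<String> columns) {
        StringBuilder sb = new StringBuilder(" WHERE ");
        coalesceTimestampColumns(sb, columns);
        sb.append(" > ? AND ");  // strict lower bound: last offset
        coalesceTimestampColumns(sb, columns);
        sb.append(" < ? ORDER BY ");  // strict upper bound
        coalesceTimestampColumns(sb, columns);
        sb.append(" ASC");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(timestampWhereClause(List.of("updated_at")));
        System.out.println(timestampWhereClause(List.of("modified", "created")));
    }
}
```

With one column the clause is simply WHERE updated_at > ? AND updated_at < ? ORDER BY updated_at ASC; with several columns each occurrence becomes COALESCE(modified,created). The strict > on the lower bound is what the question is about.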
However, with this approach it's possible to miss some rows: rows with the same timestamp as the last one read may not have been fully consumed in the previous iteration, and the strict greater-than condition skips them in the next one. What I want is to change the condition to select timestamps greater than or equal to the last timestamp. Is it possible to do that through the configuration, or would I need to modify the code and build my own jar file?
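The failure mode described above can be reproduced with a toy, in-memory simulation. The Row record, the poll helper and the timestamps are hypothetical; no database or connector code is involved. It compares the strict > lower bound with an inclusive >= one after a second row carrying the same timestamp appears once the first poll has finished. It needs Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (hypothetical types, no real database) of polling a table by timestamp,
// contrasting a strict "> last" lower bound with an inclusive ">= last" lower bound.
final class TimestampPollSimulation {

    record Row(String id, long ts) {}

    // Returns rows that satisfy the lower bound, ordered by timestamp (stable sort).
    static List<Row> poll(List<Row> table, long lastTs, boolean inclusive) {
        List<Row> out = new ArrayList<>();
        for (Row r : table) {
            if (inclusive ? r.ts() >= lastTs : r.ts() > lastTs) {
                out.add(r);
            }
        }
        out.sort((a, b) -> Long.compare(a.ts(), b.ts()));
        return out;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row("a", 100));

        // First poll: no offset yet, so everything is read; the offset becomes 100.
        List<Row> first = poll(table, Long.MIN_VALUE, false);
        long lastTs = first.get(first.size() - 1).ts();

        // A row with the SAME timestamp becomes visible only after the first poll
        // (for example, a transaction that committed late).
        table.add(new Row("b", 100));

        System.out.println("strict:    " + poll(table, lastTs, false));
        System.out.println("inclusive: " + poll(table, lastTs, true));
    }
}
```

The strict poll prints an empty list, so row b is never delivered. The inclusive poll returns both rows but re-reads a, so a >= condition trades missed rows for duplicates that downstream consumers would have to tolerate (for example by upserting on some unique key, if the table has one).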
As well as timestamp.delay.interval.ms, which Iskuskov Alexander suggested, also consider using log-based CDC to capture every event. The scenario you describe is one of the limitations of query-based CDC.
More: https://rmoff.dev/no-more-silos
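The idea behind timestamp.delay.interval.ms can be sketched the same way. Assuming, as I understand the connector, that the upper bound in the query is the database's current time minus this delay, a row is only read once it is at least that old, which gives late-committing transactions with the same timestamp time to become visible. The Row record, the poll helper, the initial offset of 0 and all config values (URL, table, column, topic prefix) are hypothetical; only the config keys are the JDBC source connector's property names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch (hypothetical types, no real database) of a "> last AND < now - delay" window.
final class DelayWindowSketch {

    record Row(String id, long ts) {}

    // Both bounds are exclusive, matching the "> ? AND < ?" shape of the query.
    // Assumption: upper bound = database time minus timestamp.delay.interval.ms.
    static List<Row> poll(List<Row> table, long lastTs, long dbNow, long delayMs) {
        long upper = dbNow - delayMs;
        List<Row> out = new ArrayList<>();
        for (Row r : table) {
            if (r.ts() > lastTs && r.ts() < upper) {
                out.add(r);
            }
        }
        return out;
    }

    // Example source connector settings; all values are hypothetical placeholders.
    static Map<String, String> connectorConfig() {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
        cfg.put("connection.url", "jdbc:postgresql://db.example:5432/mydb");
        cfg.put("table.whitelist", "orders");
        cfg.put("mode", "timestamp");
        cfg.put("timestamp.column.name", "updated_at");
        cfg.put("timestamp.delay.interval.ms", "10000");
        cfg.put("topic.prefix", "jdbc-");
        return cfg;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row("a", 100_000));
        long delay = Long.parseLong(connectorConfig().get("timestamp.delay.interval.ms"));

        // dbNow = 105_000 gives an upper bound of 95_000, so "a" is held back.
        System.out.println(poll(table, 0, 105_000, delay)); // []

        // Meanwhile a late transaction commits another row with the same timestamp.
        table.add(new Row("b", 100_000));

        // dbNow = 111_000 gives an upper bound of 101_000: both rows arrive in one poll.
        System.out.println(poll(table, 0, 111_000, delay));
    }
}
```

This only helps if late writes become visible within the delay window, and a longer delay adds end-to-end latency, which is why log-based CDC is worth considering when every event must be captured.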