Reputation: 4168
I have to create a consumer for a Kafka topic that listens constantly and pushes data into a database.
The requirement is: if you happen to read multiple records from Kafka in a single cycle, push them into the DB in a single call rather than in multiple calls.
public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,
        String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS,
        String bufferMemory) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
            getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit));
    consumer.subscribe(Arrays.asList(topicName));
    logger.info("subscribed to the topic {}", topicName);
    cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build();
    session = cluster.connect(KEYSPACE);
    try {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Each record is currently written to the DB individually; this is the call to batch up.
                for (ConsumerRecord<String, String> record : records) {
                    Model model = mapper.readValue(record.value(), Model.class);
                    try {
                        boolean flag = insertIntoDB(session, model);
                        if (flag) {
                            logger.info("************ Data Persisted Successfully ***************");
                        } else {
                            logger.info("******* Data Persistence Failed *************");
                        }
                    } catch (Exception ex) {
                        logger.error("Exception while persisting data into DB", ex);
                    }
                }
            } catch (Exception ex) {
                logger.error("Exception while reading data from Kafka", ex);
            }
        }
    } finally {
        consumer.close();
    }
}
Upvotes: 1
Views: 813
Reputation: 4687
MySQL's INSERT supports inserting many rows at once, like this:
INSERT INTO tbl_name (a,b,c) VALUES (1,2,3),(4,5,6),(7,8,9);
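In Java you can get the same effect through JDBC batching; a minimal sketch against the tbl_name table above (the connection URL and credentials are placeholders; with MySQL Connector/J, rewriteBatchedStatements=true rewrites the batch into one multi-row INSERT):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class BatchInsert {
    // Writes all rows in one round trip; each int[] holds the values for (a, b, c).
    static void insertRows(List<int[]> rows) throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO tbl_name (a, b, c) VALUES (?, ?, ?)")) {
            for (int[] row : rows) {
                ps.setInt(1, row[0]);
                ps.setInt(2, row[1]);
                ps.setInt(3, row[2]);
                ps.addBatch();     // queue the row instead of executing it
            }
            ps.executeBatch();     // flush all queued rows in a single call
        }
    }
}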
So you could first save records into an array, and when the array size reaches a BATCH_SIZE, pass it to your insertIntoDb method, then clear the array and continue the loop.
You could also take all the messages from one poll into the array and pass that to insertIntoDb.
But if the message count is too large, MySQL will complain that the packet is too big (max_allowed_packet), so in that case using a fixed BATCH_SIZE is better.
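A minimal sketch of that buffering, assuming a hypothetical insertIntoDb(session, List<Model>) overload that writes the whole list in one call:

int BATCH_SIZE = 100;                      // keep below the packet/batch limits
List<Model> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(mapper.readValue(record.value(), Model.class));
        if (buffer.size() >= BATCH_SIZE) {
            insertIntoDb(session, buffer); // one DB call for the whole batch
            buffer.clear();                // reset and keep looping
        }
    }
    if (!buffer.isEmpty()) {               // flush whatever is left from this poll
        insertIntoDb(session, buffer);
        buffer.clear();
    }
}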
You can also set the "max.poll.records" consumer config to limit the number of messages returned by one poll.
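For instance (broker address and group id are placeholders; deserializer settings are included so the snippet is complete):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // cap on records per poll()
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);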
For Cassandra, a batch insert looks something like this:
PreparedStatement ps = session.prepare("INSERT INTO messages (user_id,msg_id, title, body) VALUES (?, ?, ?, ?)");
BatchStatement batch = new BatchStatement();
batch.add(ps.bind(uid, mid1, title1, body1));
batch.add(ps.bind(uid, mid2, title2, body2));
batch.add(ps.bind(uid, mid3, title3, body3));
session.execute(batch);
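Tying that back to the question's loop, a rough sketch (an unlogged batch is used here as an assumption, and Model's getters are hypothetical):

PreparedStatement ps = session.prepare(
        "INSERT INTO messages (user_id, msg_id, title, body) VALUES (?, ?, ?, ?)");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    if (records.isEmpty()) {
        continue;
    }
    // All records from this poll go to Cassandra in a single call.
    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
    for (ConsumerRecord<String, String> record : records) {
        Model model = mapper.readValue(record.value(), Model.class);
        batch.add(ps.bind(model.getUserId(), model.getMsgId(), model.getTitle(), model.getBody()));
    }
    session.execute(batch);
    logger.info("Persisted {} records in one batch", records.count());
}

Note that Cassandra also limits batch sizes (batch_size_fail_threshold_in_kb), and an unlogged batch across many partitions trades atomicity for fewer round trips, so very large polls may still need to be split.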
Upvotes: 1