Reputation: 11
What I am trying to do:
I have a schema registry and I would like my messages to be validated against it before being written to the topic. This works when the connector's output record value format is set to JSON_SR.
My problem is message formatting when pulling from SQS. I can only send a string to SQS as a message body, but I would need JSON. Currently, all messages in the body field are just string representations of JSON messages.
This is the data I send, serialized as a JSON string, since SQS can't accept JSON in the message body:
{
    "username": "name",
    "property": "test prop",
    "created_utc_date": "2024-04-18T08:00:00Z"
}
When pulling from SQS, the record value is filled with a lot of fields, but we can use an SMT to extract just "Body", which is where my message is. The extracted message is just a JSON string:
"{\"username\":\"name\",\"property\":\"test prop\",\"created_utc_date\":\"2024-04-18T08:00:00Z\"}"
and it cannot be saved; I get this error:
"The connector has failed to register a new schema with Schema Registry because it is incompatible with an existing schema for the same subject."
I looked at a lot of possible solutions (SMTs, converters), but I didn't manage to get anything to work. Is there a way I can deserialize that data at some point so it can be ingested by the connector?
The only thing I found was https://www.confluent.io/hub/redhatinsights/expandjsonsmt/, but that plugin cannot be added to Confluent Cloud.
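In case it helps, this is roughly how the connector side is configured. The ExtractField SMT class is the standard Kafka Connect one; whether the fully managed SQS Source connector accepts exactly these property names is an assumption on my part:
# Assumption: property names follow the generic Kafka Connect / Confluent docs;
# the fully managed SQS Source connector UI may expose them differently
output.data.format=JSON_SR
transforms=extractBody
transforms.extractBody.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractBody.field=Body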
Upvotes: 1
Views: 325
Reputation: 11
So I have not managed to do this directly. However, I did something similar. Since I didn't find a way to get the message I wanted in the correct format to the correct topic in Kafka using the connector, I got rid of it and replaced it with an AWS Lambda function. The Lambda is now triggered by SQS, and in it I can format, serialize and change my messages as much as I need before sending them to Kafka.
If anyone else wants to try this kind of solution, here is my Lambda code:
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

exports.handler = async function (event, context) {
    const eventRecords = event.Records;
    await Promise.all(eventRecords.map(async (record) => {
        // The SQS message body is a JSON string; parse it and route on its "topic" field
        const message = JSON.parse(record.body);
        const topic = message.topic;
        await publishToKafka(message, topic);
    }));
};

const publishToKafka = async function (message, kafkaTopic) {
    const kafkaBrokers = [process.env.KAFKA_DEV_CLUSTER];
    const schemaRegistryUrl = process.env.SCHEMA_REGISTRY_URL;

    const registry = new SchemaRegistry({
        host: schemaRegistryUrl,
        auth: {
            username: process.env.SCHEMA_REGISTRY_DEV_API_KEY,
            password: process.env.SCHEMA_REGISTRY_DEV_API_SECRET,
        },
    });

    // Just the way we name schemas
    const schemaName = kafkaTopic + '-value';

    // ssl + sasl together give SASL_SSL; KafkaJS has no separate securityProtocol option
    const kafka = new Kafka({
        brokers: kafkaBrokers,
        ssl: true,
        sasl: {
            mechanism: 'plain',
            username: process.env.KAFKA_DEV_API_KEY,
            password: process.env.KAFKA_DEV_API_SECRET,
        },
    });

    const producer = kafka.producer();
    await producer.connect();
    try {
        // Encode the payload with the latest registered schema for <topic>-value
        const id = await registry.getLatestSchemaId(schemaName);
        const encodedPayload = await registry.encode(id, message);
        await producer.send({
            topic: kafkaTopic,
            messages: [{ value: encodedPayload }]
        });
        console.log('Message sent to Kafka');
        return 'Message sent to Kafka';
    } catch (error) {
        console.error('Error sending message to Kafka:', error);
        return 'Error sending message to Kafka';
    } finally {
        await producer.disconnect();
    }
};
So in the end my workflow is pretty simple: send messages to SQS, and then the Lambda function processes those messages and sends them to Kafka.
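On the producer side, sending to SQS is just a normal SendMessage call with the payload stringified and a topic field included, since the Lambda above routes on message.topic. A minimal sketch with the AWS SDK v3, where the queue URL, region and topic name are placeholders:
// send-to-sqs.js - sketch only; queue URL, region and topic name are placeholders
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');

const sqs = new SQSClient({ region: process.env.AWS_REGION });

async function sendToQueue() {
    // SQS only accepts a string body, so the JSON payload is stringified here
    await sqs.send(new SendMessageCommand({
        QueueUrl: process.env.SQS_QUEUE_URL,
        MessageBody: JSON.stringify({
            topic: 'user-properties', // hypothetical topic name; the Lambda reads this field
            username: 'name',
            property: 'test prop',
            created_utc_date: '2024-04-18T08:00:00Z'
        })
    }));
}

sendToQueue().catch(console.error);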
Upvotes: 0