

How to deserialize a JSON string into a JSON object for Schema Registry validation in the Confluent Cloud SQS connector?

What I am trying to do:

I have a Schema Registry, and I would like my messages to be validated against it before they are written to Kafka. This works when the output record value format is set to JSON_SR.

My problem is the message format when pulling from SQS. I can only send a string as the SQS message body, but I need JSON. Currently, the body field of every message is just the string representation of a JSON message.

The data I send is serialized as a JSON string, because SQS can't accept a JSON object in MessageBody:

```json
{
  "username": "name",
  "property": "test prop",
  "created_utc_date": "2024-04-18T08:00:00Z"
}
```
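For illustration, this is what that serialization step looks like on the sending side. A minimal sketch, assuming the AWS SDK v3 `SendMessageCommand` input shape (`QueueUrl`, `MessageBody`); the helper name `buildSendMessageInput` and the queue URL are hypothetical placeholders:

```javascript
// Hypothetical helper: builds the input object for the AWS SDK v3
// SendMessageCommand. MessageBody must be a string, so the payload
// object is serialized with JSON.stringify.
function buildSendMessageInput(queueUrl, payload) {
  return {
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(payload),
  };
}

const payload = {
  username: 'name',
  property: 'test prop',
  created_utc_date: '2024-04-18T08:00:00Z',
};

// Placeholder queue URL, not a real queue.
const input = buildSendMessageInput('https://sqs.example/queue', payload);
console.log(typeof input.MessageBody); // "string"
console.log(input.MessageBody);
// {"username":"name","property":"test prop","created_utc_date":"2024-04-18T08:00:00Z"}
```

With `@aws-sdk/client-sqs` installed, this object would be passed to `new SendMessageCommand(input)`. The printed body is exactly the escaped string that later shows up in the extracted `Body` field.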

When pulling from SQS, the record value contains a lot of fields, but I can use an SMT to extract just "Body", which holds my message. The extracted message, however, is only a JSON string:

"{\"username\":\"name\",\"property\":\"test prop\",\"created_utc_date\":\"2024-04-18T08:00:00Z\"}"

It cannot be saved, because I get this error:

"The connector has failed to register a new schema with Schema Registry because it is incompatible with an existing schema for the same subject."
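A likely reason for this error (an assumption, not confirmed by the connector documentation) is that the JSON_SR converter derives a schema from the record it receives. For a string value that is a schema of type `string`, while the subject already holds an object schema, so the new schema fails the compatibility check. The hypothetical helper `topLevelJsonSchemaType` below shows the top-level type before and after parsing the body:

```javascript
// Hypothetical helper: returns the top-level JSON Schema "type" for a
// JSON-compatible JavaScript value (simplified, ignores nested properties).
function topLevelJsonSchemaType(value) {
  if (value === null) return 'null';
  if (Array.isArray(value)) return 'array';
  if (typeof value === 'number') {
    return Number.isInteger(value) ? 'integer' : 'number';
  }
  return typeof value; // 'string', 'boolean' or 'object' for JSON values
}

// The extracted Body field, exactly as it arrives: JSON text, not an object.
const body = '{"username":"name","property":"test prop","created_utc_date":"2024-04-18T08:00:00Z"}';

console.log(topLevelJsonSchemaType(body));             // "string"
console.log(topLevelJsonSchemaType(JSON.parse(body))); // "object"
```

Only after `JSON.parse` does the value have the object shape that the registered schema describes.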

I have looked at many possible solutions (SMTs, converters), but I haven't managed to get anything to work. Is there a way to deserialize that data at some point so that the connector can ingest it?

The only thing I found was https://www.confluent.io/hub/redhatinsights/expandjsonsmt/, but that plugin cannot be added to Confluent Cloud.
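The idea behind an "expand JSON" transform can be approximated in plain JavaScript: take a field that holds JSON text and replace it with the parsed structure. This is only an illustration of the technique; `expandJsonField` is a hypothetical name and not the plugin's API. On Confluent Cloud, a step like this would have to run outside the managed connector, in code you control:

```javascript
// Hypothetical sketch: expand a JSON string field into a nested object.
// Returns a new object and leaves the input record untouched.
// JSON.parse throws if the field does not contain valid JSON.
function expandJsonField(recordValue, fieldName) {
  const raw = recordValue[fieldName];
  if (typeof raw !== 'string') {
    return { ...recordValue }; // already structured, nothing to expand
  }
  return { ...recordValue, [fieldName]: JSON.parse(raw) };
}

const record = {
  Body: '{"username":"name","property":"test prop","created_utc_date":"2024-04-18T08:00:00Z"}',
};

const expanded = expandJsonField(record, 'Body');
console.log(expanded.Body.username); // "name"
```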


Answers (1)



I haven't managed to do this. However, I did something similar. Since I couldn't find a way to get the message into the correct Kafka topic in the correct format using the connector, I got rid of the connector and replaced it with an AWS Lambda function. The Lambda function is now triggered by SQS, and in it I can format, serialize and transform my messages as much as I want before sending them to Kafka.
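When SQS triggers a Lambda function, each message arrives as an entry in `event.Records`, and its body is still a string in `record.body`, so it has to be parsed before it can be encoded against the schema. A minimal sketch of that step, assuming (as the Lambda code in this answer does) that each message carries its target topic in a `topic` field; `parseSqsEvent`, the topic name and the sample event are hypothetical:

```javascript
// Hypothetical helper: turns an SQS-triggered Lambda event into
// { messageId, topic, message } entries. record.body is a string,
// so it is parsed with JSON.parse.
function parseSqsEvent(event) {
  return event.Records.map((record) => {
    const message = JSON.parse(record.body);
    return { messageId: record.messageId, topic: message.topic, message };
  });
}

// Trimmed-down sample event; real SQS events carry more fields per record.
const sampleEvent = {
  Records: [
    {
      messageId: 'msg-1',
      body: '{"topic":"users","username":"name","property":"test prop","created_utc_date":"2024-04-18T08:00:00Z"}',
    },
  ],
};

const [first] = parseSqsEvent(sampleEvent);
console.log(first.topic);            // "users"
console.log(first.message.username); // "name"
```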

If anyone else wants to try this kind of solution, here is my Lambda code:

```javascript
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

// Lambda entry point, triggered by SQS. Each record body is a JSON string
// that also contains the target Kafka topic in its `topic` field.
exports.handler = async function (event, context) {
    const eventRecords = event.Records;
    await Promise.all(eventRecords.map(async (record) => {
        const message = JSON.parse(record.body);
        const topic = message.topic;
        await publishToKafka(message, topic);
    }));
};

// Function declaration: hoisted, and not an implicit global.
async function publishToKafka(message, kafkaTopic) {
    const kafkaBrokers = [process.env.KAFKA_DEV_CLUSTER];
    const schemaRegistryUrl = process.env.SCHEMA_REGISTRY_URL;

    const registry = new SchemaRegistry({
        host: schemaRegistryUrl,
        auth: {
            username: process.env.SCHEMA_REGISTRY_DEV_API_KEY,
            password: process.env.SCHEMA_REGISTRY_DEV_API_SECRET,
        },
    });

    // Just the way we name schemas (subject = "<topic>-value")
    const schemaName = kafkaTopic + '-value';

    // kafkajs has no `securityProtocol` option: SASL_SSL is expressed
    // as `ssl: true` together with a `sasl` block.
    const kafka = new Kafka({
        brokers: kafkaBrokers,
        ssl: true,
        sasl: {
            mechanism: 'plain',
            username: process.env.KAFKA_DEV_API_KEY,
            password: process.env.KAFKA_DEV_API_SECRET,
        },
    });

    const producer = kafka.producer();
    await producer.connect();

    try {
        // Encode the message with the latest schema registered for the subject
        const id = await registry.getLatestSchemaId(schemaName);
        const encodedPayload = await registry.encode(id, message);

        await producer.send({
            topic: kafkaTopic,
            messages: [{ value: encodedPayload }],
        });

        console.log('Message sent to Kafka');
        return 'Message sent to Kafka';
    } catch (error) {
        console.error('Error sending message to Kafka:', error);
        // Rethrow so the invocation fails and SQS can redeliver the message
        throw error;
    } finally {
        await producer.disconnect();
    }
}
```

So in the end my workflow is pretty simple: send messages to SQS, then the Lambda function processes those messages and sends them to Kafka.
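One possible refinement, not part of the setup described above: with `Promise.all`, the handler succeeds or fails as a whole, so a problem with one message means either the entire batch is redelivered (if the error is thrown) or the message is silently dropped (if the error is only logged). Lambda's SQS integration supports partial batch responses when `ReportBatchItemFailures` is enabled on the event source mapping; the handler then returns only the failed message IDs in `batchItemFailures`. A sketch with the Kafka publishing step passed in as a function (`makeHandler` and `publish` are hypothetical names):

```javascript
// Hypothetical handler factory: `publish(topic, message)` stands in for the
// Schema Registry encoding + Kafka send step. Each record is processed
// independently, and failures are reported per message so that, with
// ReportBatchItemFailures enabled, SQS retries only those messages.
function makeHandler(publish) {
  return async function handler(event) {
    const results = await Promise.allSettled(
      event.Records.map(async (record) => {
        const message = JSON.parse(record.body); // parse errors count as failures too
        await publish(message.topic, message);
      })
    );

    const batchItemFailures = [];
    results.forEach((result, i) => {
      if (result.status === 'rejected') {
        console.error('Failed to publish', event.Records[i].messageId, result.reason);
        batchItemFailures.push({ itemIdentifier: event.Records[i].messageId });
      }
    });
    return { batchItemFailures };
  };
}

// Example with a fake publisher that rejects one topic.
const handler = makeHandler(async (topic) => {
  if (topic === 'bad-topic') throw new Error('simulated failure');
});

handler({
  Records: [
    { messageId: 'a', body: '{"topic":"users","username":"name"}' },
    { messageId: 'b', body: '{"topic":"bad-topic"}' },
  ],
}).then((res) => console.log(JSON.stringify(res)));
// {"batchItemFailures":[{"itemIdentifier":"b"}]}
```

Injecting `publish` also makes the batching logic testable without a Kafka cluster.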
