Paul

Reputation: 906

Kafka Connect - Sink connector - set use schema id in connector config

I've been trying to find available connector configurations for Kafka Connect that will allow me to produce payloads to the Kafka topic without having to specify the payload schema structure or add the schema ID. In other words, so far I've only been able to get a JDBC sink connector (Kafka -> Postgres) working for the following two scenarios:

  1. JSON payloads formatted together with an embedded schema definition (no schema registry) -> blog post
from datetime import datetime
from confluent_kafka import Producer
import json
payload = {
    "schema": {
        "type": "struct", 
        "fields": [
            {
                "type": "string", 
                "field": "value"
            }, 
            {
                "type": "int64", 
                "field": "value_number"
            }, 
            {
                "type": "string", 
                "field": "timestamp"    
            }
        ],
        "optional": False,
        "name": "postgres-sink"
    },
    "payload": {
        "value": "example",
        "value_number": 5,
        "timestamp": str(datetime.utcnow())
    }
}
send = json.dumps(payload).encode('utf-8')
producer_conf = {
    #... connection details
}
producer = Producer(producer_conf)
producer.produce(topic='topic', value=send)
producer.flush()
  2. Produced payloads that include the schema ID (schema registered in the schema registry) in the message, per the wire format, so that the connector's converter knows how to deserialize the record:
import struct
import json
from io import BytesIO
from datetime import datetime
from confluent_kafka import Producer

class _ContextStringIO(BytesIO):
    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False

def serialize(content: dict, schema_id: int) -> bytes:
    """Manually apply the Confluent wire format: magic byte 0 + 4-byte schema ID + JSON payload."""
    with _ContextStringIO() as fp:
        fp.write(struct.pack('>bI', 0, schema_id))
        fp.write(json.dumps(content).encode('utf-8'))
        return fp.getvalue()

payload = serialize({
    "value": "example-ser",
    "value_number": 5,
    "timestamp": str(datetime.utcnow())
}, 2)   # 2 = the schema ID registered in the schema registry
producer_conf = {
    #... connection details
}
producer = Producer(producer_conf)
producer.produce(topic='topic', value=payload)
producer.flush()
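(For context, this manual wire-format packing is essentially what confluent_kafka's schema-registry-aware JSON serializer does for you. A minimal sketch, assuming a registry at http://localhost:8081 and a placeholder schema string rather than my actual registered schema:)

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# placeholder schema string - swap in the schema actually registered for the topic's subject
schema_str = json.dumps({
    "type": "object",
    "properties": {
        "value": {"type": "string"},
        "value_number": {"type": "integer"},
        "timestamp": {"type": "string"}
    }
})

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})   # assumed registry URL
json_serializer = JSONSerializer(schema_str, sr_client)

value = json_serializer(
    {"value": "example-ser", "value_number": 5, "timestamp": str(datetime.utcnow())},
    SerializationContext('topic', MessageField.VALUE)
)
producer.produce(topic='topic', value=value)
producer.flush()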

Even though the second approach is almost there, I need to handle Kafka messages serialized simply as:

payload = json.dumps({
    "value": "example-2",
    "value_number": 5,
    "timestamp": str(datetime.utcnow())
}).encode('utf-8')

I was therefore hoping to be able to configure the connector in Kafka Connect with the schema ID from the schema registry, so that it can deserialize and format the payload for the Postgres inserts. The config for my JDBC connector is (created via the Connect REST API):

import requests

name = '<connector name>'
config = {
    # ... jdbc connection details and config
    "connector.class": 'io.confluent.connect.jdbc.JdbcSinkConnector',   # use JDBC connector as sink
    # etc...

    # ... reporter config 

    # converters for kafka to the destination
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",   
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "<host>",
    "value.converter.ignore.default.for.nullables": "true",         # ignore null values
    "value.converter.schemas.enable": "false",                      # true | false -> include schema in the message
    "value.converter.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy", # subject name strategy
    "value.converter.use.schema.id": "2",

    # ...dead letter queue specifications
}

res = requests.put(f'http://localhost:8083/connectors/{name}/config', json=config)
print(f"returned status code: {res.status_code} (reason: {res.reason})")
res.raise_for_status()

which I can see reflected in the connector's initialization config (if I'm interpreting correctly that this is what I need to set):

[screenshot: connector deserialization config]
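(The same can be double-checked through the Connect REST API, for example:)

import requests

# GET /connectors/<name>/config returns the config the worker actually applied
cfg = requests.get(f'http://localhost:8083/connectors/{name}/config').json()
print(cfg.get('value.converter.use.schema.id'))

# GET /connectors/<name>/status shows whether the connector and its tasks are RUNNING or FAILED
print(requests.get(f'http://localhost:8083/connectors/{name}/status').json())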

However, when I send payloads without the schema ID, the connector still can't deserialize them, and the stack trace shows an "Unknown magic byte!" error:

[screenshot: deserialization error stack trace]

Can someone please tell me whether I'm setting the right connector config properties (specifying the schema by ID or subject name) so that I can produce plain JSON payloads without including any schema information... or whether what I'm trying to do isn't possible?

I can confirm that my schema is available on the schema registry under that ID.
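(For example, fetching it from the registry's REST API by ID:)

import requests

# GET /schemas/ids/<id> returns the schema registered under that ID
res = requests.get('<host>/schemas/ids/2')   # <host> = schema registry URL, as in the converter config
print(res.status_code, res.json())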

Upvotes: 1

Views: 662

Answers (1)

Paul

Reputation: 906

Joh, leaps and bounds of feedback on this question, lol. Anyway, I found a solution after getting a few leads from the Slack community. The 'best' solution I could get thus far is to use a custom Kafka Connect plugin, jcustenborder/kafka-connect-json-schema. Details:

In a nutshell, the plugin allows Kafka records to be published without any reference to a JSON schema or schema ID (as required by the wire format in the official Kafka Connect documentation). The schema info for records on a topic is instead supplied in the connector config, via (1) an inline schema string, (2) a file location containing the schema definition, or (3) a URL (which can point at the schema registry's REST endpoint for the JSON schema, so you still get central schema management and evolution).

For my purpose, I just defined the whole JSON schema inline in the config, e.g.:

import json

schema = {
  "$id": "https://example.com/person.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {
    "value": {
      "type": "string",
      "description": "Random string value to set"
    },
    "value_number": {
      "type": "integer",
      "description": "Integer value to be set"
    },
    "timestamp": {
      "type": "string",
      "description": "String representation of timestamp"
    }
  }
}

config = {
    'name': 'small',
    'tasks.max': 1,
    'topics': 'bulk.small',
    'batch.size': 500,
    'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url': 'jdbc:postgresql://xxx',
    'connection.user': 'xxx',
    'connection.password': 'xxx',
    'table.name.format': 'kafka_connect.small',
    'insert.mode': 'INSERT',
    'auto.create': 'true',
    'auto.evolve': 'false',
    'pk.mode': 'none',

    # pass the raw bytes through and let the transform build the structured record
    'key.converter': 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter': 'org.apache.kafka.connect.converters.ByteArrayConverter',

    # FromJson transform from jcustenborder/kafka-connect-json-schema, schema supplied inline
    'transforms': 'fromJson',
    'transforms.fromJson.type': 'com.github.jcustenborder.kafka.connect.json.FromJson$Value',
    'transforms.fromJson.json.schema.location': 'Inline',
    'transforms.fromJson.json.schema.inline': json.dumps(schema),

    'errors.tolerance': 'all',
    'errors.log.enable': 'true',
    'errors.log.include.messages': 'true',
    'errors.deadletterqueue.topic.name': 'bulk.small-dlq',
    'errors.deadletterqueue.topic.replication.factor': '1'
}

# rest call to create connector from json config body
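# e.g. same pattern as in the question, assuming a local Connect worker on port 8083:
import requests

res = requests.put(f"http://localhost:8083/connectors/{config['name']}/config", json=config)
print(f"returned status code: {res.status_code} (reason: {res.reason})")
res.raise_for_status()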
  • Last step: testing the created connector by sending records through the Kafka topic:
import json
from datetime import datetime
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': "xxx",     
    'sasl.username': "xxx",
    'sasl.password': "xxx",
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_PLAINTEXT'
})

def send_databytes(topic: str, data: list[bytes] | bytes) -> str:
    try:
        if producer is None:
            raise Exception("Not instantiated")
        count = 0

        for record in data:
            producer.produce(topic=topic, value=record)
            count += 1

        print(f"produced data count: [{count}]")
        producer.flush()
        return f"data produced to topic [{count} items]"
    except Exception as e:
        raise Exception(f"fail during transmit: {e}")

val = 1   # counter for generating distinct test records
packet = {
    'value': f'test_{val}',
    'value_number': val,
    'timestamp': str(datetime.utcnow())
}
bins = json.dumps(packet).encode('utf-8')
send_databytes('bulk.small', [bins])
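
A quick way to verify the rows actually land in the target table (auto-created by the connector), assuming psycopg2 is available and the same connection details as in the connector config:

import psycopg2   # assumption: psycopg2 installed; DSN mirrors the connector's JDBC URL

conn = psycopg2.connect('postgresql://xxx')
with conn, conn.cursor() as cur:
    cur.execute('SELECT * FROM kafka_connect.small LIMIT 5')
    for row in cur.fetchall():
        print(row)
conn.close()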

Upvotes: 0
