Savva Sergey

Stream Nested JSON from Snowflake to Kafka Using JDBC Source Connector

I am trying to set up a data pipeline where SQL query results from Snowflake (structured as nested JSON using the OBJECT_CONSTRUCT function) are streamed to a Kafka topic. I want to achieve this using configuration only, without custom coding.

I managed to set up a JDBC source connector that executes the following query and sends the results to a Kafka topic:

select 
    object_construct(
        'id', id, 
        'nestedField', object_construct(
            'nestedValue', value
        )
    ) as myRecord
from myTable;

With the "value.converter": "org.apache.kafka.connect.json.JsonConverter" setting, the Kafka message looks like this:

{ "MYRECORD": "{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"}

After adding an ExtractField transform to the connector config,

    "transforms": "extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractField.field": "MYRECORD",

the message becomes just this:

"{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"

which is still not exactly what I want, because the value is a JSON-encoded string rather than a JSON object. My goal is a plain JSON value like this:

{"id":1,"nestedField":{"nestedValue":"value"}}
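To see why both outputs above are still strings, here is a minimal Python sketch that mimics the serialization with the standard json module. It assumes (inferred from the escaped output, not from the connector's documentation) that the OBJECT_CONSTRUCT result reaches Kafka Connect as a plain STRING field; the variable names are illustrative only.

```python
import json

# Assumption: the Snowflake OBJECT_CONSTRUCT result arrives in Kafka Connect as a plain string.
record_text = '{"id":1,"nestedField":{"nestedValue":"value"}}'

# Without the SMT: a JSON object whose only field holds the record as an escaped string.
wrapped = json.dumps({"MYRECORD": record_text})

# With ExtractField$Value: the bare string is still written as a quoted JSON string literal.
extracted = json.dumps(record_text)

# A consumer of the extracted message has to decode twice to reach the nested object.
once = json.loads(extracted)   # the inner JSON text, still a str
twice = json.loads(once)       # the nested object, now a dict
print(type(once).__name__, type(twice).__name__)  # prints: str dict
```

In other words, the payload itself is fine; the problem is the extra layer of JSON string encoding added on the way to the topic.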

How can I configure the source connector to achieve this? Again, ideally I don't want to write any custom code, such as implementing my own SMT. I'd also like to avoid using ksqlDB.

This is probably worth asking separately, but it is related to my question: if I encode my original value using Base64, is there a simple way to decode it (using an SMT) before the message is written to the topic?

Answers (1)

Savva Sergey

It was easier than it looked.

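Simply changing the value converter to org.apache.kafka.connect.storage.StringConverter did the trick, and the same extractField transform pulled the value out of the struct. StringConverter writes the string's bytes as-is, whereas JsonConverter serializes a string value as a quoted, escaped JSON string literal.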
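The difference between the two converters can be sketched in Python. The two functions below are hypothetical stand-ins written for illustration, not the Kafka Connect API; they model JsonConverter (schemas disabled) and StringConverter applied to the STRING value left after ExtractField$Value.

```python
import json

def json_converter_like(value: str) -> bytes:
    """Hypothetical stand-in for JsonConverter on a STRING value: a quoted JSON string literal."""
    return json.dumps(value).encode("utf-8")

def string_converter_like(value: str) -> bytes:
    """Hypothetical stand-in for StringConverter: the string's UTF-8 bytes, unchanged."""
    return value.encode("utf-8")

# The value left after ExtractField$Value pulls out the RECORD field.
extracted = '{"id":1,"nestedField":{"nestedValue":"value"}}'

print(json_converter_like(extracted))    # quoted and escaped
print(string_converter_like(extracted))  # plain JSON object text

# With the StringConverter-style output, one json.loads yields the nested object.
assert json.loads(string_converter_like(extracted))["nestedField"]["nestedValue"] == "value"
```

The ExtractField step is still needed: as far as I can tell from the Connect source, StringConverter calls toString() on whatever value it receives, and for a whole Struct that produces something like Struct{RECORD=...} rather than JSON.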

So, in the end, the config for the JDBC source connector looks like this:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:snowflake://<MY_SNOWFLAKE_ACCOUNT>.snowflakecomputing.com/?warehouse=<MY_WAREHOUSE>&db=<MY_DB>&role=<MY_ROLE>&schema=<MY_SCHEMA>&private_key_file=<MY_KEY_PATH>&private_key_file_pwd=<MY_KEY_PASS>",
    "query": "SELECT OBJECT_CONSTRUCT( 'id', id,  'nestedField', object_construct( 'nestedValue', value ) ) AS RECORD FROM my_db.my_schema.my_table;",
    "mode": "bulk",
    "tasks.max": "1",
    "topic.prefix": "jdbc_source_snowflake",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractField.field": "RECORD"
}
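One way to deploy this config is the Kafka Connect REST API, whose PUT /connectors/{name}/config endpoint creates or updates a connector from a flat config map like the one above. Below is a minimal standard-library sketch; the worker URL http://localhost:8083, the connector name snowflake-nested-json and the helper build_put_config_request are assumptions for illustration, and the <...> placeholders still need real values.

```python
import json
import urllib.request

# Assumptions: hypothetical Connect worker address and connector name.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "snowflake-nested-json"

config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": (
        "jdbc:snowflake://<MY_SNOWFLAKE_ACCOUNT>.snowflakecomputing.com/"
        "?warehouse=<MY_WAREHOUSE>&db=<MY_DB>&role=<MY_ROLE>&schema=<MY_SCHEMA>"
        "&private_key_file=<MY_KEY_PATH>&private_key_file_pwd=<MY_KEY_PASS>"
    ),
    "query": (
        "SELECT OBJECT_CONSTRUCT( 'id', id,  'nestedField', object_construct( "
        "'nestedValue', value ) ) AS RECORD FROM my_db.my_schema.my_table;"
    ),
    "mode": "bulk",
    "tasks.max": "1",
    "topic.prefix": "jdbc_source_snowflake",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractField.field": "RECORD",
}

def build_put_config_request(base_url: str, name: str, cfg: dict) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) a PUT /connectors/{name}/config request."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(cfg).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

req = build_put_config_request(CONNECT_URL, CONNECTOR_NAME, config)
print(req.get_method(), req.full_url)

# To submit it against a running worker:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```

Using PUT on the config endpoint (rather than POST /connectors) makes the call idempotent, so the same script can be rerun after editing the config.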
