Reputation: 403
I am trying to set up a data pipeline where SQL query results from Snowflake (structured as nested JSON using the OBJECT_CONSTRUCT
function) are streamed to a Kafka topic. I want to achieve this using configuration only, without custom coding.
I managed to set up a JDBC source connector that sends messages to a Kafka topic by executing the following query:
select
  object_construct(
    'id', id,
    'nestedField', object_construct(
      'nestedValue', value
    )
  ) as myRecord
from myTable;
With the "value.converter": "org.apache.kafka.connect.json.JsonConverter"
setting, the Kafka message looks like this:
{ "MYRECORD": "{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"}
and after adding a field-extraction transform to the connector config
"transforms": "extractField",
"transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractField.field": "MYRECORD",
the message is just this
"{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"
which is not exactly what I want. My goal is to have just a pure JSON value like this:
{"id":1,"nestedField":{"nestedValue":"value"}}
The question is: how do I configure the source connector to achieve this? Again, ideally I don't want to write any custom code, such as implementing my own SMT. I'd also like to avoid using ksqlDB.
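For context, my guess at why the quotes get escaped: the JDBC driver seems to hand the OBJECT column to Connect as a plain string, and JsonConverter then JSON-encodes that string a second time. A minimal Python sketch of that double encoding (the record content and MYRECORD field name just mirror my example above):

```python
import json

# What the query produces: a JSON object already serialized to a string
payload = json.dumps({"id": 1, "nestedField": {"nestedValue": "value"}})

# JsonConverter treats the column value as an opaque string and encodes it
# again, which escapes every quote inside it
message = json.dumps({"MYRECORD": payload})

print(message)  # the escaped form seen on the topic
```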
It's probably worth asking separately, but it's still related to my question: if I encode my original value using Base64, is there any simple way to decode it back (using an SMT) before the message is posted to the topic?
Upvotes: 0
Views: 136
Reputation: 403
It was easier than it looked.
Simply changing the value converter to org.apache.kafka.connect.storage.StringConverter
did the trick, and the same extractField
transform helped to get the value out of the struct.
So in the end, the config for the JDBC source connector looks like this:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:snowflake://<MY_SNOWFLAKE_ACCOUNT>.snowflakecomputing.com/?warehouse=<MY_WAREHOUSE>&db=<MY_DB>&role=<MY_ROLE>&schema=<MY_SCHEMA>&private_key_file=<MY_KEY_PATH>&private_key_file_pwd=<MY_KEY_PASS>",
  "query": "SELECT OBJECT_CONSTRUCT( 'id', id, 'nestedField', object_construct( 'nestedValue', value ) ) AS RECORD FROM my_db.my_schema.my_table;",
  "mode": "bulk",
  "tasks.max": "1",
  "topic.prefix": "jdbc_source_snowflake",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter.schemas.enable": "false",
  "transforms": "extractField",
  "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractField.field": "RECORD"
}
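Why this works, as I understand it: StringConverter writes the string value's UTF-8 bytes to the topic as-is, so the JSON text produced by OBJECT_CONSTRUCT lands unmodified instead of being JSON-encoded a second time. A rough illustration (the payload literal is just my example record):

```python
import json

# The value after ExtractField: the raw JSON text from OBJECT_CONSTRUCT
payload = '{"id":1,"nestedField":{"nestedValue":"value"}}'

# StringConverter serializes a string as its UTF-8 bytes, no extra quoting
message_bytes = payload.encode("utf-8")

# A consumer can therefore parse the message directly as JSON
record = json.loads(message_bytes)
print(record["nestedField"]["nestedValue"])  # value
```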
Upvotes: 1