Reputation: 2243
I have setup the snowflake - kafka connector. I setup a sample table (kafka_connector_test) in snowflake with 2 fields both are VARCHAR type. Fields are CUSTOMER_ID and PURCHASE_ID.
Here is my configuration that I created for the connector
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name":"kafka_connector_test",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"2",
"topics":"kafka-connector-test",
"snowflake.topic2table.map": "kafka-connector-test:kafka_connector_test",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"XXXXXXXX.snowflakecomputing.com:443",
"snowflake.user.name":"XXXXXXXX",
"snowflake.private.key":"XXXXXXXX",
"snowflake.database.name":"XXXXXXXX",
"snowflake.schema.name":"XXXXXXXX",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"}}'\
I send data to the topic that I have configured in the connector configuration.
{"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}
then when I check the kafka-connect server I get the below error:
[SF KAFKA CONNECTOR] Detail: Table doesn't have a compatible schema
Is there something I need to setup in either kafka connect or snowflake that says which parts of the json go into which columns of the table? Not sure how to specify how it parses the json.
I setup a different topic as well and didn't create a table in snowlake. In that I was able to populate this table but the connector makes a table with 2 columns RECORD_METADATA and RECORD_CONTENT. But I don't want to write a scheduled job to parse this I want to directly insert into a queryable table.
Upvotes: 0
Views: 1623
Reputation: 2243
Also looks like it's not supported at this time in reference to this github issue
Upvotes: 0
Reputation: 3465
Snowflake Kafka connector writes data as json by design. The default columns RECORD_METADATA
and RECORD_CONTENT
are variant. If you like to query them you can create a view on top the table to achieve your goal and you don't need a scheduled job
So, your table created by connector would be something like
RECORD_METADATA, RECORD_CONTENT
{metadata fields in json}, {"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}
You can create a view to display your data
create view v1 as
select RECORD_CONTENT:CUSTOMER_ID::text CUSTOMER_ID,
RECORD_CONTENT:PURCHASE_ID::text PURCHASE_ID
Your query will be
select CUSTOMER_ID , PURCHASE_ID from v1
PS. If you like to create your own tables you need to use variant
as your data type instead of varchar
Upvotes: 3