alex

Reputation: 2243

SF KAFKA CONNECTOR Detail: Table doesn't have a compatible schema - snowflake kafka connector

I have set up the Snowflake Kafka connector. I created a sample table (kafka_connector_test) in Snowflake with two fields, CUSTOMER_ID and PURCHASE_ID, both of type VARCHAR.

Here is the configuration I created for the connector:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data '{
    "name": "kafka_connector_test",
    "config": {
      "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max": "2",
      "topics": "kafka-connector-test",
      "snowflake.topic2table.map": "kafka-connector-test:kafka_connector_test",
      "buffer.count.records": "10000",
      "buffer.flush.time": "60",
      "buffer.size.bytes": "5000000",
      "snowflake.url.name": "XXXXXXXX.snowflakecomputing.com:443",
      "snowflake.user.name": "XXXXXXXX",
      "snowflake.private.key": "XXXXXXXX",
      "snowflake.database.name": "XXXXXXXX",
      "snowflake.schema.name": "XXXXXXXX",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
    }
  }'

I send data to the topic configured in the connector configuration:

{"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}

Then, when I check the Kafka Connect server logs, I see the error below:

[SF KAFKA CONNECTOR] Detail: Table doesn't have a compatible schema

Is there something I need to set up in either Kafka Connect or Snowflake that specifies which parts of the JSON go into which columns of the table? I'm not sure how to control how it parses the JSON.

I also set up a different topic without creating a table in Snowflake first. In that case the connector created and populated a table automatically, but with two columns: RECORD_METADATA and RECORD_CONTENT. I don't want to write a scheduled job to parse that; I want to insert directly into a queryable table.
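For reference, the auto-created table is still directly queryable with Snowflake's VARIANT path syntax, so no parsing job is strictly required. A minimal sketch (the table name `auto_created_table` is a placeholder for whatever table the connector created):

```sql
-- The ":" operator extracts JSON fields from the RECORD_CONTENT VARIANT
-- column; "::varchar" casts the extracted value to a string.
select RECORD_CONTENT:CUSTOMER_ID::varchar as CUSTOMER_ID,
       RECORD_CONTENT:PURCHASE_ID::varchar as PURCHASE_ID
from auto_created_table;
```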

Upvotes: 0

Views: 1623

Answers (2)

alex

Reputation: 2243

It also looks like this isn't supported at this time, per this GitHub issue.

Upvotes: 0

demircioglu

Reputation: 3465

The Snowflake Kafka connector writes data as JSON by design. The default columns RECORD_METADATA and RECORD_CONTENT are of type VARIANT. If you want to query them, you can create a view on top of the table; you don't need a scheduled job.

So the table created by the connector would look something like:

RECORD_METADATA | RECORD_CONTENT

{metadata fields in json} | {"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}

You can create a view to expose your data:

create view v1 as
select RECORD_CONTENT:CUSTOMER_ID::text as CUSTOMER_ID,
       RECORD_CONTENT:PURCHASE_ID::text as PURCHASE_ID
from kafka_connector_test;

Your query would then be:

select CUSTOMER_ID, PURCHASE_ID from v1;

PS: If you want to create your own tables, you need to use VARIANT as the data type instead of VARCHAR.
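As a sketch of that last point: a pre-created table the connector can load into needs the two-column VARIANT shape it expects, not named VARCHAR columns (the table name here just reuses the one from the question's topic2table mapping):

```sql
-- A table compatible with the Snowflake sink connector's schema:
-- one VARIANT column for the record payload, one for the metadata.
create or replace table kafka_connector_test (
    RECORD_METADATA variant,
    RECORD_CONTENT  variant
);
```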

Upvotes: 3
