Reputation: 21
We are trying to connect to Snowflake database using Kafka. Based on support doc below we have created the config file. We are using JSON format as payload input to snowflake using kafka.
Support doc: https://docs.snowflake.com/en/user-guide/kafka-connector-install.html#configuring-the-kafka-connector
Config file:
{
"name":"FactoryData",
"Config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"csv_test1",
"snowflake.topic2table.map": "csv_test1:FactoryData",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"xxxx.snowflakecomputing.com:443",
"snowflake.user.name":"kafka_connector",
"snowflake.private.key":"XXXX",
"snowflake.database.name":"dwa_dev",
"snowflake.schema.name":"kafka",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"snowflake.metadata.topic":"FALSE",
"snowflake.metadata.createtime":"FALSE",
"snowflake.metadata.offset.and.partition":"FALSE"
}
}
When using below command as per doc (distributed mode), we are getting error.
[kafka@aws13kafka01 ~]$ curl -X POST -H "Content-Type: application/json" --data @/data/app/kafka/apachekafka/CSV/config_file.json http://localhost:8083/connectors
{"error_code":500,"message":"Unrecognized field "Config" (class org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest), not marked as ignorable (2 known properties: "config", "name"])\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 1216] (through reference chain: org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest["Config"])"}
Can you please help on it.
Upvotes: 1
Views: 1808
Reputation:
(2 known properties: "config", "name" …)
The current documentation's distributed configuration example appears to carry a typo.
The "Config"
key must be lower-case: "config"
.
Retry your HTTP POST call with the following instead to get the service to recognize the payload:
{
"name":"FactoryData",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"csv_test1",
"snowflake.topic2table.map": "csv_test1:FactoryData",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"xxxx.snowflakecomputing.com:443",
"snowflake.user.name":"kafka_connector",
"snowflake.private.key":"XXXX",
"snowflake.database.name":"dwa_dev",
"snowflake.schema.name":"kafka",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"snowflake.metadata.topic":"FALSE",
"snowflake.metadata.createtime":"FALSE",
"snowflake.metadata.offset.and.partition":"FALSE"
}
}
Upvotes: 2