Reputation: 21
I have a cluster running in Confluent Cloud and am able to Produce and Consume data using other applications. However, when I try to hook up the Snowflake Kafka Connector I receive these errors:
[2019-10-15 22:12:08,979] INFO Creating connector source-snowflake of type com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,983] INFO Instantiated connector source-snowflake with version 0.5.1 of type class com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,986] INFO
[SF_KAFKA_CONNECTOR] Snowflake Kafka Connector Version: 0.5.1 (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,029] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:start (com.snowflake.kafka.connector.SnowflakeSinkConnector)
[2019-10-15 22:12:09,030] ERROR
[SF_KAFKA_CONNECTOR] name is empty or invalid. It should match Snowflake object identifier syntax. Please see the documentation. (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,033] ERROR WorkerConnector{id=source-snowflake} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Invalid input connector configuration
[SF_KAFKA_CONNECTOR] Error Code: 0001
[SF_KAFKA_CONNECTOR] Detail: input kafka connector configuration is null, missing required values, or wrong input value
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:306)
at com.snowflake.kafka.connector.Utils.validateConfig(Utils.java:400)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:131)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here is my scrubbed Snowflake config file:
{
"name":"snowsink",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"tp-snow-test",
"buffer.count.records":"100",
"buffer.flush.time":"60",
"buffer.size.bytes":"65536",
"snowflake.url.name":"xxxxxxx.east-us-2.azure.snowflakecomputing.com",
"snowflake.user.name":"svc_cc_strm",
"snowflake.private.key":"<key>",
"snowflake.private.key.passphrase":"<password>",
"snowflake.database.name":"testdb",
"snowflake.schema.name":"test1",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
}
Any ideas? Thanks.
Upvotes: 2
Views: 1536
Reputation: 3887
The connector name must be a valid Snowflake SQL identifier. So many Kafka topic examples have dashes in them that I hit this same error when I first tried the Snowflake Kafka connector.
According to the documentation, a Snowflake pipe is created using the connector_name specified, and pipe names must be valid SQL identifiers.
The connector creates one pipe for each topic partition. The name is:
SNOWFLAKE_KAFKA_CONNECTOR_<connector_name>_PIPE_<table_name>_<partition_number>.
Also from the same doc page at "Fields in the Configuration File" for name:
Application name. This must be unique across all Kafka connectors used by the customer. This name must be a valid Snowflake unquoted identifier.
If the topic has a dash in it, it needs to be mapped in your connector config to a table name that is also a proper SQL identifier; otherwise the connector will try to create a table with the same name as the topic and fail on the "-" in the name.
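A quick way to check a candidate name before submitting the config is sketched below. It assumes Snowflake's documented rules for unquoted identifiers (start with a letter or underscore, then letters, digits, underscores, or dollar signs). The function name `is_unquoted_identifier` is made up for illustration, and the pattern is not taken from the connector's source, so the connector's own check may differ.

```python
import re

# Assumption: this pattern encodes Snowflake's documented rules for unquoted
# identifiers. It is NOT copied from the connector's validation code.
UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def is_unquoted_identifier(name: str) -> bool:
    """Return True if the whole string looks like an unquoted identifier."""
    return UNQUOTED_IDENTIFIER.fullmatch(name) is not None


# Names taken from the question: the connector id in the log, the name in the
# posted config, and the topic name.
for candidate in ("source-snowflake", "snowsink", "tp-snow-test"):
    print(candidate, is_unquoted_identifier(candidate))
```

Under these rules only snowsink passes; the two names containing a dash are rejected.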
Upvotes: 1
Reputation: 923
Add a snowflake.topic2table.map entry to your config file, directly below the topics entry:
"topics":"tp-snow-test",
"snowflake.topic2table.map": "tp-snow-test:TestKafkaTable",
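If you have several topics, the map value can be generated rather than typed by hand. This is only a sketch: the helper names `to_table_name` and `topic2table_map` are hypothetical, and replacing each disallowed character with an underscore is one arbitrary naming choice (the entry above uses a hand-picked table name instead). It assumes the map value is a comma-separated list of topic:table pairs.

```python
import re


def to_table_name(topic: str) -> str:
    """Derive a table name that fits unquoted-identifier rules (assumed rules)."""
    # Replace every character that is not a letter, digit, underscore, or
    # dollar sign with an underscore.
    name = re.sub(r"[^A-Za-z0-9_$]", "_", topic)
    # An identifier must start with a letter or underscore, so prefix one
    # when the first character is a digit or dollar sign (or the name is empty).
    if not re.match(r"[A-Za-z_]", name):
        name = "_" + name
    return name


def topic2table_map(topics):
    """Build a comma-separated topic:table string for the config entry."""
    return ",".join(f"{topic}:{to_table_name(topic)}" for topic in topics)


print(topic2table_map(["tp-snow-test"]))  # prints tp-snow-test:tp_snow_test
```

Note that two different topics can collapse to the same table name this way (for example a-b and a_b), so check the result before using it.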
Upvotes: 0
Reputation: 32110
You need to change the name of your connector (source-snow) to remove the - from it (so that it matches this validation pattern).
🤷♂️
Upvotes: 0