sdmgill
sdmgill

Reputation: 21

SF_KAFKA_CONNECTOR name is empty or invalid error using Confluent Cloud and Snowflake Kafka Connector

I have a cluster running in Confluent Cloud and am able to Produce and Consume data using other applications. However, when I try to hook up the Snowflake Kafka Connector I receive these errors:

[2019-10-15 22:12:08,979] INFO Creating connector source-snowflake of type com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,983] INFO Instantiated connector source-snowflake with version 0.5.1 of type class com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,986] INFO 
[SF_KAFKA_CONNECTOR] Snowflake Kafka Connector Version: 0.5.1 (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,029] INFO 
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:start (com.snowflake.kafka.connector.SnowflakeSinkConnector)
[2019-10-15 22:12:09,030] ERROR 
[SF_KAFKA_CONNECTOR] name is empty or invalid. It should match Snowflake object identifier syntax. Please see the documentation. (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,033] ERROR WorkerConnector{id=source-snowflake} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: 
[SF_KAFKA_CONNECTOR] Exception: Invalid input connector configuration
[SF_KAFKA_CONNECTOR] Error Code: 0001
[SF_KAFKA_CONNECTOR] Detail: input kafka connector configuration is null, missing required values, or wrong input value
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:306)
at com.snowflake.kafka.connector.Utils.validateConfig(Utils.java:400)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:131)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Here is my scrubbed Snowflake config file:

{
  "name":"snowsink",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"tp-snow-test",
    "buffer.count.records":"100",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"65536",
    "snowflake.url.name":"xxxxxxx.east-us-2.azure.snowflakecomputing.com",
    "snowflake.user.name":"svc_cc_strm",
    "snowflake.private.key":"<key>",
    "snowflake.private.key.passphrase":<password>,
    "snowflake.database.name":"testdb",
    "snowflake.schema.name":"test1",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
   }
}

Any ideas? Thanks.

Upvotes: 2

Views: 1536

Answers (3)

ScottMcG
ScottMcG

Reputation: 3887

The name of the connector should be a valid SQL identifier to Snowflake. So many of the kafka topic examples have dashes in them that when I first tried the Snowflake Kafka connector I got this same error.

According to the documentation, a Snowflake pipe is created using the connector_name specified, and pipe names must be valid SQL identifiers.

The connector creates one pipe for each topic partition. The name is:

SNOWFLAKE_KAFKA_CONNECTOR_PIPE_.

Also from the same doc page at "Fields in the Configuration File" for name:

Application name. This must be unique across all Kafka connectors used by the customer. This name name must be a valid Snowflake unquoted identifier.

If the topic has a dash in it then it will need to mapped to a table name that is also a proper SQL identifier in your connector config, otherwise it will try to create the table name as the same as the topic name and fail on the "-" in the name.

Upvotes: 1

Ankur Srivastava
Ankur Srivastava

Reputation: 923

You need to have below entry in your config file , below topics entry.

"topics":"tp-snow-test",

"snowflake.topic2table.map": "tp-snow-test:TestKafkaTable",

Upvotes: 0

Robin Moffatt
Robin Moffatt

Reputation: 32110

You need to change the name of your connector (source-snow) to remove the - from it (so that it matches this validation pattern).

🤷‍♂️

Upvotes: 0

Related Questions