Reputation: 181
I cannot create a Kafka -> Cassandra sink connector using ksqlDB. The statement I run is:
CREATE SINK CONNECTOR cassandra WITH (
    "connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector',
    "tasks.max" = '1',
    "topics" = 'tst',
    "cassandra.contact.points" = 'cassandra',
    "cassandra.keyspace" = 'test',
    "cassandra.write.mode" = 'Update',
    "confluent.topic.bootstrap.servers" = 'kafka:9092'
);
ERROR [CASS|worker] WorkerConnector{id=CASS} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:118)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) '_confluent-command'
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:262)
at io.confluent.license.LicenseStore$1.run(LicenseStore.java:161)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
at io.confluent.license.LicenseStore.start(LicenseStore.java:190)
at io.confluent.license.LicenseManager.<init>(LicenseManager.java:155)
at io.confluent.license.LicenseManager.<init>(LicenseManager.java:140)
at io.confluent.connect.utils.licensing.ConnectLicenseManager$Builder.lambda$build$0(ConnectLicenseManager.java:210)
at io.confluent.connect.utils.licensing.ConnectLicenseManager.registerOrValidateLicense(ConnectLicenseManager.java:255)
at io.confluent.connect.cassandra.CassandraSinkConnector.doStart(CassandraSinkConnector.java:50)
at io.confluent.connect.cassandra.CassandraSinkConnector.start(CassandraSinkConnector.java:45)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1190)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:126)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1206)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1202)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:229)
... 21 more
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
Upvotes: 1
Views: 813
Reputation: 181
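The Confluent Cassandra sink connector creates its license topic ('_confluent-command') with a default replication factor of 3, which is more than the single broker available in this setup. That is what the InvalidReplicationFactorException in the stack trace reports. Overriding the default in the connector config solved the problem: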
"confluent.topic.replication.factor" = '1',
Upvotes: 1
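For reference, adding the setting from the answer to the statement from the question gives a connector definition along these lines. This is a sketch that assumes the same single-broker setup; every name and value is taken from the question, and only the replication factor line is new.

```sql
-- Connector definition from the question, plus the replication factor override.
CREATE SINK CONNECTOR cassandra WITH (
    "connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector',
    "tasks.max" = '1',
    "topics" = 'tst',
    "cassandra.contact.points" = 'cassandra',
    "cassandra.keyspace" = 'test',
    "cassandra.write.mode" = 'Update',
    "confluent.topic.bootstrap.servers" = 'kafka:9092',
    -- The license topic defaults to replication factor 3; with one broker it must be 1.
    "confluent.topic.replication.factor" = '1'
);
```

A replication factor of 1 fits a single-broker development environment. On a cluster with three or more brokers the default of 3 would not trigger this error.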