Malina Dale

Reputation: 173

Access Kafka cluster topic in Confluent Cloud

I'm trying to connect the iceberg-kafka-connect connector to a Kafka topic in my Confluent Cloud cluster. I'm using MSK Connect to run this connector.

I was able to access the Confluent Kafka topic from an AWS EC2 instance, so my AWS VPC can reach the Kafka cluster in Confluent Cloud. I'm creating the connector in MSK Connect with the same VPC and subnet, and attaching the same security group as the EC2 instance, but I'm unable to get any data. I think the issue is either the format of sasl.jaas.config or possibly a timeout.

These are the settings I'm using in my MSK Connect connector configuration (the first three are most likely the ones used for auth):

iceberg.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CONFLUENT_API_KEY>" password="<CONFLUENT_API_SECRET>";
iceberg.kafka.security.protocol=SASL_SSL
iceberg.kafka.sasl.mechanism=PLAIN

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
table.write-format=parquet
iceberg.tables.evolve-schema-enabled=true
iceberg.fs.s3a.path.style.access=true
table.namespace=pocckafkaflink2
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
tasks.max=1
topics=confluent_test_topic2
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.client.region=us-east-1
iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
iceberg.tables=pocckafkaflink2.mskmytopicmaib
value.converter.schemas.enable=false
iceberg.catalog.warehouse=s3://bucket/data4
value.converter=org.apache.kafka.connect.json.JsonConverter
table.auto-create=true 
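
Since the format of sasl.jaas.config is one of the suspects, here is a minimal sketch of a shape check for a single-line JAAS entry. It is only an illustration: `parse_jaas_line` and both regular expressions are hypothetical helpers, not part of Kafka or the connector, and the check is deliberately simplified (it does not handle escaped quotes inside values). It only confirms the layout "module class, control flag, key="value" options, terminating semicolon".

```python
import re

# Control flags defined by the JAAS configuration syntax.
CONTROL_FLAGS = ("required", "requisite", "sufficient", "optional")

# Hypothetical, simplified pattern: <module class> <flag> key="value" ... ;
JAAS_LINE = re.compile(
    r'^(?P<module>[\w.$]+)\s+'
    r'(?P<flag>' + "|".join(CONTROL_FLAGS) + r')'
    r'(?P<options>(?:\s+\w+="[^"]*")*)'
    r'\s*;$'
)
OPTION = re.compile(r'(\w+)="([^"]*)"')


def parse_jaas_line(value: str) -> dict:
    """Split a single-line JAAS entry into module, flag and options.

    Raises ValueError if the value does not have the expected shape,
    for example when the terminating semicolon is missing.
    """
    match = JAAS_LINE.match(value.strip())
    if match is None:
        raise ValueError("not a well-formed single-line JAAS entry")
    return {
        "module": match.group("module"),
        "flag": match.group("flag"),
        "options": dict(OPTION.findall(match.group("options"))),
    }


if __name__ == "__main__":
    # The value from the configuration above, placeholders left as-is.
    entry = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="<CONFLUENT_API_KEY>" password="<CONFLUENT_API_SECRET>";'
    )
    print(parse_jaas_line(entry))
```

The value shown in the configuration above passes this check, so at least its overall shape looks right; the sketch says nothing about whether the credentials themselves are valid or whether the property reaches the client that needs it.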

Error Logs:

[Worker-06cf2e74aeb2c59e6] org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1722837422758, tries=1, nextAllowedTryMs=1722837422859) timed out at 1722837422759 after 1 attempt(s)
[Worker-06cf2e74aeb2c59e6] Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[Worker-06cf2e74aeb2c59e6] [2024-08-05 05:57:32,759] INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[Worker-06cf2e74aeb2c59e6] [2024-08-05 05:57:32,759] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
[Worker-06cf2e74aeb2c59e6] org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1722837452759, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[Worker-06cf2e74aeb2c59e6] Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[Worker-06cf2e74aeb2c59e6] [2024-08-05 05:57:32,766] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-06cf2e74aeb2c59e6] [2024-08-05 05:57:32,766] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-06cf2e74aeb2c59e6] [2024-08-05 05:57:32,766] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-06cf2e74aeb2c59e6] [2024-08-05 05:57:32,767] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:86)
[Worker-06cf2e74aeb2c59e6] org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
[Worker-06cf2e74aeb2c59e6] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1722837452757, tries=1, nextAllowedTryMs=1722837452858) timed out at 1722837452758 after 1 attempt(s)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
[Worker-06cf2e74aeb2c59e6]  at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
[Worker-06cf2e74aeb2c59e6]  ... 3 more
[Worker-06cf2e74aeb2c59e6] Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1722837452757, tries=1, nextAllowedTryMs=1722837452858) timed out at 1722837452758 after 1 attempt(s)
[Worker-06cf2e74aeb2c59e6] Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
[Worker-06cf2e74aeb2c59e6] MSK Connect encountered errors and failed.
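
To make the timing in these logs easier to read, here is a small sketch that converts two of the `deadlineMs` values above from epoch milliseconds to UTC. The helper name `ms_to_utc` is hypothetical; the two numbers are copied from the log lines. The conversion shows the listNodes deadline lining up with the 05:57:32 log timestamps, and the two deadlines sitting roughly 30 seconds apart.

```python
from datetime import datetime, timezone


def ms_to_utc(ms: int) -> str:
    """Format an epoch-milliseconds value the way the worker log prints times."""
    seconds, millis = divmod(ms, 1000)
    stamp = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return f"{stamp:%Y-%m-%d %H:%M:%S},{millis:03d}"


# Values copied from the log lines above.
first_deadline = 1722837422758  # deadlineMs of the first fetchMetadata call
final_deadline = 1722837452757  # deadlineMs of the failing listNodes call

print(ms_to_utc(first_deadline))                 # 2024-08-05 05:57:02,758
print(ms_to_utc(final_deadline))                 # 2024-08-05 05:57:32,757
print((final_deadline - first_deadline) / 1000)  # 29.999
```

The odd value 9223372036854775807 in the second fetchMetadata line is simply the largest signed 64-bit integer (2**63 - 1), not a real timestamp.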

Upvotes: 1

Views: 153

Answers (1)

EdbE

Reputation: 391

MSK Connect supports the following authentication methods with Kafka clusters:

  • IAM (using roles and policies)
  • None (using unauthenticated access via plaintext or SSL)

Confluent Cloud supports the following authentication methods:

  • OAuth/OIDC
  • mutual TLS (mTLS)
  • single sign-on (SSO)

As a result, there is no way to integrate MSK Connect with Confluent Cloud.

Note: I have never tried to work around this, so I would be happy to learn if there is a way.

Upvotes: 0
