Sophie

Reputation: 139

How to deploy Kafka Connect in distributed mode?

I am building a Kafka Connect application with the JDBC sink connector on Kubernetes. I tried standalone mode and it works. Now I would like to move to distributed mode. I can successfully bring up two pods (Kafka Connect workers) with the Deployment below:

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: vtq
  name: kafka-sink-postgres-dis
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kafka-sink-postgres-dis
  template:
    metadata:
      labels:
        app: kafka-sink-postgres-dis
    spec:
      containers:
      - name: kafka-sink-postgres-dis
        image: ***
        imagePullPolicy: Always
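To reach the workers' REST port from outside the pods, a Service in front of the Deployment helps; a minimal sketch, assuming the labels above (the Service name is illustrative):

```yaml
apiVersion: v1
kind: Service
metadata:
  namespace: vtq
  name: kafka-connect-rest   # illustrative name
spec:
  selector:
    app: kafka-sink-postgres-dis
  ports:
  - name: rest
    port: 8083
    targetPort: 8083
```

A request through this Service lands on an arbitrary worker, which is fine in distributed mode since any worker forwards connector requests to the group leader. Note, however, that with rest.host.name=127.0.0.1 the REST listener is only reachable from inside the pod; it would need to bind to 0.0.0.0 for the Service to reach it.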

Each pod starts a distributed worker with:

bin/connect-distributed.sh config/worker.properties

bootstrap.servers=***:9092
offset.flush.interval.ms=10000

rest.port=8083
rest.host.name=127.0.0.1


key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081


# Prevent sink connectors from replaying all historical messages
# (consumer overrides in the worker config need the "consumer." prefix)
consumer.auto.offset.reset=latest

# The options below are required for distributed mode

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-postgres-sink-dis

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=postgres-connect-offsets
offset.storage.replication.factor=3

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You MUST manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.

config.storage.topic=postgres-connect-configs
config.storage.replication.factor=3

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=postgres-connect-status
status.storage.replication.factor=3
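The single-partition config topic called for above can be created up front with the stock Kafka CLI; a sketch, assuming a broker reachable at kafka:9092 (the broker address and partition counts are illustrative, and older Kafka releases use --zookeeper instead of --bootstrap-server):

```shell
# Config topic: must have exactly one partition, highly replicated
bin/kafka-topics.sh --create --bootstrap-server kafka:9092 \
  --topic postgres-connect-configs --partitions 1 --replication-factor 3

# Offsets topic: many partitions so offsets spread across the cluster
bin/kafka-topics.sh --create --bootstrap-server kafka:9092 \
  --topic postgres-connect-offsets --partitions 25 --replication-factor 3

# Status topic: a few partitions are enough
bin/kafka-topics.sh --create --bootstrap-server kafka:9092 \
  --topic postgres-connect-status --partitions 5 --replication-factor 3
```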

Then I created one sink connector inside each pod with tasks.max=1, and the two connectors listened to the same topic. It turned out that they simply duplicated each other's work.

curl -X POST -H "Content-Type: application/json" --data '{"name": "postgres_sink_dis", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "connection.url":"***","topics":"***"}}' http://127.0.0.1:8083/connectors
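After the POST, the standard Connect REST API can confirm what the cluster sees (the connector name matches the POST above):

```shell
# List connectors known to the cluster
# (every worker in the same group should report the same list)
curl -s http://127.0.0.1:8083/connectors

# Inspect the connector's state and which worker runs each task
curl -s http://127.0.0.1:8083/connectors/postgres_sink_dis/status
```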

But I am quite confused by the concepts of the Kafka Connect cluster, workers, connectors, and tasks. I read https://github.com/enfuse/kafka-connect-demo/blob/master/docs/install-connector.md, where they set up port-forwarding to the REST port before configuring the connector. I tried that, but after deploying the service and creating the connector, curl -s 172.0.0.1:8083/connectors returns nothing.
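For reference, the port-forwarding step from that guide looks roughly like this; the pod name suffix is a placeholder for whatever kubectl get pods reports:

```shell
# Forward local port 8083 to the worker's REST port inside one pod
kubectl -n vtq port-forward pod/kafka-sink-postgres-dis-<pod-suffix> 8083:8083

# Then, in another terminal:
curl -s http://127.0.0.1:8083/connectors
```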

Can anyone give me a short description of what I should do next? Any relevant information would be very helpful. Thanks!

UPDATES: I finally figured out the issue and solved the problem:

1. Deploy the two pods/workers separately with the same group.id and different rest.port (https://docs.confluent.io/current/connect/userguide.html).
2. Inside one pod, create a single connector with multiple tasks.

Upvotes: 4

Views: 5354

Answers (1)

Dipperman

Reputation: 139

For example, suppose you have a Connect cluster of two workers/two pods. You can create connectors (sink or source) with several tasks in the cluster, and those tasks will be distributed across the two workers.
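A sketch of what that looks like against the Connect REST API; the connection URL and topic are illustrative, and the request can go to any worker in the cluster:

```shell
# Create one connector with two tasks; Connect assigns the tasks
# across the available workers in the group
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "postgres_sink_dis",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "connection.url": "jdbc:postgresql://postgres:5432/mydb",
    "topics": "my-topic"
  }
}' http://127.0.0.1:8083/connectors

# The status endpoint lists each task and the worker it was assigned to
curl -s http://127.0.0.1:8083/connectors/postgres_sink_dis/status
```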

Upvotes: 1
