Reputation: 139
I am working on building a Kafka Connect application using the JDBC sink connector in Kubernetes. I tried standalone mode and it works. I would like to move to distributed mode. I can successfully bring up two pods (Kafka Connect workers) by running the YAML file below:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  namespace: vtq
  name: kafka-sink-postgres-dis
spec:
  replicas: 2
  template:
    metadata:
      labels:
        app: kafka-sink-postgres-dis
    spec:
      containers:
      - name: kafka-sink-postgres-dis
        image: ***
        imagePullPolicy: Always
Each container starts Connect in distributed mode with bin/connect-distributed.sh config/worker.properties, where worker.properties contains:
bootstrap.servers=***:9092
offset.flush.interval.ms=10000
rest.port=8083
rest.host.name=127.0.0.1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# Prevent the connector from pulling all historical messages
auto.offset.reset=latest
# options below may be required for distributed mode
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-postgres-sink-dis
# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=postgres-connect-offsets
offset.storage.replication.factor=3
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You MUST manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=postgres-connect-configs
config.storage.replication.factor=3
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=postgres-connect-status
status.storage.replication.factor=3
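Since the comments above say the config topic must be created manually with a single partition, here is a minimal sketch of creating the three internal topics up front (topic names and replication factors are taken from the properties above; the broker address is a placeholder, and this assumes a Kafka version whose kafka-topics.sh supports --bootstrap-server):

# Config topic: must have exactly one partition, highly replicated
bin/kafka-topics.sh --create --bootstrap-server ***:9092 \
  --topic postgres-connect-configs --partitions 1 --replication-factor 3
# Offset topic: many partitions, replicated
bin/kafka-topics.sh --create --bootstrap-server ***:9092 \
  --topic postgres-connect-offsets --partitions 25 --replication-factor 3
# Status topic: multiple partitions, replicated
bin/kafka-topics.sh --create --bootstrap-server ***:9092 \
  --topic postgres-connect-status --partitions 5 --replication-factor 3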
Then I created one sink connector inside each pod with tasks.max=1, and the two connectors listened to the same topic. It turned out that they just duplicated the messages.
curl -X POST -H "Content-Type: application/json" --data '{"name": "postgres_sink_dis", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "connection.url":"***","topics":"***"}}' http://127.0.0.1:8083/connectors
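For what it's worth, the Connect REST API can show what a worker's cluster knows; a quick sanity check against the same worker (using the connector name registered above):

# List all connectors known to the Connect cluster this worker belongs to
curl -s http://127.0.0.1:8083/connectors
# Show the connector's state and the worker each task is assigned to
curl -s http://127.0.0.1:8083/connectors/postgres_sink_dis/status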
But I am quite confused by the concepts of a Kafka Connect cluster, workers, connectors, and tasks. I read https://github.com/enfuse/kafka-connect-demo/blob/master/docs/install-connector.md. They set up port-forwarding to the REST port before configuring the connector. I tried that, and after deploying the service and creating the connector, curl -s 172.0.0.1:8083/connectors returns nothing.
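For context, the port-forwarding step in that demo boils down to something like this (the pod name is a placeholder; kubectl get pods -n vtq lists the real one):

# Forward local port 8083 to the worker's REST port inside the pod
kubectl port-forward -n vtq <kafka-sink-postgres-dis-pod> 8083:8083
# In another shell, the REST API should now answer locally
curl -s http://127.0.0.1:8083/connectors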
Can anyone give me a short description of what I should do next? Any relevant information would be very helpful. Thanks!
UPDATE: I finally figured out the issue and solved the problem.
1. Deploy the two pods/workers separately with the same group.id and a different rest.port (https://docs.confluent.io/current/connect/userguide.html).
2. Inside a pod, create a connector with tasks.
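A minimal sketch of the per-worker difference from step 1 (every other worker.properties setting stays identical between the two pods):

# worker 1
group.id=connect-postgres-sink-dis
rest.port=8083

# worker 2 - the same group.id makes it join the same Connect cluster
group.id=connect-postgres-sink-dis
rest.port=8084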
Upvotes: 4
Views: 5354
Reputation: 139
For example, suppose you have a Connect cluster of two workers/two pods. You can create connectors (sink or source) with several tasks in the cluster, and those tasks will be distributed across the two workers, as sketched below.
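A hedged illustration (the connector name, connection URL, and topics are placeholders): create one connector with tasks.max=2 against either worker's REST port, and the two tasks are balanced across the workers:

# One connector, two tasks; the cluster distributes the tasks
# across the two workers that share the same group.id
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "postgres_sink_dis",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "connection.url": "***",
    "topics": "***"
  }}' http://127.0.0.1:8083/connectors

# The status endpoint shows which worker each task landed on
curl -s http://127.0.0.1:8083/connectors/postgres_sink_dis/status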
Upvotes: 1