Reputation: 6766
I am trying to implement a Kafka connection to MongoDB and MySQL using Docker.
What I want is the setup shown in the following figure:
Kafka Connect MongoDB:
I have seen the docker-compose file of the official MongoDB repository. It has two problems:
It is too complicated for my purpose, because it runs multiple MongoDB containers and uses many images that consume a lot of resources.
It has some unresolved issues that result in a malfunctioning Kafka-to-MongoDB connection. Here you can see my issue.
What I have implemented in docker-compose.yml, using Debezium for the connection, is the following:
version: '3.2'
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - target: 9094
        published: 9094
        protocol: tcp
        mode: host
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka:/kafka
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/opt/zookeeper-3.4.13
  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017
  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
volumes:
  kafka:
  zookeeper:
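With this stack up, the Connect worker's REST API should answer on port 8083. As a quick sanity check (assuming curl and jq are installed on the host), the installed connector plugins can be listed with:
curl http://localhost:8083/connector-plugins | jq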
As @cricket_007 says, I should not use Debezium for my purpose. So I have used the confluentinc/kafka-connect-datagen image instead. I have added the following to the docker-compose.yml file in place of the Debezium service:
connect:
  image: confluentinc/kafka-connect-datagen
  build:
    context: .
    dockerfile: Dockerfile
  hostname: connect
  container_name: connect
  depends_on:
    - zookeeper
  ports:
    - 8083:8083
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
    CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
    CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
    CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
  command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
  volumes:
    - ../build/confluent/kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
Dockerfile:
FROM confluentinc/cp-kafka-connect
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen
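The warning baked into the command above hints at how this is meant to be started: the image has to be (re)built so the plugin ends up inside it. Assuming the Dockerfile sits next to docker-compose.yml:
docker-compose up -d --build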
Problem:
The kafka-connect-datagen image generates fake data and, as mentioned in its repository, it is not suitable for production. What I want is simply to connect Kafka to MongoDB, nothing more and nothing less. Explicitly: how can I send data to Kafka with curl and save it in a MongoDB collection?
I face the CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required error. As @cricket_007 said, schema-registry is optional. So how can I get rid of that image?
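As far as I can tell, that error comes from the AvroConverter configured in the connect service above, which needs a Schema Registry. A sketch of the environment change that should avoid it, switching to the registry-free JSON converter (my assumption, not yet verified):
  environment:
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    # Plain JSON without embedded schemas, so no registry is needed
    CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
    CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"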
At the last step I tried to run the repository's docker-compose file as explained in its README.md, but unfortunately I faced another error:
WARNING: Could not reach configured kafka system on http://localhost:8083
Note: This script requires curl.
Even though I didn't make any change to the configuration, I face yet another error:
Kafka Connectors:
{"error_code":409,"message":"Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"}
Please help me find answers to my questions.
My output:
Building the MongoDB Kafka Connector
> Task :shadowJar
FatJar: /home/mostafa/Documents/Docker/kafka-mongo/build/libs/kafka-mongo-0.3-SNAPSHOT-all.jar (2.108904 MB)
Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/5.2/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 4h 26m 25s
7 actionable tasks: 7 executed
Unzipping the confluent archive plugin....
Archive: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT.zip
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSinkConnector.properties
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSourceConnector.properties
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/kafka-mongo-0.3-SNAPSHOT-all.jar
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/manifest.json
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-leaf.png
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-logo.png
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/README.md
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/LICENSE.txt
Starting docker .
Creating volume "docker_rs2" with default driver
Creating volume "docker_rs3" with default driver
Building connect
Step 1/3 : FROM confluentinc/cp-kafka-connect:5.2.2
---> 32bb41f78617
Step 2/3 : ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
---> Using cache
---> 9e4fd4f10a38
Step 3/3 : RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
---> Using cache
---> 5f879008bb73
Successfully built 5f879008bb73
Successfully tagged confluentinc/kafka-connect-datagen:latest
Recreating mongo1 ...
Recreating mongo1 ... done
Creating mongo3 ... done
Starting broker ... done
Creating mongo2 ... done
Starting schema-registry ... done
Starting connect ... done
Creating rest-proxy ... done
Creating ksql-server ... done
Creating docker_kafka-topics-ui_1 ... done
Creating control-center ... done
Creating ksql-cli ... done
Waiting for the systems to be ready.............
WARNING: Could not reach configured kafka system on http://localhost:8082
Note: This script requires curl.
SHUTTING DOWN
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 68 100 68 0 0 23 0 0:00:02 0:00:02 --:--:-- 23
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 61 100 61 0 0 4066 0 --:--:-- --:--:-- --:--:-- 4066
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 63 100 63 0 0 9000 0 --:--:-- --:--:-- --:--:-- 9000
MongoDB shell version v4.0.12
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("80ebb904-f81a-4230-b63b-4e62f65fbeb7") }
MongoDB server version: 4.0.12
{
"ok" : 1,
"operationTime" : Timestamp(1567235833, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1567235833, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
Stopping ksql-cli ... done
Stopping control-center ... done
Stopping docker_kafka-topics-ui_1 ... done
Stopping ksql-server ... done
Stopping rest-proxy ... done
Stopping mongo1 ... done
Stopping mongo2 ... done
Stopping mongo3 ... done
Stopping connect ... done
Stopping broker ... done
Stopping zookeeper ... done
Removing ksql-cli ...
Removing control-center ... done
Removing docker_kafka-topics-ui_1 ... done
Removing ksql-server ... done
Removing rest-proxy ... done
Removing mongo1 ... done
Removing mongo2 ... done
Removing mongo3 ... done
Removing connect ... done
Removing schema-registry ... done
Removing broker ... done
Removing zookeeper ... done
Removing network docker_default
Removing network docker_localnet
WARNING: Could not reach configured kafka system on http://localhost:8082
Note: This script requires curl.
Upvotes: 2
Views: 5511
Reputation: 6963
I created the following docker-compose file (view all files in GitHub):
version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - localnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:5.1.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    networks:
      - localnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  connect:
    image: confluentinc/cp-kafka-connect:5.1.2
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    networks:
      - localnet
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
    volumes:
      - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
  # MongoDB Replica Set
  mongo1:
    image: "mongo:4.0-xenial"
    container_name: mongo1
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs1:/data/db
    networks:
      - localnet
    ports:
      - "27017:27017"
    restart: always
  mongo2:
    image: "mongo:4.0-xenial"
    container_name: mongo2
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs2:/data/db
    networks:
      - localnet
    ports:
      - "27018:27017"
    restart: always
  mongo3:
    image: "mongo:4.0-xenial"
    container_name: mongo3
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs3:/data/db
    networks:
      - localnet
    ports:
      - "27019:27017"
    restart: always
networks:
  localnet:
    attachable: true
volumes:
  rs1:
  rs2:
  rs3:
After executing docker-compose up, you have to configure your MongoDB replica set:
docker-compose exec mongo1 /usr/bin/mongo --eval '''if (rs.status()["ok"] == 0) {
  rsconf = {
    _id : "rs0",
    members: [
      { _id : 0, host : "mongo1:27017", priority: 1.0 },
      { _id : 1, host : "mongo2:27017", priority: 0.5 },
      { _id : 2, host : "mongo3:27017", priority: 0.5 }
    ]
  };
  rs.initiate(rsconf);
}
rs.conf();'''
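Optionally, confirm the replica set is healthy before moving on (just a sanity check):
docker-compose exec mongo1 /usr/bin/mongo --eval 'rs.status()'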
Make sure that your plugin is installed:
curl localhost:8083/connector-plugins | jq
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "0.2"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "0.2"
  },
  {
    "class": "io.confluent.connect.gcs.GcsSinkConnector",
    "type": "sink",
    "version": "5.0.1"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.1.1-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.1.1-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.1.1-cp1"
  }
]
As you can see above, the MongoDB connector plugins are available for use. Assuming you have a database named mydb and a collection named products, I create a JSON file named sink-connector.json:
{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "product.events",
    "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
    "database": "mydb",
    "collection": "products",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
Now create the connector using the Connect REST API:
curl -X POST -H "Content-Type: application/json" -d @sink-connector.json http://localhost:8083/connectors | jq
You can view the status of your connector:
curl http://localhost:8083/connectors/mongo-sink/status | jq
{
  "name": "mongo-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}
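If you later need to change the configuration, the same REST API lets you remove the connector and POST it again:
curl -X DELETE http://localhost:8083/connectors/mongo-sink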
Now let's create a Kafka topic. First, we must connect to the Kafka container:
docker-compose exec broker bash
Then create the topic:
kafka-topics --zookeeper zookeeper:2181 --create --topic product.events --partitions 1 --replication-factor 1
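You can optionally verify that the topic was created:
kafka-topics --zookeeper zookeeper:2181 --describe --topic product.events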
Now produce products into the topic:
kafka-console-producer --broker-list localhost:9092 --topic product.events
>{"Name": "Hat", "Price": 25}
>{"Name": "Shoe", "Price": 15}
You can view the result in the image: the produced records end up as documents in the products collection.
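If you prefer checking from the shell instead of a screenshot, a quick query against the sink collection (using the container, database, and collection names defined above):
docker-compose exec mongo1 /usr/bin/mongo mydb --eval 'db.products.find().pretty()'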
Hope this will help you.
Upvotes: 7
Reputation: 191904
Debezium reads data from Mongo. If you want a sink connector, you'll need to use the official one you found, but there are also others available on GitHub, for example.
Kafka Connect uses a REST API, so you'll also need to create a JSON payload with all the connection and topic details. There are guides in the repo you found.
it runs multiple MongoDB containers and uses many images that consume a lot of resources.
You do not need KSQL, Control Center, REST Proxy, Topic UI, etc. You only need Kafka, Zookeeper, Connect, Mongo, and optionally the Schema Registry, so just remove the other containers from the compose file. You also probably don't need multiple Mongo containers, but then you'll need to reconfigure the environment variables to adjust to only one instance.
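One way to do that without editing the compose file at all: docker-compose can start only the services you name (plus their declared dependencies). Assuming the service names from the repo's compose file, as seen in your output:
docker-compose up -d zookeeper broker connect schema-registry mongo1 mongo2 mongo3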
how can I send data to Kafka with curl and save it in a MongoDB collection?
If you did want to use curl, then you will need to start the REST Proxy container. That would get you past the Could not reach configured kafka system on http://localhost:8082 error message.
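A sketch of what producing through the REST Proxy looks like, assuming it runs on its default port 8082 and you target the product.events topic from the other answer:
# Produce one JSON record via the Confluent REST Proxy (v2 API)
curl -X POST http://localhost:8082/topics/product.events \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{"records":[{"value":{"Name": "Hat", "Price": 25}}]}'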
Upvotes: 0