Mostafa Ghadimi

Reputation: 6766

Implement a dockerized Kafka sink connector to MongoDB

I am trying to implement a Kafka connection to MongoDB and MySQL using Docker.

What I want is shown in the following figure:

[Figure: Kafka Connect as a sink to MongoDB and MySQL]

Kafka Connect MongoDB:

I have seen the docker-compose file of the official MongoDB repository. It has two problems:

  1. It is too complicated for my purpose: it runs multiple MongoDB containers and uses many images that consume a lot of resources.

  2. It has some unresolved issues that result in a malfunctioning Kafka-to-MongoDB connection. Here you can see my issue.

What I have implemented in docker-compose.yml, using Debezium for the connection, is the following:

version: '3.2'
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - target: 9094
        published: 9094
        protocol: tcp
        mode: host
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka:/kafka

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/opt/zookeeper-3.4.13

  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017

  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets

volumes:
  kafka:
  zookeeper:

As @cricket_007 says, I should not use Debezium for my purpose, so I have used the confluentinc/kafka-connect-datagen image instead. I added the following to the docker-compose.yml file in place of the Debezium service:

connect:
    image: confluentinc/kafka-connect-datagen
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on: 
      - zookeeper
    ports: 
      - 8083:8083
    environment: 
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
    volumes:
      - ../build/confluent/kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb

Dockerfile:

# Base image: Confluent's Kafka Connect worker
FROM confluentinc/cp-kafka-connect
# Look for plugins in both the default location and the Confluent Hub directory
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
# Install the datagen connector from Confluent Hub
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen
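
Since the connect service has a build section, the image has to be (re)built when bringing the stack up; as the warning baked into the service's command above reminds you, that is:

docker-compose up -d --build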

Problems:

  1. The kafka-connect-datagen image generates fake data and, as mentioned in its repository, it is not suitable for production. What I want is simply to connect Kafka to MongoDB, nothing less and nothing more. Explicitly, how can I send data to Kafka with curl and save it in a MongoDB collection?

  2. I get the error CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required. As @cricket_007 said, the Schema Registry is optional, so how can I get rid of that image? (A sketch of the converter change I am considering is shown after this list.)

  3. At the last step I tried to run the repository's docker-compose file as explained in its README.md, but unfortunately I faced another error:

    WARNING: Could not reach configured kafka system on http://localhost:8083 Note: This script requires curl.

  4. Even when I didn't make any change to the configuration, I faced yet another error:

Kafka Connectors: 

{"error_code":409,"message":"Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"}

Please help me find answers to these questions.

My output:

Building the MongoDB Kafka Connector

> Task :shadowJar
FatJar: /home/mostafa/Documents/Docker/kafka-mongo/build/libs/kafka-mongo-0.3-SNAPSHOT-all.jar (2.108904 MB)

Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/5.2/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 4h 26m 25s
7 actionable tasks: 7 executed
Unzipping the confluent archive plugin....

Archive:  ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT.zip
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSinkConnector.properties  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSourceConnector.properties  
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/kafka-mongo-0.3-SNAPSHOT-all.jar  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/manifest.json  
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-leaf.png  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-logo.png  
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/README.md  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/LICENSE.txt  
Starting docker .
Creating volume "docker_rs2" with default driver
Creating volume "docker_rs3" with default driver
Building connect
Step 1/3 : FROM confluentinc/cp-kafka-connect:5.2.2
 ---> 32bb41f78617
Step 2/3 : ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
 ---> Using cache
 ---> 9e4fd4f10a38
Step 3/3 : RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
 ---> Using cache
 ---> 5f879008bb73

Successfully built 5f879008bb73
Successfully tagged confluentinc/kafka-connect-datagen:latest
Recreating mongo1 ... 
Recreating mongo1        ... done
Creating mongo3          ... done
Starting broker   ... done
Creating mongo2          ... done
Starting schema-registry ... done
Starting connect         ... done
Creating rest-proxy      ... done
Creating ksql-server              ... done
Creating docker_kafka-topics-ui_1 ... done
Creating control-center           ... done
Creating ksql-cli                 ... done


Waiting for the systems to be ready.............
WARNING: Could not reach configured kafka system on http://localhost:8082 
Note: This script requires curl.



SHUTTING DOWN


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    68  100    68    0     0     23      0  0:00:02  0:00:02 --:--:--    23
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    61  100    61    0     0   4066      0 --:--:-- --:--:-- --:--:--  4066
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    63  100    63    0     0   9000      0 --:--:-- --:--:-- --:--:--  9000
MongoDB shell version v4.0.12
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("80ebb904-f81a-4230-b63b-4e62f65fbeb7") }
MongoDB server version: 4.0.12
{
        "ok" : 1,
        "operationTime" : Timestamp(1567235833, 1),
        "$clusterTime" : {
                "clusterTime" : Timestamp(1567235833, 1),
                "signature" : {
                        "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
                        "keyId" : NumberLong(0)
                }
        }
}
Stopping ksql-cli                 ... done
Stopping control-center           ... done
Stopping docker_kafka-topics-ui_1 ... done
Stopping ksql-server              ... done
Stopping rest-proxy               ... done
Stopping mongo1                   ... done
Stopping mongo2                   ... done
Stopping mongo3                   ... done
Stopping connect                  ... done
Stopping broker                   ... done
Stopping zookeeper                ... done
Removing ksql-cli                 ... 
Removing control-center           ... done
Removing docker_kafka-topics-ui_1 ... done
Removing ksql-server              ... done
Removing rest-proxy               ... done
Removing mongo1                   ... done
Removing mongo2                   ... done
Removing mongo3                   ... done
Removing connect                  ... done
Removing schema-registry          ... done
Removing broker                   ... done
Removing zookeeper                ... done
Removing network docker_default
Removing network docker_localnet

WARNING: Could not reach configured kafka system on http://localhost:8082 
Note: This script requires curl.

Upvotes: 2

Views: 5511

Answers (2)

Seyed Morteza Mousavi

Reputation: 6963

I created the following docker-compose file (view all files on GitHub):

version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - localnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.1.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    networks:
      - localnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  connect:
    image: confluentinc/cp-kafka-connect:5.1.2
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    networks:
      - localnet
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Carried over from the Confluent examples; the interceptor jar version
      # below should match the Connect image version in use
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
    volumes:
      - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb

# MongoDB Replica Set
  mongo1:
    image: "mongo:4.0-xenial"
    container_name: mongo1
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs1:/data/db
    networks:
      - localnet
    ports:
      - "27017:27017"
    restart: always
  mongo2:
    image: "mongo:4.0-xenial"
    container_name: mongo2
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs2:/data/db
    networks:
      - localnet
    ports:
      - "27018:27017"
    restart: always
  mongo3:
    image: "mongo:4.0-xenial"
    container_name: mongo3
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs3:/data/db
    networks:
      - localnet
    ports:
      - "27019:27017"
    restart: always

networks:
  localnet:
    attachable: true

volumes:
  rs1:
  rs2:
  rs3:

After executing docker-compose up, you have to initialize your MongoDB replica set:

docker-compose exec mongo1 /usr/bin/mongo --eval '''if (rs.status()["ok"] == 0) {
    rsconf = {
      _id : "rs0",
      members: [
        { _id : 0, host : "mongo1:27017", priority: 1.0 },
        { _id : 1, host : "mongo2:27017", priority: 0.5 },
        { _id : 2, host : "mongo3:27017", priority: 0.5 }
      ]
    };
    rs.initiate(rsconf);
}
rs.conf();'''
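
To check that the replica set actually initialized (an optional sanity check; mongo1 should report PRIMARY after a few seconds), something like this should work:

docker-compose exec mongo1 /usr/bin/mongo --eval 'rs.status().members.forEach(function (m) { print(m.name, m.stateStr); })'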

Make sure that your plugin is installed:

curl localhost:8083/connector-plugins | jq

[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "0.2"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "0.2"
  },
  {
    "class": "io.confluent.connect.gcs.GcsSinkConnector",
    "type": "sink",
    "version": "5.0.1"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.1.1-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.1.1-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.1.1-cp1"
  }
]

As you can see above, the MongoDB connector plugins are available for use. Assuming you have a database named mydb and a collection named products, I create a JSON file named sink-connector.json:

{
    "name": "mongo-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "topics": "product.events",
        "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
        "database": "mydb",
        "collection": "products",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}

Now create the connector using the Connect REST API (value.converter.schemas.enable is false because we will produce plain JSON without an embedded schema):

curl -X POST -H "Content-Type: application/json" -d @sink-connector.json http://localhost:8083/connectors | jq

You can view the status of your connector:

curl http://localhost:8083/connectors/mongo-sink/status | jq
{
  "name": "mongo-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}

Now let's create a Kafka topic. First, we must connect to the Kafka container:

docker-compose exec broker bash

Then create the topic:

kafka-topics --zookeeper zookeeper:2181 --create --topic product.events --partitions 1 --replication-factor 1
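
To confirm the topic exists (an optional check), describe it:

kafka-topics --zookeeper zookeeper:2181 --describe --topic product.events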

Now produce products into the topic:

kafka-console-producer --broker-list localhost:9092 --topic product.events
>{"Name": "Hat", "Price": 25}
>{"Name": "Shoe", "Price": 15}

You can view the result in the screenshot: the produced records appear as documents in the products collection.

Hope this helps.

Upvotes: 7

OneCricketeer

Reputation: 191904

Debezium reads data from Mongo (it is a source connector). If you want a sink connector, you'll need to use that official one you found, but there are also others available on GitHub, for example.

Kafka Connect uses a REST API, so you'll also need to create a JSON payload with all the connection and topic details. There are guides in that repo you found.

it runs multiple MongoDB containers and uses many images that consume a lot of resources.

You do not need KSQL, Control Center, REST Proxy, Topic UI, etc. Only Kafka, Zookeeper, Connect, Mongo, and optionally the Schema Registry are needed, so just remove the other containers from the compose file. You also probably don't need multiple Mongo containers, but then you'll need to reconfigure the environment variables to adjust to only one instance, as in the sketch below.
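
For example, a single-instance Mongo service could look like this (a sketch; the sink connector only writes, so unlike the source connector it should not require a replica set, and connection.uri would then be just mongodb://mongo:27017):

  mongo:
    image: mongo:4.0
    container_name: mongo
    ports:
      - "27017:27017"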

how can I send data to Kafka with curl and save it in a MongoDB collection?

If you did want to use curl, then you will need to start the REST Proxy container. That would get you past the Could not reach configured kafka system on http://localhost:8082 error message.
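
A sketch of what producing through the REST Proxy looks like, assuming the rest-proxy container from that repository's compose file is up on port 8082 (this uses the REST Proxy v2 embedded-JSON format):

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"Name": "Hat", "Price": 25}}]}' \
  http://localhost:8082/topics/product.events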

Upvotes: 0
