Gandharv Suri
Gandharv Suri

Reputation: 157

Debezium Postgres Kafka not creating topic

I am new to kafka and am using debezium kafka to track changes in my postgrest table. Following is my docker-complse.yml

version: '3.8'

volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
    
services: 
  postgres:
    restart: always
    image: debezium/postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=mosip123
      - POSTGRES_DB=anonprofile
    # to activate WAL 
    # command: postgres -c wal_level=logical -c archive_mode=on -c max_wal_senders=5
    
    volumes:
      - shared-workspace:/opt/workspace
      - ./PostgresDB:/docker-entrypoint-initdb.d/
  
  zookeeper:
    image: debezium/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    container_name: zookeeper
    volumes:
      - shared-workspace:/opt/workspace

  kafka:
    image: debezium/kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
    volumes:
      - shared-workspace:/opt/workspace    
  
  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - shared-workspace:/opt/workspace

The shell script inside postgres container. Please note the datatype is JSON, if that is the source of the error?

#!/bin/bash
apt-get update && apt-get install postgresql-13-pgoutput
psql -U postgres -d anonprofile <<-EOSQL
    CREATE TABLE IF NOT EXISTS anon_profiles (id SERIAL PRIMARY KEY, profiledata JSON );
    ALTER TABLE anon_profiles REPLICA IDENTITY USING INDEX anon_profiles_pkey;
    ALTER SYSTEM SET wal_level to 'logical';
EOSQL

The connector json file

{ "name": "anonprofile-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "mosip123",
    "database.dbname" : "anonprofile",
    "database.server.name": "MOSIP",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "schema-changes.anon_profiles",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "all_tables",
    "publication.name": "my_publication",
    "snapshot.mode": "always"
  }
}

After setting everything up I don't find any errors but by examining the topics list, no topic is being created for the above postgres connection. Am I missing something?

Topics list

$docker exec -it \
  $(docker ps | grep kafka | awk '{ print $1 }') \
  /kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --list
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses

Upvotes: 1

Views: 1676

Answers (1)

Gandharv Suri
Gandharv Suri

Reputation: 157

The issue was with the database.user, the user needs to have access to create replication slots. Setting it as postgres did the work. Or else grant the user with the required permissions (I guess that would be to make it as a superuser).

Upvotes: 1

Related Questions