Reputation: 157
I am new to Kafka and am using Debezium to track changes in my Postgres table. The following is my docker-compose.yml:
version: '3.8'
volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  postgres:
    restart: always
    image: debezium/postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=mosip123
      - POSTGRES_DB=anonprofile
    # to activate WAL
    # command: postgres -c wal_level=logical -c archive_mode=on -c max_wal_senders=5
    volumes:
      - shared-workspace:/opt/workspace
      - ./PostgresDB:/docker-entrypoint-initdb.d/
  zookeeper:
    image: debezium/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    volumes:
      - shared-workspace:/opt/workspace
  kafka:
    image: debezium/kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
    volumes:
      - shared-workspace:/opt/workspace
  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - shared-workspace:/opt/workspace
This is the shell script that runs inside the postgres container. Note that the column datatype is JSON, in case that is the source of the error.
#!/bin/bash
apt-get update && apt-get install postgresql-13-pgoutput
psql -U postgres -d anonprofile <<-EOSQL
CREATE TABLE IF NOT EXISTS anon_profiles (id SERIAL PRIMARY KEY, profiledata JSON );
ALTER TABLE anon_profiles REPLICA IDENTITY USING INDEX anon_profiles_pkey;
ALTER SYSTEM SET wal_level to 'logical';
EOSQL
The connector JSON file:
{
  "name": "anonprofile-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "mosip123",
    "database.dbname": "anonprofile",
    "database.server.name": "MOSIP",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "schema-changes.anon_profiles",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "all_tables",
    "publication.name": "my_publication",
    "snapshot.mode": "always"
  }
}
After setting everything up I don't see any errors, but when I list the topics, no topic has been created for the above Postgres connector. Am I missing something?
Topics list
$ docker exec -it \
    $(docker ps | grep kafka | awk '{ print $1 }') \
    /kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --list
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses
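A listing that contains only the Connect internal topics does not show whether the connector task is actually running. One way to look for a hidden failure is to ask the Kafka Connect REST API for the connector status. The sketch below only builds the status URL; it assumes the Connect worker is reachable at localhost:8083 (the port published in the compose file), and `connector_status_url` is a hypothetical helper name, not part of Debezium or Kafka.

```shell
#!/bin/bash
# Sketch: build the Kafka Connect REST URL that reports a connector's state.
# Assumption: Connect is reachable at localhost:8083 unless CONNECT_URL is set.
# connector_status_url is a hypothetical helper name used for illustration.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

connector_status_url() {
  # $1 = connector name, e.g. anonprofile-connector
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Print the URL. To fetch the status JSON, pass it to curl:
#   curl -s "$(connector_status_url anonprofile-connector)"
connector_status_url anonprofile-connector
```

In the returned JSON, a task whose `state` is `FAILED` normally carries a `trace` field with the underlying exception, which is where a permissions problem would be expected to surface.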
Upvotes: 1
Views: 1676
Reputation: 157
The issue was with database.user: the user needs permission to create replication slots. Setting it to postgres did the trick. Alternatively, grant the user the required permissions (I guess that would mean making it a superuser).
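If making the connector user a superuser is not desirable, a narrower setup may be enough. In PostgreSQL, the REPLICATION role attribute allows creating replication slots, while `CREATE PUBLICATION ... FOR ALL TABLES` itself requires a superuser, so the publication can be created ahead of time. The sketch below only prints the SQL; the role name `debezium_user`, the password `change-me`, and the helper `replication_setup_sql` are all hypothetical, and I have not verified this against the exact image versions used in the question.

```shell
#!/bin/bash
# Sketch: emit SQL that prepares a non-superuser role for the connector.
# Assumptions: role name and password are placeholders chosen for illustration;
# replication_setup_sql is a hypothetical helper, not a Debezium tool.
replication_setup_sql() {
  # $1 = role name, $2 = password
  cat <<EOSQL
CREATE ROLE "$1" WITH LOGIN REPLICATION PASSWORD '$2';
GRANT SELECT ON anon_profiles TO "$1";
CREATE PUBLICATION my_publication FOR ALL TABLES;
EOSQL
}

# Print the statements. To apply them, pipe into psql as the superuser:
#   replication_setup_sql debezium_user change-me | psql -U postgres -d anonprofile
replication_setup_sql debezium_user change-me
```

The connector's database.user and database.password would then need to match the new role. The SELECT grant is there because the initial snapshot reads the table.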
Upvotes: 1