shiv

Reputation: 2100

Debezium embedded engine is not delivering CDC events for MySQL

I am using the dependencies below:

    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-oracle</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${version.debezium}</version>
    </dependency>

<version.debezium>0.8.3.Final</version.debezium>

Below is my Java method:

public void runMysqlParsser() {

    Configuration config = Configuration.create()
            /* begin engine properties */
            .with("connector.class",
                    "io.debezium.connector.mysql.MySqlConnector")
            .with("offset.storage",
                    "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename",
                    "/home/mohit/tmp/offset.dat")
            .with("offset.flush.interval.ms", 60000)
            /* begin connector properties */
            .with("name", "my-sql-connector")
            .with("database.hostname", "localhost")
            .with("database.port", 3306)
            .with("database.user", "root")
            .with("database.password", "root")
            .with("server.id", 1)
            .with("database.server.name", "my-app-connector")
            .with("database.history",
                    "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename",
                    "/home/mohit/tmp/dbhistory.dat")
            .with("database.whitelist", "mysql")
            .with("table.whitelist", "mysql.customers")
            .build();
    EmbeddedEngine engine = EmbeddedEngine.create()
            .using(config)
            .notifying(this::handleEvent)
            .build();
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
}

private void handleEvent(SourceRecord sourceRecord) {
    try {
        LOG.info("Got record :" + sourceRecord.toString());
    } catch (Exception ex) {
        LOG.info("exception in handle event:" + ex);
    }
}

My MySQL configuration:

general_log_file = /var/log/mysql/mysql.log
general_log = 1
server-id               = 1
log_bin                 = /var/log/mysql/mysql-bin.log
expire_logs_days        = 10
max_binlog_size   = 100M
binlog_format     = row
binlog_row_image  = full
binlog_rows_query_log_events = on
gtid_mode        =  on
enforce_gtid_consistency   = on

When I run this code, I get the offset for the history logs, and the mysql.log file also gets the offset added to it. However, when I execute an update statement against the table, I get no events at all, i.e. the handleEvent method is not called. Can anyone tell me what is wrong with the code or configuration?

Below are the logs after running the Java code:

$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar

log4j:WARN No appenders could be found for logger (org.apache.kafka.connect.json.JsonConverterConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Nov 28, 2018 1:29:47 PM com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent
INFO: Got record :SourceRecord{sourcePartition={server=my-app-connector}, sourceOffset={file=mysql-bin.000002, pos=980, gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17, snapshot=true}} ConnectRecord{topic='my-app-connector', kafkaPartition=0, key=Struct{databaseName=}, value=Struct{source=Struct{version=0.8.3.Final,name=my-app-connector,server_id=0,ts_sec=0,file=mysql-bin.000002,pos=980,row=0,snapshot=true},databaseName=,ddl=SET character_set_server=latin1, collation_server=latin1_swedish_ci;}, timestamp=null, headers=ConnectHeaders(headers=)}
Nov 28, 2018 1:29:47 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17 (sid:6326, cid:21)

Upvotes: 0

Views: 1150

Answers (1)

Jiri Pechanec

Reputation: 1976

Are you whitelisting the correct database/table?

Could you please look at this demo: https://github.com/debezium/debezium-examples/tree/master/kinesis. Just drop the Kinesis-related code and print only to the console. Also check the table.ignore.builtin configuration option. IMHO the mysql database is one of the built-in ones, so its tables are filtered out by default.
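
To make the filter settings concrete, here is a minimal sketch of the two variants that seem worth trying. It only builds the filter-related properties with plain java.util.Properties; the class name, the helper method and the `inventory` database are made up for illustration, and the rest of the connector settings from the question would still be needed. I am assuming the result is handed to the engine via something like Configuration.from(props), or that the same keys are added with .with(...) in the existing builder.

```java
import java.util.Properties;

public class ConnectorFilterSketch {

    // Hypothetical helper: builds only the filter-related connector properties.
    static Properties filterProperties(String database, String table, boolean ignoreBuiltin) {
        Properties props = new Properties();
        props.setProperty("database.whitelist", database);
        props.setProperty("table.whitelist", database + "." + table);
        // When true, tables in built-in databases such as `mysql` are skipped.
        props.setProperty("table.ignore.builtin", Boolean.toString(ignoreBuiltin));
        return props;
    }

    public static void main(String[] args) {
        // Variant 1: capture a table in an application database
        // (`inventory` is an assumed name, not taken from the question).
        Properties appDb = filterProperties("inventory", "customers", true);

        // Variant 2: keep mysql.customers from the question, but stop
        // ignoring built-in databases.
        Properties builtinDb = filterProperties("mysql", "customers", false);

        for (Properties p : new Properties[] {appDb, builtinDb}) {
            System.out.println(p.getProperty("table.whitelist")
                    + " (table.ignore.builtin=" + p.getProperty("table.ignore.builtin") + ")");
        }
    }
}
```

Variant 1 is probably the cleaner choice, since application tables normally do not live in the mysql schema. Either way, clearing the old offset and history files before retrying may be necessary so that the engine takes a fresh snapshot with the new filters.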

Upvotes: 0
