Reputation: 2100
I am using the following dependencies:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-oracle</artifactId>
    <version>${version.debezium}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>
<version.debezium>0.8.3.Final</version.debezium>
Below is my Java method:
public void runMysqlParsser() {
    Configuration config = Configuration.create()
            /* begin engine properties */
            .with("connector.class",
                    "io.debezium.connector.mysql.MySqlConnector")
            .with("offset.storage",
                    "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename",
                    "/home/mohit/tmp/offset.dat")
            .with("offset.flush.interval.ms", 60000)
            /* begin connector properties */
            .with("name", "my-sql-connector")
            .with("database.hostname", "localhost")
            .with("database.port", 3306)
            .with("database.user", "root")
            .with("database.password", "root")
            .with("server.id", 1)
            .with("database.server.name", "my-app-connector")
            .with("database.history",
                    "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename",
                    "/home/mohit/tmp/dbhistory.dat")
            .with("database.whitelist", "mysql")
            .with("table.whitelist", "mysql.customers")
            .build();

    // Every change event the engine produces is passed to handleEvent.
    EmbeddedEngine engine = EmbeddedEngine.create()
            .using(config)
            .notifying(this::handleEvent)
            .build();

    // Run the engine asynchronously on its own thread.
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
}

private void handleEvent(SourceRecord sourceRecord) {
    try {
        LOG.info("Got record :" + sourceRecord.toString());
    } catch (Exception ex) {
        LOG.info("exception in handle event:" + ex);
    }
}
My MySQL configuration:
general_log_file = /var/log/mysql/mysql.log
general_log = 1
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = row
binlog_row_image = full
binlog_rows_query_log_events = on
gtid_mode = on
enforce_gtid_consistency = on
When I run this code, I get the offset for the history logs, and the mysql.log file also shows the offset being added to it. However, when I execute any UPDATE statement against the table, no events are logged, i.e. the handleEvent method is not called. Can anyone tell me what is wrong with the code or the configuration?
Below are the logs after running the Java code:
$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar
log4j:WARN No appenders could be found for logger (org.apache.kafka.connect.json.JsonConverterConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Nov 28, 2018 1:29:47 PM com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent
INFO: Got record :SourceRecord{sourcePartition={server=my-app-connector}, sourceOffset={file=mysql-bin.000002, pos=980, gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17, snapshot=true}} ConnectRecord{topic='my-app-connector', kafkaPartition=0, key=Struct{databaseName=}, value=Struct{source=Struct{version=0.8.3.Final,name=my-app-connector,server_id=0,ts_sec=0,file=mysql-bin.000002,pos=980,row=0,snapshot=true},databaseName=,ddl=SET character_set_server=latin1, collation_server=latin1_swedish_ci;}, timestamp=null, headers=ConnectHeaders(headers=)}
Nov 28, 2018 1:29:47 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17 (sid:6326, cid:21)
Upvotes: 0
Views: 1150
Reputation: 1976
Are you whitelisting the correct database/table?
Could you please look at this demo - https://github.com/debezium/debezium-examples/tree/master/kinesis
Just drop the Kinesis related code and print only to console.
Also check the table.ignore.builtin configuration option. IMHO the mysql database is one of the built-in ones and is filtered out by default.
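To make the point above concrete, here is a rough sketch of why the whitelist in the question may never match. It does not call Debezium at all: isCaptured is a hypothetical, simplified model of the connector's filtering, and the BUILT_IN_DATABASES set is my assumption about which schemas count as built-in. Only the three property names come from the question and from this answer.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class BuiltinFilterSketch {

    // Assumption: the schemas the MySQL connector treats as built-in system databases.
    public static final Set<String> BUILT_IN_DATABASES = new HashSet<>(
            Arrays.asList("mysql", "information_schema", "performance_schema", "sys"));

    // Connector properties from the question, plus an explicit table.ignore.builtin value.
    public static Properties connectorProperties(boolean ignoreBuiltin) {
        Properties props = new Properties();
        props.setProperty("database.whitelist", "mysql");
        props.setProperty("table.whitelist", "mysql.customers");
        props.setProperty("table.ignore.builtin", Boolean.toString(ignoreBuiltin));
        return props;
    }

    // Hypothetical, simplified model of the filter (the real connector matches
    // regular expressions; this only compares exact comma-separated names).
    public static boolean isCaptured(Properties props, String database, String table) {
        boolean ignoreBuiltin =
                Boolean.parseBoolean(props.getProperty("table.ignore.builtin", "true"));
        if (ignoreBuiltin && BUILT_IN_DATABASES.contains(database)) {
            return false; // built-in schema is dropped before the whitelist is consulted
        }
        String whitelist = props.getProperty("table.whitelist", "");
        return Arrays.asList(whitelist.split("\\s*,\\s*")).contains(database + "." + table);
    }

    public static void main(String[] args) {
        for (boolean ignoreBuiltin : new boolean[] {true, false}) {
            Properties props = connectorProperties(ignoreBuiltin);
            System.out.println("table.ignore.builtin=" + ignoreBuiltin
                    + " -> mysql.customers captured: "
                    + isCaptured(props, "mysql", "customers"));
        }
    }
}
```

If this is indeed the cause, either add .with("table.ignore.builtin", false) to the configuration builder in the question, or (probably cleaner) move the customers table to a regular application database and whitelist that one instead.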
Upvotes: 0