Reputation: 193
I have been struggling to sink data from my Kafka topic into a MySQL database table, and here are the scenarios playing out.
First, the data in the Kafka topic comes from another MySQL database table (captured with Debezium CDC) using the following source connector config:
{
"name": "smartdevsignupconnector111",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"database.hostname": "mysql1",
"database.port": "3306",
"database.user": "clusterAdmin",
"database.password": "**********",
"database.server.id": "184055",
"database.server.name": "smartdevdbserver1",
"database.include.list": "signup_db",
"database.history.kafka.bootstrap.servers": "kafka1:9092",
"database.history.kafka.topic": "schema-changes.signup_db",
"schema.history.internal.kafka.topic": "KafkaSchemaHistoryTopic",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
"table.whitelist": "signup_db.users",
"column.blacklist": "signup_db.users.fullName, signup_db.users.address, signup_db.users.phoneNo, signup_db.users.gender, signup_db.users.userRole, signup_db.users.reason_for_inactive, signup_db.users.firstvisit, signup_db.users.last_changed_PW, signup_db.users.regDate",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"snapshot.mode": "when_needed",
"topic.creation.enable": "true",
"topic.prefix": "smartdevdbserver1",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1"
}
}
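(As an aside: table.whitelist and column.blacklist are the legacy property names. If I read the Debezium docs correctly, newer versions spell the same filters as below; I have kept the old names above since they still work for me:)
"table.include.list": "signup_db.users",
"column.exclude.list": "signup_db.users.fullName, signup_db.users.address, signup_db.users.phoneNo, signup_db.users.gender, signup_db.users.userRole, signup_db.users.reason_for_inactive, signup_db.users.firstvisit, signup_db.users.last_changed_PW, signup_db.users.regDate"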
Here is a sample record from the Kafka topic:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":true,"field":"fullName"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phoneNo"},{"type":"string","optional":true,"field":"gender"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"visitor,student,Admin"},"default":"visitor","field":"userRole"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"visitor,TERMINATED,SUSPENDED_FOR_VIOLATION"},"default":"visitor","field":"reason_for_inactive"},{"type":"string","optional":true,"field":"firstvisit"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"last_changed_PW"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"default":0,"field":"regDate"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":22,"email":"[email protected]","password":"$2a$10$qj8AEMZMqRKV.68b0uwFZu0Zx14Pk77/HhmSn/bNmirjuYOwVmhse","fullName":null,"address":null,"phoneNo":null,"gender":null,"userRole":"visitor","User_status":"INACTIVE","reason_for_inactive":"visitor","firstvisit":null,"last_changed_PW":"2022-11-05T20:57:44Z","regDate":1667681864000,"auth_token":null}}
Now, this is the JDBC sink connector config:
{
"name": "resetpassword-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"topics": "smartdevdbserver1.signup_db.users",
"connection.url": "jdbc:mysql://rpwd_mysql:3306/rpwd_db",
"connection.user": "rpwd_user",
"connection.password": "**********",
"table.name.format": "rpwd_db.users",
"fields.whitelist": "rpwd_db.users.id,rpwd_db.users.email,rpwd_db.users.password,rpwd_db.users.User_status,rpwd_db.users.auth_token",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
But when I run the JDBC sink connector, I get this error:
java.sql.BatchUpdateException: Field 'email' doesn't have a default value
and the logs look like this:
connect | 2022-11-06 10:23:26,357 INFO || Attempting to open connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
connect | 2022-11-06 10:23:50,563 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
connect | 2022-11-06 10:23:53,616 INFO || Checking MySql dialect for existence of TABLE "rpwd_db"."users" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
connect | 2022-11-06 10:23:54,966 INFO || Using MySql dialect TABLE "rpwd_db"."users" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
connect | 2022-11-06 10:23:55,597 INFO || Checking MySql dialect for type of TABLE "rpwd_db"."users" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
connect | 2022-11-06 10:23:55,640 INFO || Setting metadata for table "rpwd_db"."users" to Table{name='"rpwd_db"."users"', type=TABLE columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'User_status', isPrimaryKey=false, allowsNull=false, sqlType=ENUM}, Column{'auth_token', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'password', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}]} [io.confluent.connect.jdbc.util.TableDefinitions]
connect | 2022-11-06 10:23:56,355 WARN || Write of 4 records failed, remainingRetries=10 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
connect | java.sql.BatchUpdateException: Field 'email' doesn't have a default value
connect | at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
connect | at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
connect | at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
connect | at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
connect | at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
connect | at com.mysql.cj.util.Util.getInstance(Util.java:167)
connect | at com.mysql.cj.util.Util.getInstance(Util.java:174)
connect | at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
connect | at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
connect | at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
connect | at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
connect | at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
connect | at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
connect | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
connect | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: java.sql.SQLException: Field 'email' doesn't have a default value
connect | at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
connect | at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
connect | at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
connect | at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1061)
connect | at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:795)
connect | ... 17 more
connect | 2022-11-06 10:23:56,589 INFO || Closing connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
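That error would make sense if the generated upsert only carried the id column, i.e. something like this (my guess at the SQL, not captured from the logs):
INSERT INTO rpwd_db.users (id) VALUES (22);
which MySQL rejects because email is NOT NULL with no default in my pre-created table.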
Note that I created the target table (users) in the sink database myself.
Also, even if I remove the email column, the next column (password) reports the same error, and so on.
If I instead use auto.create (after dropping the pre-created users table), the sink connector creates the table, but only one column (the primary key, id) is created and populated; the rest of the fields are ignored:
mysql> select * from users;
+----+
| id |
+----+
| 1 |
| 8 |
| 15 |
| 22 |
| 29 |
+----+
5 rows in set (0.02 sec)
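So the auto-created table presumably looks something like this (my guess at the DDL; I did not capture the actual SHOW CREATE TABLE output):
CREATE TABLE users (
  id INT NOT NULL,
  PRIMARY KEY (id)
);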
Below are the logs for the auto.create run:
connect | 2022-11-09 09:56:01,676 INFO || Initializing writer using SQL dialect: MySqlDatabaseDialect [io.confluent.connect.jdbc.sink.JdbcSinkTask]
connect | 2022-11-09 09:56:01,691 INFO || WorkerSinkTask{id=resetpassword-sink-connector-0} Sink task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect | 2022-11-09 09:56:01,705 INFO || WorkerSinkTask{id=resetpassword-sink-connector-0} Executing sink task [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect | 2022-11-09 09:56:01,755 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Resetting the last seen epoch of partition users-0 to 0 since the associated topicId changed from null to hjdRgZ2PTgWqZxW53PaAfg [org.apache.kafka.clients.Metadata]
connect | 2022-11-09 09:56:01,756 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Cluster ID: 2vOSZymlQ42roGNU_nYGFg [org.apache.kafka.clients.Metadata]
connect | 2022-11-09 09:56:01,765 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Discovered group coordinator kafka1:9092 (id: 2147483646 rack: null) [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:01,770 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] (Re-)joining group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:01,938 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Request joining group due to: need to re-join with the given member-id: connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
kafka1 | 2022-11-09 09:56:01,956 - INFO [data-plane-kafka-request-handler-4:Logging@66] - [GroupCoordinator 1]: Preparing to rebalance group connect-resetpassword-sink-connector in state PreparingRebalance with old generation 0 (__consumer_offsets-21) (reason: Adding new member connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException)
connect | 2022-11-09 09:56:01,941 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:01,941 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] (Re-)joining group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
kafka1 | 2022-11-09 09:56:02,150 - INFO [executor-Rebalance:Logging@66] - [GroupCoordinator 1]: Stabilized group connect-resetpassword-sink-connector generation 1 (__consumer_offsets-21) with 1 members
connect | 2022-11-09 09:56:02,166 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692', protocol='range'} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:02,196 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Finished assignment for group at generation 1: {connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692=Assignment(partitions=[users-0])} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
kafka1 | 2022-11-09 09:56:02,239 - INFO [data-plane-kafka-request-handler-5:Logging@66] - [GroupCoordinator 1]: Assignment received from leader connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692 for group connect-resetpassword-sink-connector for generation 1. The group has 1 members, 0 of which are static.
connect | 2022-11-09 09:56:02,254 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-resetpassword-sink-connector-0-ff424264-403a-4259-8e5a-5171d9500692', protocol='range'} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:02,255 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Notifying assignor about the new Assignment(partitions=[users-0]) [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:02,255 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Adding newly assigned partitions: users-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:02,414 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Found no committed offset for partition users-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
connect | 2022-11-09 09:56:02,433 INFO || [Consumer clientId=connector-consumer-resetpassword-sink-connector-0, groupId=connect-resetpassword-sink-connector] Resetting offset for partition users-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1:9092 (id: 1 rack: null)], epoch=0}}. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
connect | 2022-11-09 09:56:02,574 INFO || Attempting to open connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
connect | 2022-11-09 09:56:05,960 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
connect | 2022-11-09 09:56:06,168 INFO || Checking MySql dialect for existence of table "rpwd_db"."users" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect | 2022-11-09 09:56:06,583 INFO || Using MySql dialect table "rpwd_db"."users" present [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect | 2022-11-09 09:56:06,768 INFO || Setting metadata for table "rpwd_db"."users" to Table{name='"rpwd_db"."users"', columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}]} [io.confluent.connect.jdbc.util.TableDefinitions]
So either way, the sink connector seems to be dealing with the primary key column only, as is evident in the "Setting metadata for table" line near the end of the logs, which sets metadata for the id column alone while ignoring the other four columns:
Setting metadata for table "rpwd_db"."users" to Table{name='"rpwd_db"."users"', columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}]} [io.confluent.connect.jdbc.util.TableDefinitions]
I know there is something I am missing; can someone walk me through what it is?
Upvotes: 0
Views: 2219
Reputation: 193
The problem was with this line:
"fields.whitelist": "rpwd_db.users.id,rpwd_db.users.email,rpwd_db.users.password,rpwd_db.users.User_status,rpwd_db.users.auth_token",
I removed all the prefixes before the actual column names and left it like this:
"fields.whitelist": "id,email,password,User_status,auth_token",
I also removed the prefix in table.name.format from:
"table.name.format": "rpwd_db.users"
to:
"table.name.format": "users",
Upvotes: 0