Reputation: 67
First of all, thanks to @OneCricketeer for your support so far. I have tried so many configurations by now that I don't know what else to try.
I am using Confluent connect-standalone
to access an external stream.
The connection works, and I can see that an offset is loaded:
INFO [my_mysql_sink|task-0] [Consumer clientId=connector-consumer-my_mysql_sink-0, groupId=connect-my_mysql_sink] Setting offset for partition gamerboot.gamer.master.workouts.clubs.spieleranalyse-1 to the committed offset FetchPosition{ offset=2225 , offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 8 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)
But afterwards I receive an error as soon as new messages come in:
ERROR [my_mysql_sink|task-0] WorkerSinkTask{id=my_mysql_sink-0} Error converting message key in topic 'gamerboot.gamer.master.workouts.clubs.spieleranalyse' partition 1 at offset 2225 and timestamp 1641459346507: Failed to deserialize data for topic gamerboot.gamer.master.workouts.clubs.spieleranalyse to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 422
Caused by: Subject not found.; error code: 40401
I do not understand this.
As there is no primary key set in MySQL, I wanted to record everything from the stream.
Since the error says "Error retrieving Avro key schema version for id 422", I looked that up and can see the following:
Don't be surprised that it says JSON; that is just my Chrome plugin, which renders the response as JSON. The same is found for the value. I also tried every combination that is commented out there. I am also able to curl the latest schema for key and value, for example:
{
  "type": "record",
  "name": "ClubWorkoutKey",
  "namespace": "",
  "fields": [
    {
      "name": "playerId",
      "type": "string"
    },
    {
      "name": "tagId",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
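For anyone reproducing these lookups, here is a minimal sketch of the Schema Registry REST paths involved. The base URL is a hypothetical placeholder, since the real one is not shown in this post, and the helper names are made up for illustration. The `-key` subject assumes the default TopicNameStrategy, which may not be what this registry uses.

```python
from urllib.parse import quote

# Hypothetical placeholder: the real registry URL is not shown in the post.
REGISTRY_URL = "http://schema-registry.example:8081"
TOPIC = "gamerboot.gamer.master.workouts.clubs.spieleranalyse"


def schema_by_id_url(base: str, schema_id: int) -> str:
    """URL that returns the schema stored under a global schema id."""
    return f"{base.rstrip('/')}/schemas/ids/{schema_id}"


def subjects_for_id_url(base: str, schema_id: int) -> str:
    """URL that lists the subject/version pairs a schema id is registered under."""
    return f"{base.rstrip('/')}/schemas/ids/{schema_id}/versions"


def latest_for_subject_url(base: str, subject: str) -> str:
    """URL that returns the latest schema version registered under a subject."""
    return f"{base.rstrip('/')}/subjects/{quote(subject, safe='')}/versions/latest"


if __name__ == "__main__":
    print(schema_by_id_url(REGISTRY_URL, 422))
    print(subjects_for_id_url(REGISTRY_URL, 422))
    # Subject name under the default TopicNameStrategy (an assumption here).
    print(latest_for_subject_url(REGISTRY_URL, f"{TOPIC}-key"))
```

If the second URL shows that id 422 is registered under a subject other than `<topic>-key`, that would be consistent with the "Subject not found" (40401) error above.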
It got a little further when I set the String converter for key.converter and value.converter in . But in my opinion that must be wrong, because Avro is passed here. With String there were then other problems: I would have had to set a primary key, switch on delete, and so on.
Thanks for your support.
So, this is what I was given:
topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse
schema.url =
as well as: schema id url:
For me it's like a puzzle; I started with Kafka 20 days ago. From there I tried variations of the URLs and found the ones I posted for the subject:
For Key:
Schema: {"subject":"","version":1,"id":422,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKey\",\"namespace\":\"\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}
For Values:
Schemas: {"subject":"","version":1,"id":423,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKickValue\",\"namespace\":\"\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ballSpeed\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ballSpeedFloat\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"ballSpeedZone\",\"type\":{\"type\":\"enum\",\"name\":\"BallSpeedZone\",\"symbols\":[\"COLD\",\"MEDIUM\",\"HOT\",\"FIRE\",\"INVALID\"]}},{\"name\":\"confidence\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}
and: {"subject":"","version":1,"id":424,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutPlayerMotionValue\",\"namespace\":\"\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"absoluteDistance\",\"type\":\"float\"},{\"name\":\"averageSpeed\",\"type\":\"float\"},{\"name\":\"peakSpeed\",\"type\":\"float\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"installationId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"averageSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"AverageSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null,\"aliases\":[\"speedZone\"]},{\"name\":\"peakSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"PeakSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}
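The registry responses above carry the Avro schema as an escaped JSON string inside the "schema" field, so it has to be decoded twice. A small sketch using the key response exactly as posted (the helper names are made up for illustration, and the empty namespace is taken as-is from the post):

```python
import json

# Key response copied from the post; the "schema" value is itself a JSON string.
KEY_RESPONSE = r'''{"subject":"","version":1,"id":422,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKey\",\"namespace\":\"\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}'''


def parse_registry_response(raw: str) -> tuple[dict, dict]:
    """Return (response envelope, decoded Avro schema)."""
    envelope = json.loads(raw)               # first decode: the REST envelope
    schema = json.loads(envelope["schema"])  # second decode: the Avro schema
    return envelope, schema


def full_name(schema: dict) -> str:
    """Fully qualified record name; an empty namespace yields the bare name."""
    namespace = schema.get("namespace")
    return f"{namespace}.{schema['name']}" if namespace else schema["name"]


def field_names(schema: dict) -> list[str]:
    return [field["name"] for field in schema["fields"]]


if __name__ == "__main__":
    envelope, key_schema = parse_registry_response(KEY_RESPONSE)
    print(envelope["id"], full_name(key_schema), field_names(key_schema))
```

The same two-step decode applies to the two value responses (ids 423 and 424), which describe two different record types on the same topic.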
MySQL table:
| Field | Type | Null | Key | Default | Extra |
| playerid | varchar(100) | YES | | NULL | |
| timestamp | mediumtext | YES | | NULL | |
| absoluteDistance | float | YES | | NULL | |
| avarageSpeed | float | YES | | NULL | |
| peakSpeed | float | YES | | NULL | |
| tagId | varchar(50) | YES | | NULL | |
| installationId | varchar(100) | YES | | NULL | |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES | | NULL | |
| peakSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES | | NULL | |
| ballSpeed | int(11) | YES | | NULL | |
| ballSpeedFloat | float | YES | | NULL | |
| ballSpeedZone | enum('COLD','MEDIUM','HOT','FIRE','INVALID') | YES | | NULL | |
| confidence | int(11) | YES | | NULL | |
| ingestionTime | mediumtext | YES | | NULL | |
Data expected in MySQL:
| playerid | timestamp | absoluteDistance | avarageSpeed | peakSpeed | tagId | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 | 5.76953 | 1.1543 | 1.22363 | 0104FLHBN009XD | null | WALK | WALK | NULL | NULL | NULL | NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 | NULL | NULL | NULL | 0104FLHBN009XD | NULL | NULL | NULL | 37 | 37.0897 | COLD | 77 | 1641484896747 |
The data from avro-console for these DB entries looks like this:
This is a fresh, up-to-date Confluent installation. Just a few hours ago I updated the Avro converter to kafka-connect-avro-converter:7.0.1.
Upvotes: 2
Views: 5881
Reputation: 67
The schemas were changed by the company with regard to RecordNameStrategy. Everything is working now.
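For readers hitting the same 40401 error, here is a rough sketch of how Confluent's three subject name strategies derive the subject that gets looked up in the registry. This is my own illustration, not the converter's code; the function name is made up, and the record name uses the empty namespace exactly as it appears in the question.

```python
def subject_name(strategy: str, topic: str, record_full_name: str, is_key: bool) -> str:
    """Illustrative re-implementation of the subject naming rules (not library code)."""
    if strategy == "TopicNameStrategy":
        # Default: one subject per topic and key/value position.
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "RecordNameStrategy":
        # Subject is the fully qualified record name, independent of the topic.
        return record_full_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_full_name}"
    raise ValueError(f"unknown strategy: {strategy}")


if __name__ == "__main__":
    topic = "gamerboot.gamer.master.workouts.clubs.spieleranalyse"
    for strategy in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
        print(strategy, "->", subject_name(strategy, topic, "ClubWorkoutKey", is_key=True))
```

If the producer registers schemas with one strategy and the Connect converter looks them up with another, the derived subject does not exist and the lookup fails. On the Connect side the strategy is, as far as I know, set through `key.converter.key.subject.name.strategy` and `value.converter.value.subject.name.strategy`; whether that or a producer-side change was the actual fix here is not something I can confirm from the details above.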
Upvotes: 0