anonuser1234

Reputation: 533

Messages not saving correctly to Kafka topic via Kafka Connector

So I have a Confluent Kafka JDBC connector set up. First I start up a Schema Registry like this:

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

This is the schema-registry.properties file:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false
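
Once it's running, a quick sanity check against the registry's REST API (assuming it's reachable on localhost:8081 as configured above) should list the registered subjects:

curl http://localhost:8081/subjects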

Next I start up a standalone connector like this

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties

connect-avro-standalone.properties is

bootstrap.servers=kafkahost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java

jdbc-source.properties is

name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'

The query I am using is only for testing purposes; the real query I want to use will use incrementing mode and will contain no WHERE clause (see the sketch below).
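
For reference, a minimal sketch of what that incrementing-mode config might look like (the column name id is my assumption, and as far as I know the connector treats query and table.whitelist as mutually exclusive, so only one should be set):

name=jdbc_source_oracle_incrementing
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
mode=incrementing
# strictly increasing, non-null column the connector tracks (assumed name)
incrementing.column.name=id
table.whitelist=table1, table2
topic.prefix=my_topic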

Now this manages to publish data into the topic, but with some weird stuff going on. First, the IDs are saved in an unreadable format, just an empty square. Second, some fields that are populated in the database are saved as null in the topic. And third, whenever I change the date in the query in the JDBC source file, nothing happens: the topic still contains the same messages from the first run, no matter how many times I change the query.
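
From what I understand (an assumption on my part), standalone mode stores source offsets in the offset.storage.file.filename configured above, so clearing that file between runs should force the connector to start from scratch:

# stop the standalone worker, then remove the stored offsets
# (path taken from the worker config above)
rm /tmp/connect.offsets
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties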

Can anyone help me?

EDIT

What I want to do is consume the data through PySpark. Here's the code I'm using:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


# read from the topic; key and value come back as raw binary columns
data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

# dump each micro-batch to the console every 5 seconds
query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

I've also consumed the data using kafka-avro-console-consumer with this command:

./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic

Both of these give me weird results

Here's what the PySpark code is giving me:

[screenshot: PySpark console output]

and this is what the avro console consumer is giving me:

[screenshot: kafka-avro-console-consumer output]

I've blocked out some fields and text as they may contain sensitive company information.

Upvotes: 0

Views: 945

Answers (1)

Robin Moffatt

Reputation: 32140

If you're consuming Avro from Spark, you'll need to use the correct deserialiser.
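
As a rough illustration only (my sketch, not Robin's code): with Spark 3.x and the spark-avro package on the classpath, you could strip the 5-byte Confluent wire-format header (one magic byte plus a 4-byte schema ID) and decode the remainder with from_avro. The subject name my_topic-value and the registry URL are assumptions taken from the question, and this naively uses the latest registered schema for every record:

import requests
from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro

# fetch the writer schema from the Schema Registry (subject name assumed)
schema_json = requests.get(
    "http://localhost:8081/subjects/my_topic-value/versions/latest"
).json()["schema"]

# drop the 5-byte Confluent header, then decode the Avro payload
decoded = data_raw\
    .withColumn("payload", expr("substring(value, 6, length(value) - 5)"))\
    .select(from_avro("payload", schema_json).alias("record"))

decoded.writeStream\
    .outputMode("append")\
    .format("console")\
    .start()\
    .awaitTermination()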

If you're seeing bytes in your Avro data from the console, then it's down to the handling of decimals/numerics, as detailed here.
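
In practice that usually means setting numeric.mapping on the JDBC source so numeric columns map to concrete primitive types instead of Avro logical decimals; a sketch, assuming a recent connector version:

# added to jdbc-source.properties; maps NUMERIC columns to the
# best-fitting primitive type (int, long, double) instead of a decimal
numeric.mapping=best_fit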

You can read more about Kafka Connect and serialisation alternatives to Avro (including JSON) here.
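
For example (my sketch, not from the linked post), switching the worker's value converter to JSON with embedded schemas would look like this in connect-avro-standalone.properties:

# serialise values as JSON with the schema embedded in each message
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true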

Upvotes: 1
