Reputation: 533
So I have a Confluent Kafka JDBC connector set up. First I start up the Schema Registry like this:
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
This is the schema-registry.properties file:
listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false
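Once it's up, a quick sanity check (assuming the registry is reachable on localhost:8081) is to list the registered subjects:
curl http://localhost:8081/subjects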
Next I start up a standalone connector like this:
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties
connect-avro-standalone.properties is:
bootstrap.servers=kafkahost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
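Once the worker is running, a quick way to confirm it picked up the connector is the Connect REST API (standalone workers listen on port 8083 by default):
curl http://localhost:8083/connectors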
jdbc-source.properties is:
name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'
The query I am using is only for testing purposes; the real query I want to use will run in incrementing mode and will contain no WHERE clause.
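For reference, the real setup would look something like this (ID as the incrementing column is just a placeholder, not my actual column name):
mode=incrementing
incrementing.column.name=ID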
Now this manages to publish data into the topic, but with some weird stuff going on. First, the IDs are saved in an unreadable format: just an empty square. Second, some fields that are populated in the database are saved as null in the topic. And third, whenever I change the date in the query in the JDBC source file, nothing happens: the topic still contains the same messages from the first run, no matter how many times I change the query.
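To show what the connector actually registered for the topic, the value schema can be pulled from the Schema Registry (assuming the default <topic>-value subject naming):
curl http://localhost:8081/subjects/my_topic-value/versions/latest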
Can anyone help me?
EDIT
What I want to do is consume the data through PySpark. Here's the code showing how I am doing it:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()

# Read raw key/value bytes from the Kafka topic
data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

# Dump each micro-batch to the console
query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()

query.awaitTermination()
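For completeness, this only runs with the Kafka source on the classpath; I submit it with something like this (the version here is just an example matching a Spark 2.4 / Scala 2.11 build, and app.py stands in for the script above):
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 app.py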
I've also consumed the data using the kafka-avro-console-consumer with this command:
./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic
Both of these give me weird results.
Here's what the pyspark code is giving me:
[screenshot omitted]
and this is what the avro console consumer is giving me:
[screenshot omitted]
I'm blocking out some fields and text as they may contain company-sensitive information.
Upvotes: 0
Views: 945
Reputation: 32140
If you're consuming Avro from Spark, you'll need to use the correct deserializer.
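As a rough sketch of what that can look like (assuming Spark 3.0+ with the spark-avro package on the classpath; note that the Confluent wire format prepends a magic byte plus a 4-byte schema ID, which plain from_avro doesn't understand, so it has to be stripped first):

from pyspark.sql.avro.functions import from_avro

# The Avro schema as a JSON string; fetch it yourself, e.g. from
# http://localhost:8081/subjects/my_topic-value/versions/latest
schema_json = "..."

# Skip the 5-byte Confluent header (magic byte + schema ID), then decode
payload = data_raw.selectExpr("substring(value, 6, length(value) - 5) AS avro_value")
decoded = payload.select(from_avro("avro_value", schema_json).alias("row"))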
If you're seeing bytes in your Avro data from the console, then it's down to the handling of decimals/numerics, as detailed here.
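One common mitigation for that, assuming a reasonably recent JDBC connector, is to tell it to map numeric columns to the best-fitting primitive type instead of the Avro decimal logical type:
numeric.mapping=best_fit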
You can read more about Kafka Connect and serialisation alternatives to Avro (including JSON) here.
Upvotes: 1