Reputation: 57
I want to read data from a Kafka broker that is secured with SSL (self-signed certificate).
My consumer is the following:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType, LongType, TimestampType, IntegerType

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("pyspark_structured_streaming_kafka") \
        .getOrCreate()

    df_raw = spark.readStream \
        .format("kafka") \
        .option("kafka.ssl.truststore.location", "file:///Users/picomy/Kafka-keystore/server.truststore") \
        .option("kafka.ssl.truststore.password", "aoeuid") \
        .option("kafka.ssl.keystore.location", "file:///Users/picomy/Kafka-keystore/kclient.keystore") \
        .option("kafka.ssl.keystore.password", "aoeuid") \
        .option("kafka.isolation.level", "read_committed") \
        .option("kafka.bootstrap.servers", "52.81.249.81:9093") \
        .option("subscribe", "product") \
        .option("startingOffsets", "latest") \
        .option("kafka.ssl.endpoint.identification.algorithm", "") \
        .load()

    product_schema = StructType() \
        .add("product_name", StringType()) \
        .add("product_factory", StringType()) \
        .add("yield_num", IntegerType()) \
        .add("yield_time", StringType())

    df_1 = df_raw.selectExpr("CAST(value AS STRING)") \
        .select(from_json("value", product_schema).alias("data")) \
        .select("data.*") \
        .writeStream \
        .format("console") \
        .outputMode("append") \
        .option("checkpointLocation", "file:///Users/picomy/Kafka-Output/checkpoint") \
        .start() \
        .awaitTermination()
When I submit the job, I get the following error:
21/02/04 17:33:58 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-bdbf46eb-42ce-4fd1-bef7-08222138b49c-32919539-executor-1, groupId=spark-kafka-source-bdbf46eb-42ce-4fd1-bef7-08222138b49c-32919539-executor] Bootstrap broker 52.81.249.81:9093 (id: -1 rack: null) disconnected
My code works fine on port 9092 (PLAINTEXT protocol). Thanks to mike for his expert help.
The Kafka console consumer also works fine with the same certificates:
./kafka-console-consumer.sh --bootstrap-server 52.81.249.81:9093 --topic product --consumer.config ../config/consumer.properties
{"product_name": "X Laptop","product_factory": "B-3231","yield_num": 899,"yield_time": "20210201 22:00:01"}
{"product_name": "X Laptop","product_factory": "B-3231","yield_num": 899,"yield_time": "20210201 22:00:01"}
{"product_name": "X Laptop","product_factory": "B-3231","yield_num": 899,"yield_time": "20210201 22:00:01"}
I don't know what the root cause of this problem is.
Upvotes: 1
Views: 4080
Reputation: 57
I added another option so that the client communicates with the Kafka broker over SSL:
.option("kafka.security.protocol","SSL")
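For reference, here is a minimal sketch of how the complete option set might look once the protocol option is included. The helper names `kafka_ssl_options` and `read_kafka_stream` are hypothetical, introduced only for illustration; the option keys and values are the ones from the question plus `kafka.security.protocol`. Collecting the options in a dict makes it easy to check that the SSL switch is present before the stream is started.

```python
def kafka_ssl_options(bootstrap_servers, topic,
                      truststore, truststore_password,
                      keystore, keystore_password):
    """Collect the Kafka source options, including the SSL protocol switch.

    Hypothetical helper: it only builds a dict and does not touch Spark.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "latest",
        # Without this option the client talks PLAINTEXT to the SSL port
        # and the broker drops the connection.
        "kafka.security.protocol": "SSL",
        "kafka.ssl.truststore.location": truststore,
        "kafka.ssl.truststore.password": truststore_password,
        "kafka.ssl.keystore.location": keystore,
        "kafka.ssl.keystore.password": keystore_password,
        # Empty string disables hostname verification (self-signed certificate).
        "kafka.ssl.endpoint.identification.algorithm": "",
        "kafka.isolation.level": "read_committed",
    }


def read_kafka_stream(spark, options):
    """Apply every option to a Kafka streaming reader and load it.

    `spark` is assumed to be an existing SparkSession.
    """
    reader = spark.readStream.format("kafka")
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader.load()


# Values copied from the question.
options = kafka_ssl_options(
    bootstrap_servers="52.81.249.81:9093",
    topic="product",
    truststore="file:///Users/picomy/Kafka-keystore/server.truststore",
    truststore_password="aoeuid",
    keystore="file:///Users/picomy/Kafka-keystore/kclient.keystore",
    keystore_password="aoeuid",
)
```

With a running session, `df_raw = read_kafka_stream(spark, options)` would then take the place of the `spark.readStream ... .load()` chain in the question.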
Upvotes: 1