Reputation: 101
My project writes JSON to a Kafka topic, reads the JSON back from the topic, and finally sinks it to a CSV file. Everything works, but some keys contain nested JSON. How can I parse a list inside the JSON?
Example JSON:
{"a": "test", "b": "1234", "c": "temp", "d": [{"test1": "car", "test2": 345}, {"test3": "animal", "test4": 1}], "e": 50000}
You can see my code below.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func
# Build the session with the Kafka source package on the classpath
spark = SparkSession.builder \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
    .getOrCreate()
# Schema of the JSON messages (every field declared as a string)
ordersSchema = StructType() \
    .add("a", StringType()) \
    .add("b", StringType()) \
    .add("c", StringType()) \
    .add("d", StringType()) \
    .add("e", StringType())
# Subscribe to the "test" topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()
# Kafka delivers the payload as bytes; cast it to a string
df_query = df \
    .selectExpr("cast(value as string)")
# Print each micro-batch to the console
df_s = df_query \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime = "1 seconds") \
    .start()
# Write the stream to CSV files
aa = df_query \
    .writeStream \
    .format("csv") \
    .trigger(processingTime = "5 seconds") \
    .option("path", "/var/kafka_stream_test_out/") \
    .option("checkpointLocation", "/var/kafka_stream_test_out/chk") \
    .start()
Thank you!
Upvotes: 0
Views: 612
Reputation: 2178
The schema for column "d" is wrong: it needs to be an ArrayType of structs, not a StringType. Here is the equivalent Scala code, which you can convert to Python.
val schema = new StructType().add("a", StringType)
  .add("d", ArrayType(new StructType().add("test1", StringType).add("test2", StringType)))
The JSON has different field names in each element of "d". I am assuming that is a typo and that the fields are "test1" and "test2" in every element.
Upvotes: 1