Reputation: 249
I am fetching streaming data with PySpark:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .getOrCreate()

# Connection options come from a config file read with configparser
raw_stream = spark \
    .readStream \
    .option("endpoint", conf.get('config', 'endpoint')) \
    .option("app.name", conf.get('config', 'app_name')) \
    .option("app.secret", conf.get('config', 'app_key')) \
    .option("dc", conf.get('config', 'dc')) \
    .option("source.topic", conf.get('config', 'topic')) \
    .option("group.name", conf.get('config', 'group')) \
    .option("source.value.binaryType", 'false') \
    .load()

# Cast the raw payload to a string column
raw_stream_str = raw_stream \
    .selectExpr("CAST(value AS STRING)")

# Write the stream into an in-memory table so it can be queried
value_batch = raw_stream_str \
    .writeStream \
    .queryName("value_query") \
    .format("memory") \
    .start()

spark.sql("select * from value_query").show()
which outputs the following:
+--------------------+
| value|
+--------------------+
|{"message":"DGc6K...|
+--------------------+
The whole content of value looks like this:
[Row(value='{"message": "xxx", "source": "xxx", "instance": "xxx", "metrics": {"metric1": "xxx", "metric2": "xxx", ...}}')]
where metrics is a dictionary of key-value pairs. I want to extract the metrics content so that I end up with something like this:
+-------+-------+-------------------+
|metric1|metric2| metric3|
+-------+-------+-------------------+
| "abc"| 12345|01/01/2022 00:00:00|
+-------+-------+-------------------+
I am able to achieve this by treating the value as a comma-separated list of strings:
raw_stream_str.selectExpr("value", "split(value, ',')[3] as message").drop("value")
raw_stream_str.selectExpr("metrics", "split(value, ',')[0] as metric1", ...).drop("metrics")
Is there a more efficient (Spark) way of doing this in terms of distributed computing? Maybe by applying/mapping some function to every row of the streaming output dataframe, with the help of json.loads, so that I can exploit its key-value nature? For illustration, see the sketch below.
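What I have in mind could be written as a Python UDF; a minimal sketch (the metric names and string-only types are assumptions, not my actual schema):

import json
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# Parse the JSON payload and return the "metrics" object as a map of strings
@udf(returnType=MapType(StringType(), StringType()))
def extract_metrics(value):
    return {k: str(v) for k, v in json.loads(value).get("metrics", {}).items()}

metrics_df = raw_stream_str \
    .withColumn("metrics", extract_metrics("value")) \
    .selectExpr(
        "metrics['metric1'] as metric1",
        "metrics['metric2'] as metric2",
        "metrics['metric3'] as metric3",
    )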
Upvotes: 0
Views: 584
Reputation: 45
As suggested, you can use from_json to parse the data into the required schema, and after that you can do df.select("metrics.*") to get the required dataframe.
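A minimal sketch of that approach (the schema mirrors the payload shown in the question; the metric names and their string types are assumptions):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Schema matching the JSON payload; adjust field names/types to the real data
schema = StructType([
    StructField("message", StringType()),
    StructField("source", StringType()),
    StructField("instance", StringType()),
    StructField("metrics", StructType([
        StructField("metric1", StringType()),
        StructField("metric2", StringType()),
        StructField("metric3", StringType()),
    ])),
])

# Parse the JSON string and flatten the nested metrics struct into columns
parsed = raw_stream_str.select(from_json(col("value"), schema).alias("data"))
metrics_df = parsed.select("data.metrics.*")

Since from_json is evaluated natively by Spark's Catalyst engine on the executors, it avoids the per-row Python serialization cost of a json.loads UDF, which is why it is usually the more efficient option for distributed computing.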
Upvotes: 1