Azamat

Reputation: 249

More efficient way of parsing value data from a structured streaming dataframe in PySpark 3

I am fetching streaming data in PySpark:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .getOrCreate()

raw_stream = spark \
    .readStream \
    .option("endpoint", conf.get('config', 'endpoint')) \
    .option("app.name", conf.get('config', 'app_name')) \
    .option("app.secret", conf.get('config', 'app_key')) \
    .option("dc", conf.get('config', 'dc')) \
    .option("source.topic", conf.get('config', 'topic')) \
    .option("group.name", conf.get('config', 'group')) \
    .option("source.value.binaryType", 'false') \
    .load()

raw_stream_str = raw_stream \
    .selectExpr("CAST(value AS STRING)")

value_batch = raw_stream_str \
    .writeStream \
    .queryName("value_query") \
    .format("memory") \
    .start()

spark.sql("select * from value_query").show()

whose output is as below:

+--------------------+
|               value|
+--------------------+
|{"message":"DGc6K...|
+--------------------+

The whole content of value looks like this:

[Row(value='{"message": "xxx", "source": "xxx", "instance": "xxx", "metrics": {"metric1": "xxx", "metric2": "xxx", ...}}')]

where metrics is a dictionary of key-value pairs. I want to extract the metrics content so that I end up with something like this:

+-------+-------+-------------------+
|metric1|metric2|            metric3|
+-------+-------+-------------------+
|  "abc"|  12345|01/01/2022 00:00:00|
+-------+-------+-------------------+

I am able to achieve this by treating the value as a list of strings:

raw_stream_str.selectExpr("value", "split(value,',')[3] as message").drop("value")
raw_stream_str.selectExpr("metrics","split(value,',')[0] as metric1"...).drop("metrics")

Is there a more efficient (Spark) way of doing this in terms of distributed computing? Maybe by applying/mapping some function on every row of the stream output dataframe with the help of json.loads, so I can exploit its key-value nature?

Upvotes: 0

Views: 584

Answers (1)

kvj

Reputation: 45

As suggested, you can use from_json to parse the data into the required schema, and after that you can do df.select("metrics.*") to get the required dataframe.
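A minimal sketch of that approach, assuming the field names from the question (the schema here is illustrative; the metric names and types must match your real payload):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Illustrative schema mirroring the value layout shown in the question;
# adjust the metric fields and their types to the actual JSON.
schema = StructType([
    StructField("message", StringType()),
    StructField("source", StringType()),
    StructField("instance", StringType()),
    StructField("metrics", StructType([
        StructField("metric1", StringType()),
        StructField("metric2", StringType()),
        StructField("metric3", StringType()),
    ])),
])

# Parse the JSON string into a struct column, then expand the nested
# metrics struct into one column per metric.
parsed = raw_stream_str \
    .select(from_json(col("value"), schema).alias("data"))

metrics_df = parsed.select("data.metrics.*")

This works directly on the streaming dataframe, so metrics_df can be written out with writeStream just like raw_stream_str above, and the JSON parsing runs distributed on the executors instead of splitting strings by hand.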

Upvotes: 1
