Reputation: 30232
The following is an example DataFrame snippet:
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_lid |trace |message |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1103960793391132675|47c10fda9b40407c998c154dc71a9e8c|[app.py:208] Prediction label: {"id": 617, "name": "CENSORED"}, score=0.3874854505062103 |
|1103960793391132676|47c10fda9b40407c998c154dc71a9e8c|[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201] |
|1103960793391132677|47c10fda9b40407c998c154dc71a9e8c|[app.py:317] Predict=s3://CENSORED/scan_4745/scan4745_t1_r0_c9_2019-07-15-10-32-43.jpg trait_id=112 result=InferenceResult(predictions=[Prediction(label_id='230', label_name='H3', probability=0.0), Prediction(label_id='231', label_name='Other', probability=1.0)], selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.3637824058532715 seconds |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I have millions of these log-like rows, and they can all be grouped by trace, which is unique to a session.
I'm looking to transform each such set of rows into a single row, essentially mapping over the group. For this example, I would extract "id": 617 from the first row, the values 0.6530804801919593, 0.6359653379418201 from the second row, and the Prediction(label_id='231', label_name='Other', probability=1.0) value from the third row.
Then I would compose a new table having the columns:
| trace | id | similarity | selected |
with the values:
| 47c10fda9b40407c998c154dc71a9e8c | 617 | 0.6530804801919593, 0.6359653379418201 | 231 |
How should I implement this group-map transform over several rows in PySpark?
Upvotes: 0
Views: 93
Reputation: 30232
I ended up using something along the lines of:
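import re

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

expected_schema = StructType([
    StructField("event_timestamp", TimestampType(), False),
    StructField("trace", StringType(), False),
    ...
])

@F.pandas_udf(expected_schema, F.PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def transform(pdf):
    output = {}
    for l in pdf.to_dict(orient='records'):
        # Strip the leading "[file:line] " prefix and keep the rest of the message
        x = re.findall(r'^(\[.*:\d+\]) (.*)', l['message'])[0][1]
        ...
    return pd.DataFrame(data=[output])

df.groupby('trace').apply(transform)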
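The body of the loop is elided above, so here is one way the per-trace extraction could be done. This is only a minimal sketch in plain Python, assuming the three message shapes from the question; the `PATTERNS` table and the `collapse_trace` helper are hypothetical names, and the third sample message is shortened.

```python
import re

# Hypothetical patterns, one per output column, based on the sample messages.
PATTERNS = {
    "id": re.compile(r'"id": (\d+),'),
    "similarity": re.compile(r"Similarity values: \[([^\]]*)\]"),
    "selected": re.compile(r"selected=Prediction\(label_id='(\d+)'"),
}


def collapse_trace(trace, messages):
    """Fold all messages of one trace into a single dict (one output row)."""
    output = {"trace": trace, "id": None, "similarity": None, "selected": None}
    for message in messages:
        for column, pattern in PATTERNS.items():
            match = pattern.search(message)
            # Keep the first match per column; later matches are ignored.
            if match and output[column] is None:
                output[column] = match.group(1)
    return output


# Sample messages from the question (the third one is shortened here).
messages = [
    '[app.py:208] Prediction label: {"id": 617, "name": "CENSORED"}, score=0.3874854505062103',
    "[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]",
    "[app.py:317] Predict=... result=InferenceResult(predictions=[Prediction(label_id='230', "
    "label_name='H3', probability=0.0)], selected=Prediction(label_id='231', "
    "label_name='Other', probability=1.0)). Took 1.36 seconds",
]
row = collapse_trace("47c10fda9b40407c998c154dc71a9e8c", messages)
print(row)
```

Inside the grouped-map UDF this could presumably be called as `collapse_trace(pdf['trace'].iloc[0], pdf['message'])`, with the resulting dict wrapped in a one-row `pd.DataFrame` whose columns match the declared schema.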
Upvotes: 0
Reputation: 1380
I've written the example below in Scala for my own convenience, but it should translate readily to PySpark.
1) Create the new columns in your DataFrame via regexp_extract on the "message" field. This produces the desired value where the regex matches, or an empty string where it does not:
scala> val dss = ds.select(
| 'trace,
| regexp_extract('message, "\"id\": (\\d+),", 1) as "id",
| regexp_extract('message, "Similarity values: \\[(\\-?[0-9\\.]+, \\-?[0-9\\.]+)\\]", 1) as "similarity",
| regexp_extract('message, "selected=Prediction\\(label_id='(\\d+)'", 1) as "selected"
| )
dss: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]
scala> dss.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace |id |similarity |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617| | |
|47c10fda9b40407c998c154dc71a9e8c| |0.6530804801919593, 0.6359653379418201| |
|47c10fda9b40407c998c154dc71a9e8c| | |231 |
+--------------------------------+---+--------------------------------------+--------+
2) Group by "trace" and eliminate the cases where the regex didn't match. The quick and dirty way (shown below) is to select the max of each column, which works because an empty string sorts below any matched value; you might need to do something more sophisticated if you expect to encounter more than one match per trace:
scala> val ds_final = dss.groupBy('trace).agg(max('id) as "id", max('similarity) as "similarity", max('selected) as "selected")
ds_final: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]
scala> ds_final.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace |id |similarity |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617|0.6530804801919593, 0.6359653379418201|231 |
+--------------------------------+---+--------------------------------------+--------+
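Since the question asks about PySpark, here is a rough translation of the two steps. Treat it as a sketch rather than a tested drop-in: the function name `collapse_traces` and the pattern constants are my own, and it assumes the input DataFrame has `trace` and `message` columns as in the question.

```python
import re

# Regexes carried over from the Scala example; Java and Python regex syntax
# agree for these particular patterns.
ID_RE = r'"id": (\d+),'
SIMILARITY_RE = r"Similarity values: \[(-?[0-9.]+, -?[0-9.]+)\]"
SELECTED_RE = r"selected=Prediction\(label_id='(\d+)'"


def collapse_traces(df):
    """Return one row per trace; `df` is assumed to have `trace` and `message` columns."""
    # Imported here so the patterns above can be checked without Spark installed.
    from pyspark.sql import functions as F

    # Step 1: one column per regex; rows that do not match get an empty string.
    extracted = df.select(
        "trace",
        F.regexp_extract("message", ID_RE, 1).alias("id"),
        F.regexp_extract("message", SIMILARITY_RE, 1).alias("similarity"),
        F.regexp_extract("message", SELECTED_RE, 1).alias("selected"),
    )
    # Step 2: "" sorts below any non-empty string, so max() keeps the
    # extracted value whenever at least one row in the trace matched.
    return extracted.groupBy("trace").agg(
        F.max("id").alias("id"),
        F.max("similarity").alias("similarity"),
        F.max("selected").alias("selected"),
    )


# Quick local check of one pattern with plain `re`, no Spark needed.
sample = "[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]"
similarity = re.search(SIMILARITY_RE, sample).group(1)
print(similarity)
```

With a live session, `collapse_traces(df).show(truncate=False)` should print the same single-row table as the Scala version. If a trace can contain several matches for the same column, swapping `F.max` for something like `F.collect_list` is probably a better starting point.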
Upvotes: 1