Reputation: 21
I'm getting the following records in blob format, separated by newlines. Below is an example of two events separated by a newline.
A few things to note here:
In the example below, the event structure is inconsistent. Only certain events contain the Channel Id, conversation Id, replyActivity Id, from Id and locale columns; for absent columns I need to populate null in my data frame.
How can I achieve this in PySpark?
Example:
{
"event":[
{
"name":"Zip/Postal Code",
"count":1
}
],
"internal":{
"data":{
"id":"XXXX",
"documentVersion":"1.61"
}
},
"context":{
"application":{
"version":"Thu 10/15/2020 2:46:54.65 \r\nUTC (fv-az464-530) [Build 174613] [Repo Intercom] [Branch prod] [Commit XXXX] \r\n[IntercomWebUIVersion 1.6.20-169031] [IntercomBotAppTemplatesVersion 1.3.27-165664] \r\n"
},
"data":{
"eventTime":"2020-10-20T15:54:48.7734934Z",
"isSynthetic":false,
"samplingRate":100.0
},
"cloud":{
},
"device":{
"type":"PC",
"roleName":"bc-directline-eus2",
"roleInstance":"RD0004FFA145F5",
"screenResolution":{
}
},
"session":{
"isFirst":false
},
"operation":{
"id":"f115c4bf-4fa31385d9a8f248",
"parentId":"|f115c4bf-4fa31385d9a8f248."
},
"location":{
"clientip":"0.0.0.0",
"continent":"North America",
"country":"United States",
"province":"Virginia",
"city":"Boydton"
},
"custom":{
"dimensions":[
{
"Timestamp":"XXXX"
},
{
"StatusCode":"200"
},
{
"Activity ID":"HR48uEYXuCE1yIsFMLL3X3-j|0000006"
},
{
"From ID":"XXXX"
},
{
"Correlation ID":"|f115c4bf-4fa31385d9a8f248."
},
{
"Channel ID":"directline"
},
{
"Recipient ID":"7222C-RG-CAR-MP5-HVC-Chatbot-P-p7rpums@Ye6TP1LJz0o"
},
{
"Bot ID":"XXXX"
},
{
"Activity Type":"message"
},
{
"Conversation ID":"HR48uEYXuCE1yIsFMLL3X3-j"
}
]
}
}
}{
"event":[
{
"name":"Activity",
"count":1
}
],
"internal":{
"data":{
"id":"992b0fc7-12ec-11eb-b59a-fb2df7d234d8",
"documentVersion":"1.61"
}
},
"context":{
"application":{
"version":"Thu 10/15/2020 2:46:54.65 \r\nUTC (fv-az464-530) [Build 174613] [Repo Intercom] [Branch prod] [Commit XXXX] \r\n[IntercomWebUIVersion 1.6.20-169031] [IntercomBotAppTemplatesVersion 1.3.27-165664] \r\n"
},
"data":{
"eventTime":"2020-10-20T15:54:34.3811795Z",
"isSynthetic":false,
"samplingRate":100.0
},
"cloud":{
},
"device":{
"type":"PC",
"roleName":"bc-directline-eastus3",
"roleInstance":"RD00155D33F838",
"screenResolution":{
}
},
"session":{
"isFirst":false
},
"operation":{
"id":"00-508c4cceaa6d954599230123d012265b-5f1d891b61135340-00",
"parentId":"|00-508c4cceaa6d954599230123d012265b-5f1d891b61135340-00.2fac18fc_"
},
"location":{
"clientip":"0.0.0.0",
"continent":"North America",
"country":"United States",
"province":"Virginia",
"city":"Washington"
},
"custom":{
"dimensions":[
{
"Timestamp":"XXXX"
},
{
"StatusCode":"200"
},
{
"Activity ID":"HR48uEYXuCE1yIsFMLL3X3-j|0000000"
},
{
"From ID":"XXXX"
},
{
"Correlation ID":"|00-508c4cceaa6d954599230123d012265b-5f1d891b61135340-00.2fac18fc_"
},
{
"Channel ID":"directline"
},
{
"Bot ID":"7222C-RG-CAR-MP5-HVC-Chatbot-P-p7rpums"
},
{
"Activity Type":"message"
},
{
"Conversation ID":"HR48uEYXuCE1yIsFMLL3X3-j"
}
]
}
}
}
I need to extract these records into the following table format (column names listed below):
ActivityId | ActivityType | ChannelId | conversationId | replyActivityId | fromId | locale | recipientId | speak | text | name | eventTime | Date | InstanceId | DialogId | StepName | applicationId | intent | intentScore | entities | question | sentimentLabel | sentimentScore | knowledgeBaseId | answer | articleFound | originalQuestion | question | questionId | score | username | city | province | country | Feedback | Comment | Tag
Upvotes: 0
Views: 267
Reputation: 687
I took your sample data and created a json file. I read it in with Spark using this code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.json('/tmp/data.json')
df.show()
and it gave me:
+--------------------+--------------------+--------------------+
| context| event| internal|
+--------------------+--------------------+--------------------+
|{{Thu 10/15/2020 ...|[{1, Zip/Postal C...| {{1.61, XXXX}}|
|{{Thu 10/15/2020 ...| [{1, Activity}]|{{1.61, 992b0fc7-...|
+--------------------+--------------------+--------------------+
The problem with this format is that I was losing metadata, so I changed the approach: load each JSON record as a string column and parse it later. You can do this with:
df = spark.read.text('/tmp/data.json')
df.show()
which gives:
+--------------------+
| value|
+--------------------+
|{"event": [{"name...|
|{"event": [{"name...|
+--------------------+
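The read above yields one row per line of the file, so it relies on each event sitting on a single line. If the blob instead holds pretty-printed or back-to-back objects (the question's example has a `}{` boundary), one option is to rewrite it as one compact object per line first. Below is a minimal sketch using only the standard library; `iter_json_objects` and `to_json_lines` are hypothetical helper names, and it assumes the whole blob fits in memory as a string.

```python
import json
from typing import Any, Dict, Iterator


def iter_json_objects(blob: str) -> Iterator[Dict[str, Any]]:
    """Yield each top-level JSON value found back to back in `blob`."""
    decoder = json.JSONDecoder()
    pos, end = 0, len(blob)
    while pos < end:
        # raw_decode does not skip leading whitespace, so do it by hand.
        while pos < end and blob[pos].isspace():
            pos += 1
        if pos >= end:
            break
        # Returns the parsed value and the index just past it.
        obj, pos = decoder.raw_decode(blob, pos)
        yield obj


def to_json_lines(blob: str) -> str:
    """Re-serialize the blob with exactly one compact JSON object per line."""
    return "\n".join(json.dumps(obj) for obj in iter_json_objects(blob))


# Two pretty-printed events with no separator between them ("}{").
sample = '{\n "event": [{"name": "Zip/Postal Code"}]\n}{\n "event": [{"name": "Activity"}]\n}'
lines = to_json_lines(sample).splitlines()
```

Writing the result of `to_json_lines` back to a file gives input where each line is a complete event, which is the shape the string-column approach expects.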
From here, we can use a pandas UDF (or normal UDF) to process it. I will use the Fugue library as a way to easily convert Python and Pandas code to a Pandas UDF, but you can just turn the logic into a Pandas UDF later if you don't want to use Fugue.
Your final schema is very long so I think the concept will be clearer if I just use the first 3 columns. In this snippet I will extract:
ActivityId | ActivityType | ChannelId
In order to prototype, I will convert the original DataFrame to Pandas:
pdf = df.toPandas()
And then I will make a function that holds the logic. Some of this code is repetitive and you might be able to simplify it with helper functions, but I think this is enough to illustrate the logic. One frustrating part is that some fields are tedious to pull out: the dimensions are stored as a list of dicts, which is a bit awkward to access, but you can still get it to work.
import json
from typing import List, Dict, Any, Iterable

def process(df: List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    for row in df:
        # Parse the JSON string held in the current row
        record = json.loads(row["value"])
        # Activity Id
        activity_id = record.get('context', {}).get('custom', {}).get('dimensions', [{}])
        activity_id = [x for x in activity_id if "Activity ID" in x.keys()]
        if len(activity_id) == 1:
            activity_id = activity_id[0]['Activity ID']
        else:
            activity_id = None
        # Activity Type
        activity_type = record.get('context', {}).get('custom', {}).get('dimensions', [{}])
        activity_type = [x for x in activity_type if "Activity Type" in x.keys()]
        if len(activity_type) == 1:
            activity_type = activity_type[0]['Activity Type']
        else:
            activity_type = None
        # Channel Id
        channel_id = record.get('context', {}).get('custom', {}).get('dimensions', [{}])
        channel_id = [x for x in channel_id if "Channel ID" in x.keys()]
        if len(channel_id) == 1:
            channel_id = channel_id[0]['Channel ID']
        else:
            channel_id = None
        yield {"ActivityId": activity_id,
               "ActivityType": activity_type,
               "ChannelId": channel_id,
               }
This function parses the JSON string in each row and then extracts the relevant fields; a field that is missing from an event comes out as None. You might notice that the input and output types are not Pandas DataFrames. This is okay because Fugue can handle the conversion for us. In order to test this function, we can do:
import fugue.api as fa
schema = "ActivityId:str, ActivityType:str, ChannelId:str"
out = fa.transform(pdf, process, schema=schema)
# output is Pandas
out.head()
and this will adapt the process function to run on Pandas DataFrames. Schema is a requirement for Spark, so Fugue requires it as well. This gives us the following result:
ActivityId ActivityType ChannelId
HR48uEYXuCE1yIsFMLL3X3-j|0000006 message directline
HR48uEYXuCE1yIsFMLL3X3-j|0000000 message directline
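The repeated lookups in `process` could be collapsed by first merging the list of single-key dicts into one dict and then driving the extraction from a column mapping. The sketch below is one way to do that, not the only one: `DIMENSION_COLUMNS`, `flatten_dimensions` and `extract_row` are hypothetical names, the mapping covers only keys that appear in the sample events, and unlike `process` it keeps the last value when a key repeats instead of returning None.

```python
import json
from typing import Any, Dict, Optional

# Output column -> key inside context.custom.dimensions (keys seen in the sample events).
DIMENSION_COLUMNS = {
    "ActivityId": "Activity ID",
    "ActivityType": "Activity Type",
    "ChannelId": "Channel ID",
    "conversationId": "Conversation ID",
    "fromId": "From ID",
    "recipientId": "Recipient ID",
}


def flatten_dimensions(record: Dict[str, Any]) -> Dict[str, Any]:
    """Merge the list of single-key dicts under context.custom.dimensions into one dict."""
    dimensions = record.get("context", {}).get("custom", {}).get("dimensions") or []
    merged: Dict[str, Any] = {}
    for entry in dimensions:
        merged.update(entry)  # if a key repeats, the last occurrence wins
    return merged


def extract_row(raw: str) -> Dict[str, Optional[str]]:
    """Parse one JSON event and return a flat row; absent fields become None."""
    record = json.loads(raw)
    dims = flatten_dimensions(record)
    row = {column: dims.get(key) for column, key in DIMENSION_COLUMNS.items()}
    event_time = record.get("context", {}).get("data", {}).get("eventTime")
    row["eventTime"] = event_time
    # The ISO 8601 timestamp starts with YYYY-MM-DD, so the date is its first 10 characters.
    row["Date"] = event_time[:10] if event_time else None
    return row


# A trimmed-down event: no "Recipient ID", so recipientId should come out as None.
event = json.dumps({
    "context": {
        "data": {"eventTime": "2020-10-20T15:54:34.3811795Z"},
        "custom": {"dimensions": [
            {"Activity ID": "HR48uEYXuCE1yIsFMLL3X3-j|0000000"},
            {"Channel ID": "directline"},
        ]},
    }
})
row = extract_row(event)
```

With something like this in place, the loop in `process` could yield `extract_row(row["value"])` for each row, provided the schema string is extended to list the extra columns.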
Now that we know it works on Pandas, we can bring it to Spark with the exact same command. We just need to pass in the Spark DataFrame instead.
out = fa.transform(df, process, schema=schema)
# returns a Spark DataFrame
out.show()
Under the hood, Fugue will convert each partition to a List[Dict[str,Any]] and then apply the process function. In this case, it is just applied on the default partitions of your DataFrame. The output annotation Iterable[Dict[str,Any]] tells Fugue how to convert the result back into a Spark DataFrame.
Upvotes: 1