Eyal S.

Reputation: 1161

pyspark - remove punctuations from schema

I have a json file which I'm able to load via:

df = spark.read.json(fpath)

The JSON is nested, and some of the nested column names contain punctuation marks. This causes issues when I try to create an unmanaged table. I can solve it by defining a schema with valid column names, but that is a labor-intensive process since I have many files, each with many column names.

I would like to read a JSON file, modify its schema by removing any punctuation marks from the column names, and then use the new schema to save the file. Is that possible?

Here is an example json file:

[{"cursor": "eyJfaW", "node": {"person": {"_id": "5cca", "display": "66"}, "completedQueues": ["STATEMENT_QUERYBUILDERCACHE_QUEUE", "STATEMENT_PERSON_QUEUE", "STATEMENT_FORWARDING_QUEUE"], "processingQueues": [], "deadForwardingQueue": [], "pendingForwardingQueue": [], "completedForwardingQueue": [], "failedForwardingLog": [], "_id": "5ce372", "hasGeneratedId": false, "organisation": "5b6803e", "lrs_id": "5c9bf", "client": "5c9", "active": true, "voided": false, "timestamp": "2019-05-21T03:36:34.199Z", "stored": "2019-05-21T03:36:34.345Z", "hash": "531c7", "agents": ["mailto:[email protected]"], "relatedAgents": ["mailto:[email protected]", "mailto:[email protected]"], "registrations": [], "verbs": ["http://test.gov/expapi/verbs/completed"], "activities": ["https://test.test.org/wgua/pf/spNext/page1"], "relatedActivities": ["https://test.test.org/test/pf/spNext/page1"], "statement": {"id": "b389190", "timestamp": "2019-05-21T03:36:34.199Z", "actor": {"objectType": "Agent", "mbox": "mailto:[email protected]", "name": "66"}, "verb": {"id": "http://test.gov/expapi/verbs/completed", "display": {"en-US": "completed"}}, "result": {"extensions": {"http://test.org/xapi/extension/timein": 6.863}}, "object": {"id": "https://test.test.org/wgua/pf/spNext/page1", "objectType": "Activity", "definition": {"type": "http://test.gov/expapi/activities/page", "name": {"en-US": "Strategic Planning - Intro - Page 1"}, "description": {"en-US": "Strategic Planning Introduction Page 1"}, "extensions": {"http://test.org/xapi/extension/path": {"project": "wgua", "course": "test", "section": "spNext", "page": "page1", "object": null}}}}, "stored": "2019-05-21T03:36:34.345Z", "authority": {"objectType": "Agent", "name": "New Client", "mbox": "mailto:[email protected]"}, "version": "1.0.0"}, "__v": 1, "personaIdentifier": "5cc"}}]

Upvotes: 0

Views: 258

Answers (1)

niuer

Reputation: 1669

Try this:

  1. Load the JSON file into a DataFrame:
import json
import re
from pyspark.sql.types import StructType

df = spark.read.json("t.json")
df.printSchema()

These are the first several fields of the schema for your example data; note the field "__v":

root
 |-- cursor: string (nullable = true)
 |-- node: struct (nullable = true)
 |    |-- __v: long (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- active: boolean (nullable = true)
 |    |-- activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
...
  2. To remove the underscores from names like "__v", serialize the schema to a JSON string, clean it with the re module, and rebuild it:
olds = df.schema
jsonStr = olds.json()  # JSON string representation of the schema
jsonStrCleaned = re.sub('_', '', jsonStr)  # strip every '_' (note: this also renames fields such as "_id" and "lrs_id")
jsonDataCleaned = json.loads(jsonStrCleaned)  # parse the cleaned string back into a dict
news = StructType.fromJson(jsonDataCleaned)  # construct the new schema from the cleaned JSON
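One caveat: re.sub runs over the entire serialized schema, so it strips characters from every string in it, not just field names (e.g. metadata values would be altered too). A stdlib-only sketch that targets only the "name" entries of the schema dict looks like this; clean_field_names is a hypothetical helper, and the regex removes all non-alphanumeric characters to match the question's goal of dropping punctuation:

```python
import json
import re

def clean_field_names(node):
    """Recursively strip punctuation from 'name' values in a schema
    dict produced by StructType.json(), leaving other values intact."""
    if isinstance(node, dict):
        return {
            k: (re.sub(r'[^0-9a-zA-Z]', '', v)
                if k == 'name' and isinstance(v, str)
                else clean_field_names(v))
            for k, v in node.items()
        }
    if isinstance(node, list):
        return [clean_field_names(item) for item in node]
    return node

# Fragment of a schema dict, as StructType.json() would produce it:
schema_dict = json.loads(
    '{"type": "struct", "fields": [{"name": "__v", "type": "long", '
    '"nullable": true, "metadata": {}}]}'
)
cleaned = clean_field_names(schema_dict)
print(cleaned["fields"][0]["name"])  # -> v
```

The cleaned dict can then be passed to StructType.fromJson the same way as the re.sub approach above.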
  3. Now create a new DataFrame using the cleaned schema:
df1 = spark.createDataFrame(df.rdd, news)
df1.printSchema()

You can see that the "__v" field has been renamed:

root
 |-- cursor: string (nullable = true)
 |-- node: struct (nullable = true)
 |    |-- v: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- active: boolean (nullable = true)
 |    |-- activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
...

Now you can save the DataFrame to a file with the new schema.

Upvotes: 1
