Reputation: 3751
I have a large dataset of lines (rows) of json. The rows have several fields, and the fields that are present depend on one of the json fields in that row. Here is a small example:
%pyspark
data = sc.parallelize([{'key':'k1','a':1.0,'b':2.0},
                       {'key':'k1','a':1.0,'b':20.0},
                       {'key':'k1','a':100.0,'b':.2},
                       {'key':'k2','y':10.0,'z':20.0},
                       {'key':'k2','y':1.0,'z':250.0},
                       {'key':'k1','a':1.0,'b':2.0}], 2)
My goal is to get this data into a DataFrame without having to specify the schema. Pyspark has (at least) two functions to help with this: 1) toDF(), which just takes the first row of data as the schema, and 2) sqlContext.createDataFrame(), where you can specify the proportion of rows to sample in order to infer the schema. e.g.:
data.toDF().show()
+-----+----+---+
| a| b|key|
+-----+----+---+
| 1.0| 2.0| k1|
| 1.0|20.0| k1|
|100.0| 0.2| k1|
| null|null| k2|
| null|null| k2|
| 1.0| 2.0| k1|
+-----+----+---+
sqlContext.createDataFrame(data,samplingRatio=1).show()
+-----+----+---+----+-----+
| a| b|key| y| z|
+-----+----+---+----+-----+
| 1.0| 2.0| k1|null| null|
| 1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
| 1.0| 2.0| k1|null| null|
+-----+----+---+----+-----+
sqlContext.createDataFrame() does what I want, but since I only have maybe five keys in 4 billion rows, I am thinking that there must be a faster way to infer the schema. Also, some keys are very rare, so I can't get away with making samplingRatio smaller.
Is there an elegant and fast way to have the schema inferred given there are only a few row types?
Upvotes: 1
Views: 1889
Reputation: 3751
A bit more googling led me to a solution.
Start by creating a robust dataframe concatenator (unionAll can't merge schemas):
from functools import reduce  # reduce is a builtin on Python 2; on Python 3 it must be imported from functools

def addEmptyColumns(df, colNames):
    # add a null-valued column for each missing column name
    exprs = df.columns + ["null as " + colName for colName in colNames]
    return df.selectExpr(*exprs)

def concatTwoDfs(left, right):
    # append columns from right df to left df
    missingColumnsLeft = set(right.columns) - set(left.columns)
    left = addEmptyColumns(left, missingColumnsLeft)

    # append columns from left df to right df
    missingColumnsRight = set(left.columns) - set(right.columns)
    right = addEmptyColumns(right, missingColumnsRight)

    # let's set the same order of columns
    right = right[left.columns]

    # finally, union them
    return left.unionAll(right)

def concat(dfs):
    return reduce(concatTwoDfs, dfs)
(code from https://lab.getbase.com/pandarize-spark-dataframes/)
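In case the padding step is not obvious, here is a minimal sketch of the selectExpr trick that addEmptyColumns relies on (the df_k1 and padded names are just for illustration): the SQL expression "null as <col>" appends a column of nulls, so two dataframes can be brought to the same set of columns before the union.
# minimal sketch: pad a k1-only dataframe with the k2-only columns y and z
df_k1 = sc.parallelize([{'key':'k1','a':1.0,'b':2.0}]).toDF()
padded = df_k1.selectExpr(*(df_k1.columns + ["null as y", "null as z"]))
padded.show()
# expected: one row with a=1.0, b=2.0, key=k1 and null in the new y and z columns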
Then get the distinct keys, make a list of dataframes, and concatenate them:
keys = data.map(lambda x: x['key']).distinct().collect()
a_grp = [data.filter(lambda x: x['key']==k).toDF() for k in keys]
concat(a_grp).show()
+-----+----+---+----+-----+
| a| b|key| y| z|
+-----+----+---+----+-----+
| 1.0| 2.0| k1|null| null|
| 1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| 1.0| 2.0| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
+-----+----+---+----+-----+
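One closing note, assuming you are on Spark 2.x or later: unionAll is deprecated there in favour of DataFrame.union, so the last line of concatTwoDfs would become
    # Spark 2.x spelling of the same union (unionAll is deprecated there)
    return left.union(right)
and the rest of the approach should carry over unchanged.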
Upvotes: 1