Reputation: 31
With PySpark I'm trying to convert an RDD of nested dicts into a DataFrame, but I'm losing data in some fields, which are set to null.
Here's the code:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)

def convert_to_row(d):
    return Row(**d)

df2 = sc.parallelize([{"id": "14yy74hwogxoyl2l3v", "geoloc": {"country": {"geoname_id": 3017382, "iso_code": "FR", "name": "France"}}}]).map(convert_to_row).toDF()
df2.printSchema()
df2.show()
df2.toJSON().saveAsTextFile("/tmp/json.test")
When I have a look at /tmp/json.test, here's the content (after manual indentation):
{
  "geoloc": {
    "country": {
      "name": null,
      "iso_code": null,
      "geoname_id": 3017382
    }
  },
  "id": "14yy74hwogxoyl2l3v"
}
iso_code and name have been converted to null.
Can anyone help me with this? I can't understand it.
I'm using Python 2.7 and Spark 2.0.0.
Thanks!
Upvotes: 2
Views: 2409
Reputation: 60390
Following the explanation already provided by @user6910411 (and saving me the time to do it myself), the remedy (i.e. the intermediate JSON representation) is to use read.json instead of toDF and your function:
spark.version
# u'2.0.2'
jsonRDD = sc.parallelize([{"id": "14yy74hwogxoyl2l3v", "geoloc": {"country": {"geoname_id": 3017382, "iso_code": "FR", "name": "France"}}}])
df = spark.read.json(jsonRDD)
df.collect()
# result:
[Row(geoloc=Row(country=Row(geoname_id=3017382, iso_code=u'FR', name=u'France')), id=u'14yy74hwogxoyl2l3v')]
# just to have a look at what will be saved:
df.toJSON().collect()
# result:
[u'{"geoloc":{"country":{"geoname_id":3017382,"iso_code":"FR","name":"France"}},"id":"14yy74hwogxoyl2l3v"}']
df.toJSON().saveAsTextFile("/tmp/json.test")
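As a quick sanity check, the saved output can be read back with the JSON data source:

spark.read.json("/tmp/json.test").collect()
# the nested fields should survive the round trip and match df.collect() above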
For comparison, here is how your own df2 looks:
df2.collect()
# result:
[Row(geoloc={u'country': {u'geoname_id': 3017382, u'iso_code': None, u'name': None}}, id=u'14yy74hwogxoyl2l3v')]
df2.toJSON().collect()
# result:
[u'{"geoloc":{"country":{"name":null,"iso_code":null,"geoname_id":3017382}},"id":"14yy74hwogxoyl2l3v"}']
Upvotes: 1
Reputation: 330393
This happens because you don't use Row correctly. The Row constructor is not recursive and operates only on the top-level fields. When you take a look at the schema:
root
 |-- geoloc: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)
 |-- id: string (nullable = true)
you'll see that geoloc is represented as map<string,map<string,long>>. Since the inferred map value type is long, the string values of iso_code and name cannot be stored and are replaced with null. A correct representation of the structure would use nested Rows:
Row(
    id="14yy74hwogxoyl2l3v",
    geoloc=Row(
        country=Row(geoname_id=3017382, iso_code="FR", name="France")))
while what you pass is equivalent to:
Row(
    geoloc={'country':
        {'geoname_id': 3017382, 'iso_code': 'FR', 'name': 'France'}},
    id='14yy74hwogxoyl2l3v')
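For this specific shape, a minimal hand-rolled converter could look like the sketch below; convert_to_nested_row is a hypothetical helper that hard-codes the geoloc/country nesting:

def convert_to_nested_row(d):
    # hypothetical helper: builds nested Rows for this exact structure
    country = d["geoloc"]["country"]
    return Row(
        id=d["id"],
        geoloc=Row(country=Row(**country)))

data = {"id": "14yy74hwogxoyl2l3v",
        "geoloc": {"country": {"geoname_id": 3017382,
                               "iso_code": "FR", "name": "France"}}}
sc.parallelize([data]).map(convert_to_nested_row).toDF().printSchema()
# geoloc should now come out as a struct, with iso_code and name as strings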
Since creating a correct general implementation would have to cover a number of border cases, it makes more sense to use an intermediate JSON representation and the Spark JSON data source.
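A minimal sketch of that approach (assuming a Spark 2.x session exposed as spark, as in the other answer):

import json

data = [{"id": "14yy74hwogxoyl2l3v",
         "geoloc": {"country": {"geoname_id": 3017382,
                                "iso_code": "FR", "name": "France"}}}]

# serialize each dict to a JSON string and let the JSON data source
# infer the nested struct schema
df = spark.read.json(sc.parallelize(data).map(json.dumps))
df.printSchema()
# root
#  |-- geoloc: struct (nullable = true)
#  |    |-- country: struct (nullable = true)
#  |    |    |-- geoname_id: long (nullable = true)
#  |    |    |-- iso_code: string (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |-- id: string (nullable = true)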
Upvotes: 2