Reputation: 2500
Using PySpark 1.6.3, I am attempting to convert an RDD to a DataFrame. This is test code running in a Zeppelin notebook. The RDD of interest is rdd_ret.
>>> from pyspark.sql import Row
>>> rdd_ret.count()
9301
>>> rddofrows = rdd_ret.map(lambda x: Row(**x))
>>> things = rddofrows.take(10000)
>>> len(things)
9301
>>> [type(x) for x in things if type(x) != Row]
[]
>>> [len(x) for x in things if len(x) != 117]
[]
So we see here that we definitely have 9301 rows, all of them Row objects and all of the same length. Now I want to convert to a DataFrame:
>>> outdf = rddofrows.toDF(sampleRatio=0.1)
>>> outdf.count()
This throws an error: TypeError: 'NoneType' object is not iterable; the full stack trace is at the bottom.
The output DataFrame object is generated, but any operation I try to run on it (.show(), .count(), .filter()) produces the same stack trace at the bottom. I don't understand what could possibly be the NoneType in this case. Sure, some of the values within the Row objects might be in error, but in order to count or show, you should only be iterating through the rows of the DataFrame, and they are all there.
What is going on here?
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5665146503764823323.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py", line 269, in count
return int(self._jdf.count())
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2282.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 1256.0 failed 4 times, most recent failure: Lost task 21.3 in stage 1256.0 (TID 62913, usg-kov-e1b-slv005.c.pg-us-n-app-053847.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in convert_struct
return tuple(conv(v) for v, conv in zip(obj, converters))
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in <genexpr>
return tuple(conv(v) for v, conv in zip(obj, converters))
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
return lambda row: [conv(v) for v in row]
TypeError: 'NoneType' object is not iterable
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
By request, here is what one of the rows looks like:
Row(accountType='individual', added='2018-06-05T01:52:34.257+0000', assignment='null', author='noahmagel', authorCity='null', authorCityCode='null',
authorContinent='North America', authorContinentCode='n-a', authorCountry='United States', authorCountryCode='us', authorCounty='null',
authorCountyCode='null', authorLocation='n-a,us,,,', authorState='null', authorStateCode='null', avatarUrl='https://pbs.twimg.com/profile_images/613069089263718401/P1BWMsFG_normal.jpg',
averageDurationOfVisit=20.0, averageVisits=6.0, backlinks=49850734.0, blogComments=0.0, checked=False, city='null', cityCode='null', continent='North America',
continentCode='n-a', country='United States', countryCode='us', county='null', countyCode='null', date='2017-12-11T10:58:36.000+0000',
displayUrls=[], domain='twitter.com', engagement=0.0, expandedUrls=[], facebookAuthorId='null', facebookComments=0.0, facebookLikes=0.0,
facebookRole='null', facebookShares=0.0, facebookSubtype='null', forumPosts=0.0, forumViews=0.0, fullText='@oli_braun @elonmusk @SpaceX Take my money 💰',
fullname='noah', gender='male', id=167783541878.0, imageMd5s=None, impact=34.0, importanceAmplification=28.0, importanceReach=40.0,
impressions=208.0, influence=502.0, insightsHashtag=[], insightsMentioned=['@elonmusk', '@spacex', '@oli_braun'], instagramCommentCount=0.0,
instagramFollowerCount=0.0, instagramFollowingCount=0.0, instagramInteractionsCount=0.0, instagramLikeCount=0.0, instagramPostCount=0.0,
interest=['Fine arts', 'Business', 'Technology'], language='en', lastAssignmentDate='null', latitude=0.0, lemmatize=['money'],
locationName='null', logoImages=None, longitude=0.0, matchPositions=[], mediaFilter='null', mediaUrls=[], monthlyVisitors=6000000000.0, mozRank=9.6,
originalUrl='http://twitter.com/noahmagel/statuses/940173969935818752', outreach=0.0, pageType='twitter', pagesPerVisit=22.0, percentFemaleVisitors=46.0,
percentMaleVisitors=54.0, priority='null', professions=[], queryId=1999376256.0, queryName='Braun_English', reach=502.0,
replyTo='http://twitter.com/oli_braun/statuses/940171345115144192', resourceId=167783541878.0, resourceType='page', retweetOf='null',
sentiment='neutral', shortUrls=[], snippet='@oli_braun @elonmusk @SpaceX Take my money 💰', starred=False, state='null', stateCode='null', status='null',
subtype='null', tags=[], textlen=44, threadAuthor='oli_braun', threadCreated='null', threadEntryType='reply', threadId='0', threadURL='null',
title='noah (@noahmagel): @oli_braun @elonmusk @Spac ...', trackedLinkClicks=0.0, trackedLinks='null', twitterAuthorId='2246429194',
twitterFollowers=208.0, twitterFollowing=513.0, twitterPostCount=381.0, twitterReplyCount=0.0, twitterRetweets=0.0, twitterRole='null',
twitterVerified=False, updated='2018-06-05T01:52:34.257+0000', url='http://twitter.com/noahmagel/statuses/940173969935818752', wordCount='null')
Upvotes: 2
Views: 1524
Reputation: 151
I reproduced this with the following:
sc = spark.sparkContext
json_rows = ['{"key1": [{"foo": 1}, {"bar": 2}]}',
'{"key2": 1}']
rows = sc.parallelize(json_rows)
df = spark.read.json(rows)
rdd = df.rdd
new_df = spark.createDataFrame(rdd, samplingRatio=1)
new_df.head(2)
Gives me the same error:
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
return lambda row: [conv(v) for v in row]
TypeError: 'NoneType' object is not iterable
Note that these rows work fine:
json_rows = ['{"key1": [1, 2]}',
'{"key2": 1}']
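To run that variant end-to-end, here is a minimal sketch reusing the spark / sc handles and the json_rows list defined just above (assuming the same session as the repro earlier):

rows = sc.parallelize(json_rows)
df = spark.read.json(rows)
new_df = spark.createDataFrame(df.rdd, samplingRatio=1)
new_df.head(2)  # no error: key1 is inferred as an array of longs, which needs no element converter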
The problem is when you have a list or ArrayType whose elements are StructType or Row types. StructType and Row types require conversion; see the source code:
def _need_converter(dataType):
    if isinstance(dataType, StructType):
        return True
    elif isinstance(dataType, ArrayType):
        return _need_converter(dataType.elementType)
Thus it's going to try to convert the elements inside the array (lambda row: [conv(v) for v in row]). This will throw an error if the value at that key is ever None for a row (either because the key is missing from that row or because it's explicitly None). Honestly this seems like a bug in the Spark code? You'd think it would check whether the array is None before trying to call conv() on the elements inside it.
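Here is a toy illustration of that failure mode in plain Python (not the actual Spark internals; conv is just a stand-in for the generated element converter):

# The converter built for an ArrayType-of-StructType column looks roughly like
#     lambda row: [conv(v) for v in row]
# so a None value in that column is iterated before any null check happens.
conv = lambda v: v        # stand-in for the element converter
array_value = None        # a row where the array column is missing or null
try:
    [conv(v) for v in array_value]
except TypeError as e:
    print(e)              # 'NoneType' object is not iterable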
My solution was to flatten the nested rows/structs via map, so the value just becomes a str literal and no conversion is necessary.
import json
from pyspark.sql.types import Row, ArrayType

def flatten(x):
    x_dict = x.asDict()
    for k, v in x_dict.items():
        if isinstance(v, Row):
            x_dict[k] = json.dumps(v.asDict())
    return x_dict
sc = spark.sparkContext
rows = sc.parallelize(json_rows)
df = spark.read.json(rows)
flat_rdd = df.rdd.map(lambda x: flatten(x))
flat_df = spark.createDataFrame(flat_rdd, samplingRatio=1)
flat_df.head(2)
Note that for my use case all my nested objects were of type Row, and I was fine flattening the whole Row since it was going to Redshift anyway. YMMV. For my list example above, you would probably check for type list instead; I think it's entirely possible to keep a nested list as long as its elements are literals, so you could just flatten the elements of the list rather than the list itself, as in the sketch below.
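A minimal sketch of that list-aware variant (flatten_list_elements is a hypothetical name, not part of the solution above; it assumes nested values are Rows, lists, or plain literals):

import json
from pyspark.sql.types import Row

def flatten_list_elements(x):
    x_dict = x.asDict()
    for k, v in x_dict.items():
        if isinstance(v, Row):
            x_dict[k] = json.dumps(v.asDict())
        elif isinstance(v, list):
            # serialize any Row elements, keep literal elements as-is
            x_dict[k] = [json.dumps(e.asDict()) if isinstance(e, Row) else e
                         for e in v]
    return x_dict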
Upvotes: 1
Reputation: 1497
Python's "**" mapping unpacking is meant for Python dict objects. I convert the Row to a dict with x.asDict(), which can then be used as **x.asDict().
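A small sketch of what I mean (assuming nothing beyond the Row class itself):

from pyspark.sql import Row

d = {'a': 1, 'b': 2}
r = Row(**d)         # ** unpacking works on a dict
back = r.asDict()    # Row -> dict
r2 = Row(**back)     # a Row can be rebuilt from its dict form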
Upvotes: 0