Reputation: 241
I am new to Spark and PySpark.
I am reading a small CSV file (~40k rows) into a DataFrame.
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
I get a weird error that does not occur every single time, but does happen pretty regularly:
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
Once that EOFError has been raised, I will not see it again until I do something that requires interacting with the Spark server.
When I call df2.count() it shows that [Stage xxx] prompt, which is what I mean by interacting with the Spark server. Anything that triggers that seems to eventually end up giving the EOFError again when I do something with df2.
It does not seem to happen with df (vs. df2), so it seems like it must be something happening in the df.map() line.
Upvotes: 24
Views: 4566
Reputation: 33
I believe you are running Spark 2.x or later. The code below should create your DataFrame from the CSV:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
Then you can apply the same transformation:
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
and then you can create df2 without needing Row and toDF().
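For example, here is a minimal sketch of one way to build the label/features columns in Spark 2.x without Row and toDF(), using VectorAssembler. The use of VectorAssembler and the column layout are assumptions on my part, since the exact replacement is not spelled out above:

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

label_col = df.columns[0]        # assumed: the first column is the label, as in the question
feature_cols = df.columns[1:]    # assumed: the remaining columns are numeric (read with inferSchema)
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df2 = (assembler.transform(df)
         .withColumn('label', F.col(label_col).cast('double'))
         .select('label', 'features'))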
Let me know if this works, or if you are in fact using Spark 1.6. Thanks.
Upvotes: 1
Reputation: 830
Can you please try doing the map after converting the DataFrame into an RDD? You are applying a map function on a DataFrame and then creating a DataFrame from that again. The syntax would be:
df.rdd.map().toDF()
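For example, a minimal sketch of that pattern applied to the question's df (assuming the same Row and Vectors imports and column layout as in the question):

from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

# convert to an RDD first, then map, then build a DataFrame from the result
df2 = df.rdd.map(
    lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))
).toDF()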
Please let me know if it works. Thanks.
Upvotes: 3