I have Spark 1.3 set up in a VirtualBox Ubuntu 14 32-bit VM. I have loaded a CSV file into a Spark DataFrame and am attempting some operations that give me error messages I can't troubleshoot.
The PySpark code is below:
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import *
from dateutil.parser import parse
sqlContext = SQLContext(sc)
elevFile = sc.textFile('file:////sharefolder/Jones Lake.csv')
header = elevFile.first()
schemaString = header.replace('"','')
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
fields[0].dataType = IntegerType()
fields[1].dataType = TimestampType()
fields[2].dataType = FloatType()
schema = StructType(fields)
elevHeader = elevFile.filter(lambda l: "Hour" in l)
elevHeader.collect()
elevNoHeader = elevFile.subtract(elevHeader)
print elevNoHeader.take(5)
elev_df = (elevNoHeader.map(lambda k: k.split(","))
           .map(lambda p: (int(p[0]), parse(p[1]), float(p[2])))
           .toDF(schema))
Everything works fine up to this point. It prints the top 5 rows of the new DataFrame with no problem:
print elev_df.head(5)
[Row(Hour=6, Date=datetime.datetime(1989, 9, 19, 0, 0), Value=641.6890258789062), Row(Hour=20, Date=datetime.datetime(1992, 4, 30, 0, 0), Value=633.7100219726562), Row(Hour=10, Date=datetime.datetime(1987, 7, 26, 0, 0), Value=638.6920166015625), Row(Hour=1, Date=datetime.datetime(1991, 2, 26, 0, 0), Value=634.2100219726562), Row(Hour=2, Date=datetime.datetime(1984, 7, 28, 0, 0), Value=639.8779907226562)]
But when I try to do a simple group by and count, I get errors I can't troubleshoot.
elev_df.groupBy("Hour").count().show()
This gives an error (the top few lines are below):
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-209-6533c596fac9> in <module>()
----> 1 elev_df.groupBy("Hour").count().show()
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n)
271 5 Bob
272 """
--> 273 print self._jdf.showString(n).encode('utf8', 'ignore')
274
275 def __repr__(self):
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Any ideas on troubleshooting this further?
Upvotes: 0
Views: 1900
It seems like your CSV has some blank values. I can see you are replacing the blank values, but I believe groupBy is not accepting that. An easy way to handle blank values from the CSV is the Spark DataFrame method fillna:
fillna(value, subset=None)
Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.
Parameters:
value – int, long, float, string, or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.
subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
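If blank values are indeed the cause, it may also help to drop them before the DataFrame is built. head(5) only has to evaluate a few rows, while groupBy("Hour").count() has to parse every line, so one blank or malformed field can go unnoticed until the aggregation runs. Below is a minimal sketch of a defensive row parser. The name parse_row and the default date format "%Y-%m-%d" are assumptions made for illustration; they are not from the question, and the real file may need dateutil's parse passed in as parse_date instead.

```python
from datetime import datetime


def _default_parse_date(text):
    # Assumed date format, for illustration only; the real CSV may differ.
    return datetime.strptime(text, "%Y-%m-%d")


def parse_row(line, parse_date=_default_parse_date):
    """Turn one CSV line into (hour, date, value).

    Returns None when the line does not have exactly three fields,
    when any field is blank, or when a field cannot be converted
    (the header line falls into the last case).
    """
    parts = [p.strip().strip('"') for p in line.split(",")]
    if len(parts) != 3 or any(p == "" for p in parts):
        return None
    try:
        return (int(parts[0]), parse_date(parts[1]), float(parts[2]))
    except ValueError:
        return None


# Small local check with made-up lines (not data from the question).
sample = ['"Hour","Date","Value"', "6,1989-09-19,641.689", "20,,633.71", ""]
clean = [r for r in (parse_row(s) for s in sample) if r is not None]
```

In the question's pipeline this could be applied as elevFile.map(parse_row).filter(lambda r: r is not None).toDF(schema), which would also make the separate header subtraction unnecessary, since the header line fails the int conversion and is dropped.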
Upvotes: 1