Reputation: 14236
So I have looked up this question on here, but the previous solutions have not worked for me. I have a pandas DataFrame in this format:
mdf.head()
dbn boro bus
0 17K548 Brooklyn B41, B43, B44-SBS, B45, B48, B49, B69
1 09X543 Bronx Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4 28Q680 Queens Q25, Q46, Q65
6 14K474 Brooklyn B24, B43, B48, B60, Q54, Q59
There are a couple more columns (subway lines and test scores), but I have excluded them here. When I try to convert this DataFrame into a Spark DataFrame, I get the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
--> 425 rdd, schema = self._createFromLocal(data, schema)
426 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
427 jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
339
340 if schema is None or isinstance(schema, (list, tuple)):
--> 341 struct = self._inferSchemaFromList(data)
342 if isinstance(schema, (list, tuple)):
343 for i, name in enumerate(schema):
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
239 warnings.warn("inferring schema from dict is deprecated,"
240 "please use pyspark.sql.Row instead")
--> 241 schema = reduce(_merge_type, map(_infer_schema, data))
242 if _has_nulltype(schema):
243 raise ValueError("Some of types cannot be determined after inferring")
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
860 nfs = dict((f.name, f.dataType) for f in b.fields)
861 fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862 for f in a.fields]
863 names = set([f.name for f in fields])
864 for n in nfs:
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
854 elif type(a) is not type(b):
855 # TODO: type cast (such as int -> long)
--> 856 raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
857
858 # same type
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
From what I have read, this might be a problem with the headers being treated as data. It is my understanding that you can't remove the headers from a DataFrame, so how would I go about fixing this error and converting this DataFrame into a Spark one?
Edit: Here is the code for how I created the Pandas DF and worked my way around the problem.
sqlc = SQLContext(sc)
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")
The last line of this code, which loads the merged CSV from my local machine, ended up letting me convert it properly to a DataFrame, but my question still remains: why did it not work in the first place?
Upvotes: 3
Views: 12815
Reputation: 3005
The problem here is that pandas uses np.nan (not a number) as its default value for empty cells, which confuses schema inference when converting to a Spark DataFrame.
The basic approach is to convert np.nan to None, which makes the conversion work.
Unfortunately, pandas does not let you fillna with None. Since np.nan does not satisfy self-equality, you can use this nifty trick:
new_series = new_series.apply(lambda x: None if x != x else x)
Then display(sqlContext.createDataFrame(new_df_1)) works fine.
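Applied to a whole DataFrame, a minimal sketch of the same idea (mdf is assumed to be the question's pandas DataFrame and sqlContext an existing SQLContext) might look like this:
import pandas as pd

# Cast to object first so numeric columns can actually hold None (instead of
# silently turning it back into NaN), then keep real values and put None
# everywhere a value is missing.
clean_mdf = mdf.astype(object).where(pd.notnull(mdf), None)

sparkdf = sqlContext.createDataFrame(clean_mdf)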
Upvotes: 0
Reputation: 1464
You can try this as well:
import numpy as np
import pandas as pd

def create_spark_dataframe(file_name):
    """
    Return a Spark DataFrame built from the pandas DataFrame read from file_name.
    """
    pandas_data_frame = pd.read_csv(file_name, converters={"PRODUCT": str})
    for col in pandas_data_frame.columns:
        # Fill NaN with an empty string in every non-numeric column so Spark
        # infers a single (string) type for it instead of mixing types.
        if ((pandas_data_frame[col].dtypes != np.int64) &
                (pandas_data_frame[col].dtypes != np.float64)):
            pandas_data_frame[col] = pandas_data_frame[col].fillna('')
    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    return spark_data_frame
This will solve your problem.
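A quick usage sketch (the file name is hypothetical, and sqlContext is assumed to be an existing SQLContext as in the question):
spark_df = create_spark_dataframe('my_data.csv')
spark_df.printSchema()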
Upvotes: 2
Reputation: 1369
I had the same issue and was able to track it down to a single entry that had a value of length 0 (an empty cell). The _infer_schema command runs on each row of the DataFrame and determines the types. By default, the empty value is assumed to be a Double while the non-empty values in the same column are Strings, and those two types cannot be merged by the _merge_type command. The issue has been filed as https://issues.apache.org/jira/browse/SPARK-18178, but the best way around it is probably to supply a schema to the createDataFrame command (see the sketch after the reproduction below).
The code below reproduces the problem in PySpark 2.0
import pandas as pd
from io import StringIO
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
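A minimal sketch of that schema workaround for the reproduction above (the column names follow what pandas assigns to this CSV, and the chosen types are my assumption, not part of the original post):
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Declare the types up front so Spark does not infer them row by row,
# which is what trips over the empty 'Scan Options' cell.
schema = StructType([
    StructField('Unnamed: 0', LongType(), True),    # pandas' name for the header-less first column
    StructField('Scan Options', StringType(), True),
])

df_with_schema = sqlContext.createDataFrame(test_df, schema=schema)
df_with_schema.first()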
Upvotes: 3
Reputation: 5304
You could use reflection to infer the schema from an RDD of Row
objects, e.g.,
from pyspark.sql import Row

# mdf is a pandas DataFrame, so turn its rows into an RDD before mapping them onto Row objects
mdfRdd = sc.parallelize(mdf[['dbn', 'boro', 'bus']].values.tolist())
mdfRows = mdfRdd.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
Does that achieve the desired result?
Upvotes: 3