gold_cy

Reputation: 14236

TypeError when converting Pandas to Spark

So I have looked up this question on here, but previous solutions have not worked for me. I have a DataFrame in this format:

mdf.head()
    dbn       boro       bus
0   17K548  Brooklyn    B41, B43, B44-SBS, B45, B48, B49, B69
1   09X543  Bronx       Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4   28Q680  Queens      Q25, Q46, Q65
6   14K474  Brooklyn    B24, B43, B48, B60, Q54, Q59

There are a couple more columns (subway lines and test scores), but I have excluded them. When I try to convert this DataFrame into a Spark DataFrame, I get the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
--> 425             rdd, schema = self._createFromLocal(data, schema)
    426         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    427         jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
    339 
    340         if schema is None or isinstance(schema, (list, tuple)):
--> 341             struct = self._inferSchemaFromList(data)
    342             if isinstance(schema, (list, tuple)):
    343                 for i, name in enumerate(schema):

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
    239             warnings.warn("inferring schema from dict is deprecated,"
    240                           "please use pyspark.sql.Row instead")
--> 241         schema = reduce(_merge_type, map(_infer_schema, data))
    242         if _has_nulltype(schema):
    243             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
    860         nfs = dict((f.name, f.dataType) for f in b.fields)
    861         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862                   for f in a.fields]
    863         names = set([f.name for f in fields])
    864         for n in nfs:

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
    854     elif type(a) is not type(b):
    855         # TODO: type cast (such as int -> long)
--> 856         raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
    857 
    858     # same type

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

From what I have read, this might be a problem with the headers being treated as data. It is my understanding that you can't remove the headers from a DataFrame, so how would I go about solving this error and converting this DataFrame into a Spark one?

Edit: Here is the code for how I created the Pandas DF and worked my way around the problem.

import pandas as pd
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlc.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")

The last line of this code, loading the CSV back from my local machine, ended up letting me convert it properly to a DataFrame, but my question still remains: why did it not work in the first place?

Upvotes: 3

Views: 12815

Answers (4)

Itachi

Reputation: 3005

The problem here is pandas' default np.nan (Not a Number) value for empty strings, which confuses schema inference when converting to a Spark DataFrame.

The basic approach is to convert np.nan to None, which enables the conversion to work.

Unfortunately, pandas does not let you fillna with None. Since np.nan does not satisfy the self-equality condition (it is the only value not equal to itself), you can use this nifty trick:

new_series = new_series.apply(lambda x: None if x != x else x)

Then, display(sqlContext.createDataFrame(new_df_1)) would work fine.
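Applied to a whole DataFrame rather than a single Series, a minimal sketch of the same trick (the toy frame below borrows column names from the question purely for illustration, and sqlContext is assumed to exist as above):

import numpy as np
import pandas as pd

# Toy frame: the missing bus value comes in as np.nan
new_df_1 = pd.DataFrame({'dbn': ['17K548', '09X543'],
                         'bus': ['B41, B43', np.nan]})

# np.nan is the only value that is not equal to itself, so this replaces
# every NaN with None and leaves everything else untouched
new_df_1 = new_df_1.applymap(lambda x: None if x != x else x)

sparkdf = sqlContext.createDataFrame(new_df_1)
sparkdf.show()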

Upvotes: 0

Ankit Kumar Namdeo

Reputation: 1464

You can try this as well:

import numpy as np
import pandas as pd

def create_spark_dataframe(file_name):
    """
    Return a Spark DataFrame built from the given CSV file, filling
    NaNs in every non-numeric column with an empty string first.
    """
    pandas_data_frame = pd.read_csv(file_name, converters={"PRODUCT": str})
    for col in pandas_data_frame.columns:
        # Only non-numeric columns get the empty-string fill; numeric columns keep NaN
        if ((pandas_data_frame[col].dtypes != np.int64) &
                (pandas_data_frame[col].dtypes != np.float64)):
            pandas_data_frame[col] = pandas_data_frame[col].fillna('')

    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    return spark_data_frame
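For instance, calling it on the merged CSV from the question (the PRODUCT converter above is specific to the answerer's own data and can be adjusted or dropped):

spark_df = create_spark_dataframe('merged.csv')
spark_df.printSchema()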

This will solve your problem.

Upvotes: 2

kmader

Reputation: 1369

I had the same issue and was able to track it down to a single entry which had a value of length 0 (an empty value). The _infer_schema function runs on each row of the DataFrame and determines the types. By default, the empty value is inferred as a Double while the others are Strings, and these two types cannot be merged by the _merge_type function. The issue has been filed as https://issues.apache.org/jira/browse/SPARK-18178, but the best way around it is probably supplying a schema to the createDataFrame command.

The code below reproduces the problem in PySpark 2.0

import pandas as pd
from io import StringIO
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
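As a sketch of that workaround, here is the same test frame with an explicit schema supplied (the column names come from the generated CSV; the stray NaN is also mapped to None so it arrives as a proper null, and sqlContext is assumed to exist as above):

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Declare the types up front instead of letting Spark infer them row by row
schema = StructType([
    StructField('Unnamed: 0', LongType(), True),
    StructField('Scan Options', StringType(), True),
])

# Map the float NaN to None so the nullable string column receives a null
clean_df = test_df.applymap(lambda x: None if x != x else x)

sqlContext.createDataFrame(clean_df, schema=schema).show()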

Upvotes: 3

user4601931

Reputation: 5304

You could use reflection to infer the schema from an RDD of Row objects, e.g.,

from pyspark.sql import Row

# mdf is the pandas DataFrame from the question, so turn its rows into an RDD
# first, then build Row objects and let Spark infer the schema from them
mdfRDD = sc.parallelize(mdf.values.tolist())
mdfRows = mdfRDD.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)

Does that achieve the desired result?

Upvotes: 3
