Arnoldo Oliva

Reputation: 123

TypeError: StructType can not accept object '' in type <class 'int'> pyspark schema

I'm trying to convert an RDD to a DataFrame in Spark. The RDD was created by parallelizing a list of integers, and the conversion fails with "TypeError: StructType can not accept object 60651 in type <class 'int'>".

Here is the code:

# Create a schema for the dataframe
schema = StructType([StructField('zipcd', IntegerType(), True)] )

# Convert list to RDD
rdd = sc.parallelize(zip_cd)  # Suggested fix: wrap zip_cd in []. But if I do that, a different error arises: 'length does not match: 29275 against 1'
#rdd=rdd.map(lambda x:int(x))

# Create data frame
zip_cd1 = spark.createDataFrame(rdd,schema)
#print(zip_cd1.schema)
zip_cd1.show()

It produces the following error:

   Py4JJavaError                             Traceback (most recent call last)
<ipython-input-59-13ef33f842e4> in <module>
      9 zip_cd1 = spark.createDataFrame(rdd,schema)
     10 #print(zip_cd1.schema)
---> 11 zip_cd1.show()

~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    482         """
    483         if isinstance(truncate, bool) and truncate:
--> 484             print(self._jdf.showString(n, 20, vertical))
    485         else:
    486             print(self._jdf.showString(n, int(truncate), vertical))

~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o900.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 1240) (MTYCURB-HOLAP.ACS-JRZ.com executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\session.py", line 682, in prepare
    verify_func(obj)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1409, in verify
    verify_value(obj)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1396, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 60651 in type <class 'int'>

zip_cd is just a list of integers, so I don't understand why it causes so much trouble:

zip_cd 

[60651,
 60623,
 60077,
 60626,
 60077,
 0,
 60651,
 60644,

Upvotes: 2

Views: 12850

Answers (2)

user2983936

Reputation: 43

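The createDataFrame function expects a collection of rows, for example a list of lists, where each inner list represents one row. A flat list of integers has no row structure, so wrap each value in its own list: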

zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]
reformatted_ = map(lambda x: [x], zip_cd)
zip_cd1 = spark.createDataFrame(reformatted_, schema='zipcd int')

Calling zip_cd1.show() then displays the desired Spark DataFrame:

+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
|    0|
|60651|
|60644|
+-----+
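
To see why the wrapping matters, the shapes involved can be sketched in plain Python, without Spark. The helper name `to_rows` is hypothetical and used only for illustration; it turns a flat list into one-field rows, the form a single-column schema can accept. The sketch also shows what happens when the whole list is wrapped in `[]` instead, as tried in the question: that gives a single row with as many fields as there are values, which is consistent with the length-mismatch error reported there.

```python
def to_rows(values):
    """Wrap each scalar in a one-element tuple so it can serve as a one-field row."""
    return [(v,) for v in values]


zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]

rows = to_rows(zip_cd)   # 8 rows, 1 field each
one_big_row = [zip_cd]   # 1 row, 8 fields: not what a one-column schema expects

print(len(rows), len(rows[0]))
print(len(one_big_row), len(one_big_row[0]))
```

Tuples are used here instead of lists only as a stylistic choice; either form represents a row of one field.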

Upvotes: 1

AdibP

Reputation: 2939

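Your schema expects input of shape (n, 1), that is, n rows with one field each. A flat list of n integers has no row structure, so each integer is checked against the StructType and rejected. Wrap each value in a list so that every element becomes a one-field row.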

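from pyspark.sql.types import StructType, StructField, IntegerType

# Assumes an active SparkSession `spark` and SparkContext `sc`
zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]

schema = StructType([StructField('zipcd', IntegerType(), True)])
rdd = sc.parallelize(zip_cd)
rdd = rdd.map(lambda x: [x])  # wrap each integer in a list so it becomes a one-field row

zip_cd1 = spark.createDataFrame(rdd, schema)
# zip_cd1 = spark.createDataFrame([[x] for x in zip_cd], schema)  # alternative: build the DataFrame directly from the list
zip_cd1.show()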

Result:

+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
|    0|
|60651|
|60644|
+-----+
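
Both errors from the question can be reproduced with a simplified, pure-Python stand-in for the per-row check. This is an illustrative sketch, not PySpark's actual implementation: `check_row` and `describe` are hypothetical names, the accepted row types are reduced to list and tuple, and the length-mismatch message is modeled on the wording quoted in the question.

```python
def check_row(obj, n_fields):
    """Simplified stand-in for a struct check: a row must be a sequence with n_fields items."""
    if not isinstance(obj, (list, tuple)):
        # Same message format as the TypeError in the question's traceback
        raise TypeError(
            "StructType can not accept object %r in type %s" % (obj, type(obj))
        )
    if len(obj) != n_fields:
        # Hypothetical wording, modeled on the error quoted in the question
        raise ValueError(
            "length does not match: %d against %d" % (len(obj), n_fields)
        )
    return tuple(obj)


def describe(obj, n_fields=1):
    """Return 'ok' or the error text that check_row raises for obj."""
    try:
        check_row(obj, n_fields)
        return "ok"
    except (TypeError, ValueError) as exc:
        return "%s: %s" % (type(exc).__name__, exc)


zip_cd = [60651, 60623, 60077]

print(describe(zip_cd[0]))    # a bare int is not a row
print(describe(zip_cd))       # the whole list as one row has too many fields
print(describe([zip_cd[0]]))  # a one-element list is a valid one-field row
```

Only the third call passes, which matches the fix above: each integer has to arrive as its own one-field row.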

Upvotes: 6
