ThirdEye

Reputation: 443

Spark SQL: TypeError("StructType can not accept object in type %s" % type(obj))

I am pulling data from SQL Server with pyodbc and trying to insert it into a Hive table in near real time (NRT).

I fetched a single row from the source, converted it to a list of strings, and built the schema programmatically, but Spark throws a StructType error when I create the DataFrame.

>>> import pyodbc
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> cnxn = pyodbc.connect(con_string)
>>> aj = cnxn.cursor()
>>>
>>> aj.execute("select * from tjob")
<pyodbc.Cursor object at 0x257b2d0>

>>> row = aj.fetchone()

>>> row
(1127, u'', u'8196660', u'', u'', 0, u'', u'', None, 35, None, 0, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, u'', 0, None, None)
>>> rowstr = map(str,row)
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']

>>> schemaString = " ".join([row.column_name for row in aj.columns(table='tjob')])

>>> schemaString
u'ID ExternalID Name Description Notes Type Lot SubLot ParentJobID ProductID PlannedStartDateTime PlannedDurationSeconds Capture01 Capture02 Capture03 Capture04 Capture05 Capture06 Capture07 Capture08 Capture09 Capture10 Capture11 Capture12 Capture13 Capture14 Capture15 Capture16 Capture17 Capture18 Capture19 Capture20 User UserState ModifiedDateTime UploadedDateTime'

>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)

>>> [f.dataType for f in schema.fields]
[StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType]

>>> myrdd = sc.parallelize(rowstr)

>>> myrdd.collect()
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']

>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
    _verify_type(row, schema)
  File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/types.py", line 1132, in _verify_type
    raise TypeError("StructType can not accept object in type %s" % type(obj))
TypeError: StructType can not accept object in type <type 'str'>

Upvotes: 17

Views: 79129

Answers (2)

lanreAlabede

Reputation: 1

I ran into a similar error:

TypeError: StructType can not accept object '_id' in type <class 'str'>

and this is how I resolved it.

I am working with a heavily nested JSON file for scheduling. The file is composed of lists of dictionaries of lists, and so on.

e.g. ['1127', {time: '_id', '8196660', '', '', '0', '', '', 'None' ...}, {busstops: {_id, name} ]

In my case, _id was repeated many times inside other dictionaries, and I resolved the error by specifying the dictionary key.

kl = spark.createDataFrame(obj_day, schema=test())  # raises the error

but I resolved it with

kl = spark.createDataFrame(obj_day["busstops"], schema=test())

Upvotes: -2

Shyamendra Solanki

Reputation: 8851

Here is the reason for the error message:

>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None' ... ]
# rowstr is a list of str

>>> myrdd = sc.parallelize(rowstr)
# myrdd is an RDD of str: every string became its own record

>>> schema = StructType(fields)
# schema is StructType([StringType, StringType, ....])

>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
# each record in myrdd should be a whole row (a list or tuple of strings matching the schema), but each record is a single str

To fix that, wrap the row in a list so the RDD holds one record with all the fields:

>>> myrdd = sc.parallelize([rowstr])
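
The shape difference can be checked without Spark. The sketch below is plain Python, not Spark's own verification code. The helper names `to_record` and `looks_like_records` and the shortened five-column sample row are made up for illustration. It also keeps NULLs as `None` instead of the string `'None'` that `map(str, row)` produces, which is probably what you want since the fields in the question are declared nullable.

```python
def to_record(row):
    """Convert one database row into a tuple of strings, keeping NULLs as None."""
    return tuple(None if value is None else str(value) for value in row)


def looks_like_records(records, n_fields):
    """Return True if every element is a tuple/list with exactly n_fields entries."""
    return all(
        isinstance(rec, (tuple, list)) and len(rec) == n_fields
        for rec in records
    )


# Hypothetical, shortened sample standing in for the pyodbc row in the question.
row = (1127, u'', u'8196660', None, 35)
field_names = ["ID", "ExternalID", "Name", "ParentJobID", "ProductID"]

flat = [str(v) for v in row]   # shape used in the question: 5 separate str records
wrapped = [to_record(row)]     # shape used in the fix: 1 record holding 5 fields

print(looks_like_records(flat, len(field_names)))     # False
print(looks_like_records(wrapped, len(field_names)))  # True
```

With Spark available, `wrapped` would take the place of `[rowstr]` in the `sc.parallelize` call above.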

Upvotes: 33
