Hello lad

Reputation: 18790

pyspark: TypeError: IntegerType can not accept object in type <type 'unicode'>

I'm programming with PySpark on a Spark cluster. The data is large and split into pieces, so it cannot be loaded into memory, and checking the sanity of the data is not easy.

Basically the data looks like this:

af.b Current%20events 1 996
af.b Kategorie:Musiek 1 4468
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214

It is Wikipedia data. I read it from AWS S3 and then try to construct a Spark DataFrame with the following Python code in the PySpark interpreter:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

parts = data.map(lambda l: l.split())
wikis = parts.map(lambda p: (p[0], p[1], p[2], p[3]))

fields = [StructField("project", StringType(), True),
          StructField("title", StringType(), True),
          StructField("count", IntegerType(), True),
          StructField("byte_size", StringType(), True)]

schema = StructType(fields)

df = sqlContext.createDataFrame(wikis, schema)

Everything looks fine, but createDataFrame gives me this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
    _verify_type(row, schema)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type
    _verify_type(v, f.dataType)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type
    raise TypeError("%s can not accept object in type %s" % (dataType, type(obj)))
TypeError: IntegerType can not accept object in type <type 'unicode'>

Why can't I set the third column, which should be count, to IntegerType? How can I solve this?

Upvotes: 15

Views: 43549

Answers (2)

Giovanni Bruner

Reputation: 122

With Apache Spark 2.0 you can let Spark infer the schema of your data. Overall, you'll still need to cast in your parser function, as argued above:

"When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict."

Upvotes: -1

zero323

Reputation: 330063

As noted by ccheneson, you pass the wrong types.

Assuming your data looks like this:

data = sc.parallelize(["af.b Current%20events 1 996"])

After the first map you get RDD[List[String]]:

parts = data.map(lambda l: l.split())
parts.first()
## ['af.b', 'Current%20events', '1', '996']

The second map converts it to a tuple of (String, String, String, String):

wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3]))
wikis.first()
## ('af.b', 'Current%20events', '1', '996')
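A quick type check makes the problem visible (a small sketch; with sc.parallelize the fields come back as plain str, while reading from S3 with textFile typically yields unicode, which is exactly what the traceback complains about):

[type(x) for x in wikis.first()]
## all four elements are strings, none of them integers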

Your schema states that the 3rd column is an integer:

[f.dataType for f in schema.fields]
## [StringType, StringType, IntegerType, StringType]

A schema is used mostly to avoid a full table scan for type inference; it doesn't perform any type casting.

You can either cast your data during the last map:

wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3]))
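With the count cast to int during the map, the original createDataFrame call should go through (a quick check, reusing the schema defined in the question):

df = sqlContext.createDataFrame(wikis, schema)
df.printSchema()
## count should now be reported as integer, the remaining columns as string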

Or define count as a StringType and cast the column:

from pyspark.sql.functions import col

fields[2] = StructField("count", StringType(), True)
schema = StructType(fields)

wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count")

On a side note, count is a reserved word in SQL and shouldn't be used as a column name. In Spark it will work as expected in some contexts and fail in others.

Upvotes: 10
