Reputation: 18790
I am programming with PySpark on a Spark cluster. The data is large and split into pieces, so I cannot load it into memory or easily check its sanity.
Basically, it looks like this:
af.b Current%20events 1 996
af.b Kategorie:Musiek 1 4468
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214
This is Wikipedia data.
I read it from AWS S3 and then try to construct a Spark DataFrame with the following Python code in the pyspark interpreter:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

parts = data.map(lambda l: l.split())
wikis = parts.map(lambda p: (p[0], p[1], p[2], p[3]))
fields = [StructField("project", StringType(), True),
          StructField("title", StringType(), True),
          StructField("count", IntegerType(), True),
          StructField("byte_size", StringType(), True)]
schema = StructType(fields)
df = sqlContext.createDataFrame(wikis, schema)
Everything looks fine, except that createDataFrame gives me this error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
_verify_type(row, schema)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type
_verify_type(v, f.dataType)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj)))
TypeError: IntegerType can not accept object in type <type 'unicode'>
Why can't I set the third column, which should be count, to IntegerType? How can I solve this?
Upvotes: 15
Views: 43549
Reputation: 122
With Apache Spark 2.0 you can let Spark infer the schema of your data. Overall, you'll still need to cast in your parser function, as argued in the other answer. From the documentation:
"When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict."
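
A minimal sketch of what "cast in your parser function" could look like when relying on inference. The names PageStat and parse are made up for illustration, and the third field is called cnt rather than count. Only the parsing part is plain Python that can be run locally; the createDataFrame call in the comment assumes the data RDD and sqlContext from the question.

```python
from collections import namedtuple

# Hypothetical record type; field names follow the question's schema,
# with "cnt" used in place of "count".
PageStat = namedtuple("PageStat", ["project", "title", "cnt", "byte_size"])

def parse(line):
    """Split one whitespace-separated line and cast the third field to int."""
    project, title, cnt, byte_size = line.split()
    return PageStat(project, title, int(cnt), byte_size)

record = parse("af.b Current%20events 1 996")

# On the cluster this would be used roughly as (no explicit schema passed):
#   df = sqlContext.createDataFrame(data.map(parse))
```

Because the cast happens in the parser, the inferred type of cnt should be an integer type rather than a string.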
Upvotes: -1
Reputation: 330063
As noted by ccheneson, you pass the wrong types.
Assuming your data looks like this:
data = sc.parallelize(["af.b Current%20events 1 996"])
After the first map you get RDD[List[String]]:
parts = data.map(lambda l: l.split())
parts.first()
## ['af.b', 'Current%20events', '1', '996']
The second map converts it to a tuple (String, String, String, String):
wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3]))
wikis.first()
## ('af.b', 'Current%20events', '1', '996')
Your schema states that the 3rd column is an integer:
[f.dataType for f in schema.fields]
## [StringType, StringType, IntegerType, StringType]
A schema is used mostly to avoid a full table scan for type inference; it doesn't perform any type casting.
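
To illustrate the difference between checking and casting, here is a simplified stand-in written in plain Python. It is not Spark's actual code; verify_row and the expected tuple are hypothetical and only mimic the kind of check that produced the TypeError in the question.

```python
# Simplified illustration only: NOT Spark's real verification logic.
def verify_row(row, expected_types):
    """Raise TypeError if a value is not already of the declared type."""
    for value, expected in zip(row, expected_types):
        if value is not None and not isinstance(value, expected):
            raise TypeError("%s can not accept object in type %s"
                            % (expected.__name__, type(value)))

expected = (str, str, int, str)
row = ('af.b', 'Current%20events', '1', '996')  # third field is still a string

try:
    verify_row(row, expected)
    outcome = "accepted"
except TypeError as exc:
    outcome = "rejected: %s" % exc

# The same row with the third field cast to int passes the check.
verify_row(('af.b', 'Current%20events', 1, '996'), expected)
```

The check never converts '1' into 1; it only reports the mismatch, which is why the cast has to happen before the schema is applied.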
You can either cast your data during the last map:
wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3]))
Or define count as a StringType and cast the column:
from pyspark.sql.functions import col

fields[2] = StructField("count", StringType(), True)
schema = StructType(fields)
wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count")
On a side note, count is a reserved word in SQL and shouldn't be used as a column name. In Spark it will work as expected in some contexts and fail in others.
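
Since the question mentions that the data cannot easily be sanity-checked, casting during the map could also guard against malformed lines. This is only a sketch under that assumption: parse_line is a hypothetical helper, and the two broken sample lines are invented for illustration.

```python
def parse_line(line):
    """Return (project, title, count, byte_size), or None if the line is malformed."""
    parts = line.split()
    if len(parts) != 4:
        return None  # wrong number of fields
    try:
        return (parts[0], parts[1], int(parts[2]), parts[3])
    except ValueError:
        return None  # third field is not an integer

# The first line is from the question; the other two are made-up bad input.
sample = [
    "af.b Current%20events 1 996",
    "af.b broken_line",
    "af.b Kategorie:Musiek x 4468",
]
parsed = [parse_line(l) for l in sample]
good = [r for r in parsed if r is not None]

# On the cluster, the equivalent would be roughly:
#   wikis = data.map(parse_line).filter(lambda r: r is not None)
```

Dropping bad rows silently is a design choice; counting them first may be preferable if the amount of bad data matters.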
Upvotes: 10