Reputation: 2938
I'm creating a Spark session (Spark version 2.2.1) like this:
from pyspark.sql import SparkSession

SparkS = SparkSession.builder\
    .appName("Test")\
    .master("local[*]")\
    .getOrCreate()
Then I read the CSV file into an RDD through the session's sparkContext:
raw_data = SparkS\
    .sparkContext\
    .textFile("C:\\Users\\...\\RawData\\nasdaq.csv")
For verification, I print the first few records:
print(raw_data.take(3))
and the output is:
['43084,6871.549805,6945.819824,6871.450195,6936.580078,6936.580078,3510420000', '43087,6980.399902,7003.890137,6975.540039,6994.759766,6994.759766,2144360000', '43088,6991.25,6995.879883,6951.490234,6963.850098,6963.850098,2071060000']
Now I'm converting the RDD into a DataFrame by defining a schema like this:
from pyspark.sql.types import StructType, StringType

schema = StructType().add("date", StringType())\
    .add("open", StringType())\
    .add("high", StringType())\
    .add("low", StringType())\
    .add("close", StringType())\
    .add("adj_close", StringType())\
    .add("volume", StringType())
geioIP = SparkS.createDataFrame(raw_data,schema)
print(geioIP)
The output is:
DataFrame[date: string, open: string, high: string, low: string, close: string, adj_close: string, volume: string]
So far so good, but the problem is that when I call geioIP.show(2), it gives me an error:
18/01/23 12:58:48 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark-2.2.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main
File "C:\spark-2.2.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process
File "C:\spark-2.2.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\Users\rajnish.kumar\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\session.py", line 520, in prepare
verify_func(obj, schema)
File "C:\spark-2.2.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\types.py", line 1371, in _verify_type
raise TypeError("StructType can not accept object %r in type %s" % (obj, type(obj)))
TypeError: StructType can not accept object '43084,6871.549805,6945.819824,6871.450195,6936.580078,6936.580078,3510420000' in type <class 'str'>
After going through this link, I converted all the CSV data into text format, but I'm still getting the same error.
Upvotes: 4
Views: 10234
Reputation: 28392
The problem is that each row in the RDD is a single string (i.e. one column), while your schema contains 7 columns. The RDD is not actually converted into a DataFrame until you run an action (like show), which is why it does not fail immediately.
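To make the mismatch concrete, here is a minimal pure-Python sketch (no Spark required). The sample line is copied from the take(3) output in the question; the names field_names, fields and row are only illustrative and do not come from the original code.

```python
# Pure-Python illustration of the shape mismatch (no Spark involved).
# The sample line is copied from the question's take(3) output.
line = "43084,6871.549805,6945.819824,6871.450195,6936.580078,6936.580078,3510420000"

# Illustrative list mirroring the field names of the question's schema.
field_names = ["date", "open", "high", "low", "close", "adj_close", "volume"]

# What textFile() hands to createDataFrame: one str per record.
print(type(line).__name__)  # str

# What a 7-field schema needs: a sequence with one value per field.
fields = line.split(",")
print(len(fields) == len(field_names))  # True

# Pairing names with values shows how each record should line up.
row = dict(zip(field_names, fields))
print(row["date"], row["volume"])  # 43084 3510420000
```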
Since you want to have the data in a dataframe, the easiest solution would be to read the data as a dataframe in the beginning:
geioIP = SparkS.read.csv("C:\\Users\\...\\RawData\\nasdaq.csv", schema=schema)
Or, if you want to keep using an RDD and createDataFrame, you could use the split function (possibly with strip if the fields contain spaces):
raw_data = raw_data.map(lambda x: [c.strip() for c in x.split(',')])
geioIP = SparkS.createDataFrame(raw_data,schema)
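If some lines might be malformed, the lambda could be swapped for a small named function that also checks the field count. This is only a sketch: parse_line and EXPECTED_FIELDS are hypothetical names, and the sample line is taken from the question with spaces added by hand to show what strip does.

```python
# Hypothetical constant: number of fields in the question's schema
# (date, open, high, low, close, adj_close, volume).
EXPECTED_FIELDS = 7


def parse_line(line, expected=EXPECTED_FIELDS):
    """Split one CSV line into stripped string fields.

    Raises ValueError when the field count does not match the schema,
    so a malformed line is reported together with its content.
    """
    fields = [c.strip() for c in line.split(",")]
    if len(fields) != expected:
        raise ValueError(
            "expected %d fields, got %d: %r" % (expected, len(fields), line)
        )
    return fields


# Line from the question, with extra spaces added for illustration.
sample = "43087, 6980.399902 ,7003.890137,6975.540039,6994.759766,6994.759766,2144360000"
print(parse_line(sample)[:2])  # ['43087', '6980.399902']
```

With Spark, the function would be passed as raw_data.map(parse_line) in place of the lambda above. Note that a plain split(",") does not handle quoted fields that contain commas; for such files, reading with SparkS.read.csv is likely the safer route.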
Upvotes: 2
Reputation: 2938
Thank you to @Shaido for pointing out the most basic thing about the RDD, namely that "each row in the RDD is a single string (i.e. one column) and your schema contains 7 columns." With help from this post, I was able to solve the issue above.
Before passing raw_data directly to
geioIP = SparkS.createDataFrame(raw_data,schema)
I needed to split each line of the RDD into a list of fields, which I did like this:
rawdata = raw_data.map(lambda x : x.split(","))
Now calling
geioIP = SparkS.createDataFrame(rawdata,schema)
geioIP.show(2)
yields
+-----+-----------+-----------+-----------+-----------+-----------+----------+
| date|       open|       high|        low|      close|  adj_close|    volume|
+-----+-----------+-----------+-----------+-----------+-----------+----------+
|43084|6871.549805|6945.819824|6871.450195|6936.580078|6936.580078|3510420000|
|43087|6980.399902|7003.890137|6975.540039|6994.759766|6994.759766|2144360000|
+-----+-----------+-----------+-----------+-----------+-----------+----------+
only showing top 2 rows
Upvotes: 1