Reputation: 1051
I'm working on PCA over a CSV file with PySpark. I'm seeing some strange behavior: my code sometimes works perfectly, but sometimes it fails with this error:
File "C:/spark/spark-2.1.0-bin-hadoop2.7/bin/pca_final2.py", line 25, in <module>
columns = (fileObj.first()).split(';')
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1361, in first
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1343, in take
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 965, in runJob
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error
Here is my code:
#########################! importing libraries !########################
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.feature import Normalizer
import timeit
########################! main script !#################################
sc = SparkContext("local", "pca-app")
sqlContext = SQLContext(sc)
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()
    start = timeit.default_timer()
    fileObj = sc.textFile('bigiris.csv')
    data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
    columns = (fileObj.first()).split(';')
    df = spark.createDataFrame(data, columns)
    df.show()
    vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    stop = timeit.default_timer()
    result.show(truncate=False)
    time = stop - start
    print("this operation takes ", time, " seconds")
    spark.stop()
Why am I getting this irregular execution, and what should I change to fix the problem?
Upvotes: 4
Views: 403
Reputation: 4420
The error points at the line columns = (fileObj.first()).split(';'). Basically, you are trying to split the first line of fileObj on ';'. The sequence of operations performed here is wrong, since the line is already converted to a list in the previous step. The correct sequence is the following (the columns line should come before the data line):
fileObj = sc.textFile('bigiris.csv')
columns = (fileObj.first()).split(';')   # read the header before mapping the data
data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
df = spark.createDataFrame(data, columns)
Reason for the error: the data = line has fileObj.map with line.split(';'), which already splits each line of the CSV on ';'.
If your CSV has a text header that you want to remove from the data, then follow Jaco's answer and use filter(lambda x: x != header); a sketch follows below.
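For completeness, a minimal sketch combining the reordering above with that filter (assuming, as in the question, that the file is 'bigiris.csv' and that the header is its first line):
fileObj = sc.textFile('bigiris.csv')
header = fileObj.first()                 # the first line holds the column names
columns = header.split(';')
data = fileObj.filter(lambda x: x != header)\
              .map(lambda line: [float(k) for k in line.split(';')])
df = spark.createDataFrame(data, columns)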
Upvotes: 1
Reputation: 21766
You are not filtering out the header when creating your data frame. Assuming your column names are strings, this will result in an error, as the column names can't be converted to float values. Below is the modified section of your script, which uses a filter to remove the header.
fileObj = sc.textFile('e:/iris.data.txt')
header = fileObj.first()
data = fileObj.filter(lambda x: x != header).map(lambda line: [float(k) for k in line.split(';')])
columns = header.split(';')
df = spark.createDataFrame(data, columns)
df.show()
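As a side note (an alternative, not part of the answer above): since the question already runs Spark 2.1, the DataFrame CSV reader can parse the header and the ';' delimiter directly, which avoids the manual split and filter entirely. A minimal sketch, assuming all data columns are numeric:
# let the reader handle the header row and the ';' separator
df = spark.read.csv('bigiris.csv', sep=';', header=True, inferSchema=True)
df.show()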
Upvotes: 3