Mehdi Ben Hamida

Reputation: 1051

PCA on PySpark irregular execution

I'm working on PCA over a CSV file with PySpark. I'm getting some strange behavior: my code sometimes works perfectly, but sometimes it returns this error:

 File "C:/spark/spark-2.1.0-bin-hadoop2.7/bin/pca_final2.py", line 25, in <module>
columns = (fileObj.first()).split(';')
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1361, in first
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1343, in take
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 965, in runJob
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error

here is my code:

#########################! importing libraries !########################
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.mllib.feature import Normalizer
import timeit
########################! main script !#################################
sc = SparkContext("local", "pca-app")
sqlContext = SQLContext(sc)
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()  
    start=timeit.default_timer() 
    fileObj = sc.textFile('bigiris.csv')
    data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
    columns = (fileObj.first()).split(';')
    df = spark.createDataFrame(data, columns)
    df.show()
    vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    stop=timeit.default_timer()
    result.show(truncate=False)
    time=stop-start
    print ("this operation takes ", (time), " seconds")
    spark.stop()

Why am I getting this irregular execution, and what should I add to fix this problem?

Upvotes: 4

Views: 403

Answers (2)

Rakesh Kumar

Reputation: 4420

The error points at the line columns = (fileObj.first()).split(';'). Basically you are trying to split the first line of fileObj on ';'. The sequence of operations here is wrong, because each line is already split and converted to a list in the previous step.

The correct sequence of operations is this (the columns line should come before the data line):

fileObj = sc.textFile('bigiris.csv')
columns = (fileObj.first()).split(';')
data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
df = spark.createDataFrame(data, columns)

Reason for the error: the data = line combines fileObj.map with line.split(';'), which already splits every line of the CSV on ';' and tries to convert each field to float, including the header line.

If your CSV has a text header and you want to remove it from the data, follow the filter(lambda x: x != header) approach from Jaco's answer.
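To see why the header trips up the float conversion, here is a minimal, self-contained illustration (the header and data strings below are hypothetical, just to mimic lines of a ';'-separated CSV):

header_line = "col1;col2;col3;col4"    # a text header line, as it would appear in the CSV
data_line = "5.1;3.5;1.4;0.2"          # a normal data line
print([float(k) for k in data_line.split(';')])    # works: [5.1, 3.5, 1.4, 0.2]
# [float(k) for k in header_line.split(';')]       # fails: ValueError: could not convert string to float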

Upvotes: 1

Alex

Reputation: 21766

You are not filtering out the header when creating your data frame. Assuming your column names are strings, this will result in an error because the column names can't be converted to float values. Below is the modified section of your script, which uses a filter to remove the header.

fileObj = sc.textFile('e:/iris.data.txt')
header = fileObj.first()
data = fileObj.filter(lambda x: x != header).map(lambda line: [float(k) for k in line.split(';')])
columns = header.split(';')
df = spark.createDataFrame(data, columns)
df.show()
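As a side note, Spark's built-in CSV reader can handle the header row and the ';' separator for you, avoiding the manual split entirely. A minimal sketch (assuming Spark 2.x and the same file path; inferSchema tells Spark to parse the numeric columns as doubles):

df = spark.read.csv('e:/iris.data.txt', header=True, sep=';', inferSchema=True)
df.show()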

Upvotes: 3
