dzakyputra

Reputation: 682

Cannot convert RDD to DataFrame (RDD has millions of rows)

I'm using Apache Spark 1.6.2

I have a .csv file that contains about 8 million rows, and I want to convert it to a DataFrame.

But I have to load it as an RDD first and map it to pull out the columns I want.

Mapping the RDD works fine, but when it comes to converting the RDD to a DataFrame, Spark throws an error:

Traceback (most recent call last):
  File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module>
    result_iso = input_iso.map(extract_iso).toDF()
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema
  File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first
  File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take
  File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob
  File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco
  File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error

This is my code:

def extract_iso(line):
    fields = line.split(',')
    # keep the second-to-last column and the second column
    return [fields[-2], fields[1]]

input_iso = sc.textFile("data.csv")
result_iso = input_iso.map(extract_iso).toDF()

data.csv has more than 8 million rows, but when I cut the data down to fewer than 500 rows, the program works fine.

I don't know if Spark has some row limitation. Is there any way I can convert my RDD?

Or is there another way to map a DataFrame the same way I map the RDD?

Additional information:

The data is messy: the total number of columns often differs from row to row, which is why I need to map it first. However, the data I want is always at the exact same indexes, [1] and [-2] (the second column and the second-to-last column); only the number of columns between them varies from row to row.
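To be concrete, this is roughly the kind of guarded mapping I have in mind; the length check and the filter are just hypothetical additions to illustrate skipping rows that are too short, not part of my current code:

def extract_iso_safe(line):
    fields = line.split(',')
    # hypothetical guard: skip rows that don't even have two columns
    if len(fields) < 2:
        return None
    # second-to-last column and second column, same as extract_iso
    return [fields[-2], fields[1]]

input_iso = sc.textFile("data.csv")
mapped_iso = input_iso.map(extract_iso_safe).filter(lambda row: row is not None)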

Thank you very much for the answer :)

Upvotes: 4

Views: 1439

Answers (1)

Mariusz

Reputation: 13946

The most probable cause is that Spark is trying to infer the schema of the newly created DataFrame by sampling the RDD (that is what the _inferSchema call in your traceback is doing). Try the second way of mapping an RDD to a DataFrame: specify the schema yourself and go through createDataFrame, for example:

>>> from pyspark.sql.types import *
>>> schema = StructType([StructField('a', StringType()), StructField('b', StringType())])
>>> df = sqlContext.createDataFrame(input_iso.map(extract_iso), schema)
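If some of the messy rows turn out to be too short to index at all, a variant of the same idea is to drop them before applying extract_iso, still passing the explicit schema (the "at least three columns" filter below is only an assumption about what a valid row looks like in your file):

>>> clean_iso = input_iso.filter(lambda line: line.count(',') >= 2)
>>> df = sqlContext.createDataFrame(clean_iso.map(extract_iso), schema)
>>> df.show(5)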

Upvotes: 4
