Aavik

Reputation: 1037

Pyspark: Transforming PythonRDD to Dataframe

Could someone guide me on how to convert a PythonRDD to a DataFrame?

As I understand it, reading a file should create a DataFrame, but in my case it created a PythonRDD. I am finding it hard to convert the PythonRDD to a DataFrame; I could not find createDataFrame() or toDF().

Here is my code to read a tab-separated text file:

rdd1 = sparkCxt.textFile(setting.REFRESH_HDFS_DIR + "/Refresh")
rdd2 = rdd1.map(lambda row: unicode(row).lower().strip()\
                    if type(row) == unicode else row)

Now I want to convert the PythonRDD to a DataFrame so that I can apply a schema and do further processing at the column level.

Also, please suggest a better approach if you think there is one. Let me know if more details are required.

Thank you.

Upvotes: 0

Views: 2771

Answers (3)

Mike Metzger

Reputation: 723

I would use the spark-csv package (databricks/spark-csv on GitHub) and import the file directly into a DataFrame after defining the schema.

For example:

from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)
customSchema = StructType([
    StructField("year", IntegerType(), True),
    StructField("make", StringType(), True),
    StructField("model", StringType(), True),
    StructField("comment", StringType(), True),
    StructField("blank", StringType(), True)])

df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('cars.csv', schema = customSchema)

This defaults to a comma for the delimiter, but you can change that to a tab with something like:

df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', delimiter='\t') \
    .load('cars.csv', schema = customSchema)

Note that it is also possible to infer the schema instead of defining it (spark-csv has an inferSchema option), but this requires an extra pass over the entire file before the DataFrame is loaded.
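As a rough illustration of the schema-inference route, here is a minimal sketch. It assumes the spark-csv package is available to the Spark job; tsv_options and load_delimited are hypothetical helper names made up for this example, not part of spark-csv.

```python
def tsv_options(header=True, infer_schema=True, delimiter="\t"):
    """Build the string-valued reader options (hypothetical helper)."""
    return {
        "header": "true" if header else "false",
        "inferSchema": "true" if infer_schema else "false",
        "delimiter": delimiter,
    }


def load_delimited(sql_context, path, **kwargs):
    # Assumption: sql_context is an existing SQLContext and spark-csv is installed.
    # No schema is passed, so column types come from the inferSchema option.
    return (sql_context.read
            .format("com.databricks.spark.csv")
            .options(**tsv_options(**kwargs))
            .load(path))
```

Something like load_delimited(sqlContext, 'cars.csv') would then return a DataFrame with inferred column types, at the cost of the extra read mentioned above.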

Upvotes: 0

Milos Milovanovic

Reputation: 650

Spark DataFrames can be created directly from a text file, but you should use sqlContext instead of sc (SparkContext), since sqlContext is an entry point for working with DataFrames.

df = sqlContext.read.text('path/to/my/file')

This will create a DataFrame with a single column named value. You can use UDFs to split it into the required columns.
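A minimal sketch of the UDF approach, assuming the lines are tab-separated. The helper names split_fields and split_value_column are hypothetical, and the column names you pass in are whatever your file actually contains.

```python
def split_fields(value, sep="\t"):
    """Split one raw line into a list of stripped string fields."""
    return [field.strip() for field in value.split(sep)]


def split_value_column(df, names):
    # Imported lazily so split_fields can be tried without a Spark install.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import ArrayType, StringType

    # Wrap the plain Python function as a UDF returning an array of strings.
    split_udf = udf(split_fields, ArrayType(StringType()))
    parts = split_udf(col("value"))
    # Pick each array element out into its own named column.
    return df.select([parts.getItem(i).alias(name)
                      for i, name in enumerate(names)])
```

All resulting columns are strings; they can be cast afterwards if numeric types are needed.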

Another approach would be to read the text files into an RDD, split each line into columns using map, filter and other operations, and then convert the final RDD to a DataFrame.

For example, let's say we have an RDD named my_rdd with the following structure:

[(1, 'Alice', 23), (2, 'Bob', 25)]

We can easily convert it to a DataFrame:

df = sqlContext.createDataFrame(my_rdd, ['id', 'name', 'age'])

where id, name and age are names for our columns.
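To get from raw text lines to an RDD of that shape, one possible sketch is below. It assumes each line holds exactly three tab-separated fields (id, name, age); parse_person and lines_to_dataframe are hypothetical names used only for illustration.

```python
def parse_person(line, sep="\t"):
    """Turn one delimited line into an (id, name, age) tuple."""
    raw_id, name, age = [field.strip() for field in line.split(sep)]
    return (int(raw_id), name, int(age))


def lines_to_dataframe(sql_context, lines_rdd):
    # Assumption: lines_rdd is an RDD of strings, e.g. from sc.textFile(...).
    rows = lines_rdd.map(parse_person)
    return sql_context.createDataFrame(rows, ["id", "name", "age"])
```

For instance, lines_to_dataframe(sqlContext, sc.textFile('path/to/my/file')) would give a DataFrame with the three named columns. Lines that do not have exactly three fields will raise an error, so real data may need filtering first.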

Upvotes: 1

jtitusj

Reputation: 3084

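You can try using toPandas(), although you should be cautious when doing so: converting distributed data to a pandas DataFrame brings all of it into the driver's memory, which might cause an OOM error if the data is large. Note that toPandas() is a method of Spark DataFrames, not RDDs, and it returns a pandas DataFrame rather than a Spark one.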

Upvotes: 0
