Reputation: 53
I am trying to convert an RDD to a DataFrame using PySpark. Below is my code.
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
conf = SparkConf().setMaster("local").setAppName("Dataframe_examples")
sc = SparkContext(conf=conf)
def parsedLine(line):
    fields = line.split(',')
    movieId = fields[0]
    movieName = fields[1]
    genres = fields[2]
    return movieId, movieName, genres
movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
parsedLines = movies.map(parsedLine)
print(parsedLines.count())
dataFrame = parsedLines.toDF(["movieId"])
dataFrame.printSchema()
I am running this code in the PyCharm IDE, and I get the error:
File "/home/ajit/PycharmProjects/pythonProject/Dataframe_examples.py", line 19, in <module>
dataFrame = parsedLines.toDF(["movieId"])
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
As I am new to this, can you let me know what I am missing?
Upvotes: 1
Views: 2493
Reputation: 13581
Use a SparkSession to turn the RDD into a DataFrame, as follows:
movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
parsedLines = movies.map(parsedLine)
print(parsedLines.count())
spark = SparkSession.builder.getOrCreate()
# DataFrame.toDF takes column names as separate arguments, one name per column
dataFrame = spark.createDataFrame(parsedLines).toDF("movieId", "movieName", "genres")
dataFrame.printSchema()
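With all three column names supplied, printSchema() should print something like this (every field is inferred as a string, since parsedLine returns the raw split strings):
root
 |-- movieId: string (nullable = true)
 |-- movieName: string (nullable = true)
 |-- genres: string (nullable = true)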
Or create the SparkSession first and take the SparkContext from it:
spark = SparkSession.builder.master("local").appName("Dataframe_examples").getOrCreate()
sc = spark.sparkContext
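Either way, once a SparkSession exists in the driver, PySpark attaches a toDF method to RDDs, so your original style of call should work too:
dataFrame = parsedLines.toDF(["movieId", "movieName", "genres"])
dataFrame.printSchema()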
Upvotes: 0
Reputation: 31540
Initialize a SparkSession by passing it the SparkContext.
Example:
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
conf = SparkConf().setMaster("local").setAppName("Dataframe_examples")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
def parsedLine(line):
    fields = line.split(',')
    movieId = fields[0]
    movieName = fields[1]
    genres = fields[2]
    return movieId, movieName, genres
movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
# or using spark.sparkContext
movies = spark.sparkContext.textFile("file:///home/ajit/ml-25m/movies.csv")
parsedLines = movies.map(parsedLine)
print(parsedLines.count())
# name all three parsed fields; with only ["movieId"], the rest default to _2, _3
dataFrame = parsedLines.toDF(["movieId", "movieName", "genres"])
dataFrame.printSchema()
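If you want to declare the columns explicitly instead of relying on inference, here is a minimal sketch with an explicit schema (the StructType below is one assumed way to model these columns; all three are kept as strings, since parsedLine returns strings):
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("movieId", StringType(), True),
    StructField("movieName", StringType(), True),
    StructField("genres", StringType(), True),
])
# createDataFrame accepts an explicit StructType instead of a list of names
dataFrame = spark.createDataFrame(parsedLines, schema)
dataFrame.printSchema()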
Upvotes: 2