John

Reputation: 81

PySpark .collect() error - IndexError: list index out of range

I'm getting this error:

line 23, in parseRating
    IndexError: list index out of range 

...upon any attempt at .collect(), .count(), etc. So the final line, df3.collect(), throws that error, but all the .show() calls work. I don't think it's a problem with the data, but I could be wrong.

New to this, really not sure what's going on. Any help greatly appreciated.

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col


def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp.
    """
    fields = line.strip().split("::")
    
    return int(fields[3]), int(fields[0]), int(fields[1]), float(fields[2])
    #return int(fields[0]), int(fields[1]), float(fields[2])

if __name__ == "__main__":

    # set up environment
    spark = SparkSession.builder \
        .master("local") \
        .appName("Movie Recommendation Engine") \
        .config("spark.driver.memory", "16g") \
        .getOrCreate()

    sc = spark.sparkContext

    # load personal ratings
    #myRatings = loadRatings(os.path.abspath('personalRatings.txt'))
    
    
    myRatingsRDD = sc.textFile("personalRatings.txt").map(parseRating)
    ratings = sc.textFile("ratings.dat").map(parseRating)

    df1 = spark.createDataFrame(myRatingsRDD, ["timestamp", "userID", "movieID", "rating"])
    df1.show()

    df2 = spark.createDataFrame(ratings, ["timestamp", "userID", "movieID", "rating"])
    df2.show()

    df3 = df1.union(df2)
    df3.show()

    df3.printSchema()

    df3 = df3 \
        .withColumn('userID', col('userID').cast('integer')) \
        .withColumn('movieID', col('movieID').cast('integer')) \
        .withColumn('rating', col('rating').cast('float')) \
        .drop('timestamp')
    df3.show()
    ratings = df3

    df3.collect()

Upvotes: 1

Views: 6231

Answers (2)

user2314737

Reputation: 29427

The error comes from the function parseRating, and it is a list index out of range: most likely some line in the data does not have the expected number of fields after splitting on the :: separator.
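
That would also explain why .show() works but .collect() fails: .show() only evaluates the first few rows, while .collect() and .count() materialize the whole dataset and eventually hit the bad line. To confirm, here is a minimal sketch (assuming sc and the file names from your question) that counts and samples the lines that do not split into exactly four fields:

bad = sc.textFile("ratings.dat") \
    .filter(lambda line: len(line.strip().split("::")) != 4)

print(bad.count())   # number of malformed lines
print(bad.take(5))   # inspect a few of them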

How about reading the text file directly into a DataFrame, specifying the field separator and whether there is a header, and then changing the column datatypes with cast?

Something like this:

df1 = spark.read.format("csv") \
          .option("header", "true") \
          .option("delimiter", "::") \
          .load("personalRatings.txt")

df1 = df1.select(df1.timestamp.cast("int"),
                 df1.userId.cast("int"),
                 df1.movieId.cast("int"),
                 df1.rating.cast("float"))

df1.show(10)
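
For ratings.dat, which presumably has no header row, you could name the columns yourself. Note that a multi-character delimiter like "::" is only supported by the CSV reader from Spark 3.0 on; on older versions you may need the RDD approach instead. A sketch:

df2 = spark.read.format("csv") \
          .option("header", "false") \
          .option("delimiter", "::") \
          .load("ratings.dat") \
          .toDF("userId", "movieId", "rating", "timestamp")

df2.show(10)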

Upvotes: 1

ggordon

Reputation: 10035

One of the lines in your text file may be malformed or incomplete, so the split("::") may not produce the expected number of fields. You can update your function to check the number of fields before accessing the indexes, e.g.:

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp.
    """
    fields = line.strip().split("::")
    timestamp = int(fields[3]) if len(fields)>3 else None
    userId = int(fields[0]) if len(fields)>0 else None
    movieId = int(fields[1]) if len(fields)>1 else None
    rating = float(fields[2]) if len(fields)>2 else None

    return timestamp, userId, movieId, rating

You can add even more exception handling if desired.
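
For instance, here is a sketch with a hypothetical parseRatingSafe that wraps the whole parse in a try/except and drops unparseable lines entirely instead of returning partial rows (file name taken from your question):

def parseRatingSafe(line):
    # Returns None for lines that cannot be parsed at all.
    try:
        fields = line.strip().split("::")
        return int(fields[3]), int(fields[0]), int(fields[1]), float(fields[2])
    except (IndexError, ValueError):
        return None

ratings = sc.textFile("ratings.dat") \
    .map(parseRatingSafe) \
    .filter(lambda row: row is not None)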

Let me know if this works for you.

Upvotes: 0
