Reputation: 119
I'm working with The Movies Dataset from https://www.kaggle.com/rounakbanik/the-movies-dataset#movies_metadata.csv.
The credits.csv file has three columns: cast, crew, and id. The cast and crew columns are filled with JSON-like strings (not valid JSON, since keys and values are wrapped in single quotes), and I want to extract them into separate DataFrames. But simply loading the file does not work. Here is what I'm trying:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('movies').getOrCreate()
df = spark.read.csv(os.path.join(input_path, 'credits.csv'), header=True)
df.printSchema()
df.show()
root
|-- cast: string (nullable = true)
|-- crew: string (nullable = true)
|-- id: string (nullable = true)
+--------------------+--------------------+--------------------+
| cast| crew| id|
+--------------------+--------------------+--------------------+
|[{'cast_id': 14, ...|"[{'credit_id': '...| 'profile_path': ...|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
|[{'cast_id': 2, '...|[{'credit_id': '5...| 15602|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 1|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11862|
|"[{'cast_id': 25,...| 'credit_id': '52...| 'gender': 0|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11860|
|[{'cast_id': 2, '...|[{'credit_id': '5...| 45325|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 710|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 2|
|[{'cast_id': 9, '...|"[{'credit_id': '...| 'profile_path': ...|
|"[{'cast_id': 1, ...| 'credit_id': '56...| 'gender': 0|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 2|
|"[{'cast_id': 1, ...| 'credit_id': '59...| 'gender': 2|
|"[{'cast_id': 4, ...| 'credit_id': '52...| 'gender': 2|
|[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|
|[{'cast_id': 42, ...|"[{'credit_id': '...| 'profile_path': ...|
|"[{'cast_id': 1, ...| 'order': 14| 'profile_path': ...|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11517|
+--------------------+--------------------+--------------------+
only showing top 20 rows
The id column should only contain numbers, and the cast and crew columns should be loaded as strings, as happens when I load the data with Pandas:
import os
import pandas as pd

df = pd.read_csv(os.path.join(input_path, 'credits.csv'))
type(df.cast[0])
str
How can I load the data into a Spark DataFrame and collect the JSON data from each row into a new DataFrame?
Upvotes: 3
Views: 2301
Reputation: 41
Setting the escape character and multiLine=True worked. The fields are quoted RFC-4180 style (an embedded double quote is escaped by doubling it), while Spark's CSV reader defaults to backslash as the escape character, so escape='"' is needed; multiLine=True handles records that span several lines.
credits = spark.read.csv('credits.csv', header=True, inferSchema=True,
quote='"', escape='"', multiLine=True)
credits.printSchema()
credits.show()
output:
root
|-- cast: string (nullable = true)
|-- crew: string (nullable = true)
|-- id: integer (nullable = true)
+--------------------+--------------------+-----+
| cast| crew| id|
+--------------------+--------------------+-----+
|[{'cast_id': 14, ...|[{'credit_id': '5...| 862|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
|[{'cast_id': 2, '...|[{'credit_id': '5...|15602|
|[{'cast_id': 1, '...|[{'credit_id': '5...|31357|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11862|
|[{'cast_id': 25, ...|[{'credit_id': '5...| 949|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11860|
|[{'cast_id': 2, '...|[{'credit_id': '5...|45325|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 710|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9087|
|[{'cast_id': 9, '...|[{'credit_id': '5...|12110|
|[{'cast_id': 1, '...|[{'credit_id': '5...|21032|
|[{'cast_id': 1, '...|[{'credit_id': '5...|10858|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 1408|
|[{'cast_id': 4, '...|[{'credit_id': '5...| 524|
|[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|
|[{'cast_id': 42, ...|[{'credit_id': '5...| 5|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9273|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11517|
+--------------------+--------------------+-----+
only showing top 20 rows
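The second half of the question, pulling the data out of each cast/crew cell, can then be handled in a follow-up step. A minimal sketch, not part of the original fix: since the strings use single quotes and Python None, they are Python literals rather than valid JSON, so ast.literal_eval inside a UDF parses them and explode flattens the resulting arrays. The field names cast_id, name, and character come from the dataset; stringifying values into one MapType is an assumption to cover the mixed value types.
import ast
from pyspark.sql.functions import udf, explode, col
from pyspark.sql.types import ArrayType, MapType, StringType

# Parse a cell like "[{'cast_id': 14, ...}, ...]" into a list of dicts.
# Values are stringified so one MapType covers ints, strings, and nulls.
@udf(ArrayType(MapType(StringType(), StringType())))
def parse_literal(s):
    if s is None:
        return None
    try:
        return [{k: None if v is None else str(v) for k, v in d.items()}
                for d in ast.literal_eval(s)]
    except (ValueError, SyntaxError):
        return None  # leave unparseable rows as null

cast_df = (credits
           .select('id', explode(parse_literal('cast')).alias('member'))
           .select('id',
                   col('member')['cast_id'].alias('cast_id'),
                   col('member')['name'].alias('name'),
                   col('member')['character'].alias('character')))
cast_df.show()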
Upvotes: 4
Reputation: 80176
You can use the PERMISSIVE mode of the CSV reader. The following example will work; I've verified this with Scala.
spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load(path)
Reference: https://docs.databricks.com/data/data-sources/read-csv.html
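For completeness, a minimal PySpark sketch of the same idea, assuming Spark 2.3+. The explicit schema and the _corrupt_record column are additions: the CSV reader only captures malformed rows when a corrupt-record column is declared in the schema.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName('movies').getOrCreate()

# PERMISSIVE (the default mode) nulls out fields it cannot parse instead
# of failing; columnNameOfCorruptRecord keeps the raw line for inspection.
schema = StructType([
    StructField('cast', StringType(), True),
    StructField('crew', StringType(), True),
    StructField('id', IntegerType(), True),
    StructField('_corrupt_record', StringType(), True),
])

df = (spark.read.format('csv')
      .options(header='true', mode='PERMISSIVE',
               columnNameOfCorruptRecord='_corrupt_record')
      .schema(schema)
      .load('credits.csv'))

df.cache()  # required before querying _corrupt_record alone (Spark 2.3+)
df.where('_corrupt_record is not null').show(5)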
Upvotes: 1