Vitor F.M.
Vitor F.M.

Reputation: 119

How to read JSON strings from CSV properlly with Pyspark?

I'm working with The Movies Dataset from https://www.kaggle.com/rounakbanik/the-movies-dataset#movies_metadata.csv.

The credits.csv file has three columns, cast, crew, and id. The cast and crew rows are filled with JSON (wrongly formatted, keys and values are surrounded by single quotes) and I want to extract then into separated DataFrames. But simply trying to load the file is not working. I'm trying as follows:

import pyspark
spark=SparkSession.builder.appName('movies').getOrCreate()
df = spark.read.csv(os.path.join(input_path, 'credits.csv'), header=True)
df.printSchema()
df.show()

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: string (nullable = true)

 +--------------------+--------------------+--------------------+
 |                cast|                crew|                  id|
 +--------------------+--------------------+--------------------+
 |[{'cast_id': 14, ...|"[{'credit_id': '...| 'profile_path': ...|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                8844|
 |[{'cast_id': 2, '...|[{'credit_id': '5...|               15602|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 1|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11862|
 |"[{'cast_id': 25,...| 'credit_id': '52...|         'gender': 0|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11860|
 |[{'cast_id': 2, '...|[{'credit_id': '5...|               45325|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                9091|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                 710|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 2|
 |[{'cast_id': 9, '...|"[{'credit_id': '...| 'profile_path': ...|
 |"[{'cast_id': 1, ...| 'credit_id': '56...|         'gender': 0|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 2|
 |"[{'cast_id': 1, ...| 'credit_id': '59...|         'gender': 2|
 |"[{'cast_id': 4, ...| 'credit_id': '52...|         'gender': 2|
 |[{'cast_id': 6, '...|[{'credit_id': '5...|                4584|
 |[{'cast_id': 42, ...|"[{'credit_id': '...| 'profile_path': ...|
 |"[{'cast_id': 1, ...|         'order': 14| 'profile_path': ...|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11517|
 +--------------------+--------------------+--------------------+
 only showing top 20 rows

The ID column should only contain numbers. The cast and crew rows should be loaded as strings, as happens when I try to load the data with Pandas.

import pandas as pd
df=pd.read_csv(os.path.join(input_path, 'credits.csv'))
type(df.cast[0])
str

How can I load the data into Spark DataFrame and collect the JSON data from each row into a new DataFrame?

Upvotes: 3

Views: 2301

Answers (2)

liqudibirum
liqudibirum

Reputation: 41

Escape character and multiline=True worked

credits = spark.read.csv('credits.csv', header=True, inferSchema=True,
                                 quote='"', escape='"', multiLine=True)
credits.printSchema()
credits.show()

output:

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: integer (nullable = true)

+--------------------+--------------------+-----+
|                cast|                crew|   id|
+--------------------+--------------------+-----+
|[{'cast_id': 14, ...|[{'credit_id': '5...|  862|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
|[{'cast_id': 2, '...|[{'credit_id': '5...|15602|
|[{'cast_id': 1, '...|[{'credit_id': '5...|31357|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11862|
|[{'cast_id': 25, ...|[{'credit_id': '5...|  949|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11860|
|[{'cast_id': 2, '...|[{'credit_id': '5...|45325|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
|[{'cast_id': 1, '...|[{'credit_id': '5...|  710|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9087|
|[{'cast_id': 9, '...|[{'credit_id': '5...|12110|
|[{'cast_id': 1, '...|[{'credit_id': '5...|21032|
|[{'cast_id': 1, '...|[{'credit_id': '5...|10858|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 1408|
|[{'cast_id': 4, '...|[{'credit_id': '5...|  524|
|[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|
|[{'cast_id': 42, ...|[{'credit_id': '5...|    5|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9273|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11517|
+--------------------+--------------------+-----+
only showing top 20 rows


Upvotes: 4

Aravind Yarram
Aravind Yarram

Reputation: 80176

You can use the PERMISSIVE mode of the csv reader. The following example will work. I've verified this to work with Scala.

spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load(path)

Reference: https://docs.databricks.com/data/data-sources/read-csv.html

Upvotes: 1

Related Questions