JBernardo

Reputation: 33387

Read spark data with column that clashes with partition name

I have the following file paths that we read with partitions on S3:

prefix/company=abcd/service=xyz/date=2021-01-01/file_01.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_02.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_03.json

When I read these with PySpark:

self.spark \
    .read \
    .option("basePath", 'prefix') \
    .schema(self.schema) \
    .json(['company=abcd/service=xyz/date=2021-01-01/'])

All the files have the same schema and get loaded in the table as rows. A file could be something like this:

{"id": "foo", "color": "blue", "date": "2021-12-12"}

The issue is that sometimes the files have a date field that clashes with one of my partition columns (date). So I want to know if it is possible to load the files without the partition columns, rename the JSON date column, and then add the partition columns back.

Final table would be:

| id  | color | file_date  | company | service | date       |
-------------------------------------------------------------
| foo | blue  | 2021-12-12 | abcd    | xyz     | 2021-01-01 |
| bar | red   | 2021-10-10 | abcd    | xyz     | 2021-01-01 |
| baz | green | 2021-08-08 | abcd    | xyz     | 2021-01-01 |

EDIT:

More information: I sometimes have 5 or 6 partition columns and date is one of them (not the last). I also need to read multiple date partitions at once. The schema that I pass to Spark also contains the partition columns, which makes it more complicated.

I don't control the input data so I need to read as is. I can rename the file columns but not the partition columns.

Would it be possible to add an alias to file columns as we would do when joining 2 dataframes?

Spark 3.1

Upvotes: 6

Views: 3780

Answers (4)

Vincent Doba

Reputation: 5068

In your specific case, what works in your favor is that you are reading JSON files, which can also be read as plain text.

So instead of loading the files without the partition columns, renaming the JSON date column and then adding the partition columns, you can do the reverse: load the partition columns without the file columns, rename the partition columns, and then apply the schema to the file data.

You would have to do the following steps:

  • Read your JSON files as text using sparkSession.read.text(), so the partition columns are discovered but the file content stays in a single value string column
  • Change the names of the partition columns using withColumnRenamed()
  • Convert the read JSON strings to structured columns using the from_json function
  • Rename the resulting columns as you wish using withColumnRenamed()

The complete code would be as follows:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType

# schema of json files
schema = StructType([
    StructField('id', StringType(), True),
    StructField('color', StringType(), True),
    StructField('date', DateType(), True)
])

# read the files as text: the partition columns (company, service, date) are
# discovered from the paths, and the file content lands in a single 'value' column
df = sparkSession.read.text('resources') \
    .withColumnRenamed('date', 'partition_date') \
    .withColumn('json', F.from_json(F.col('value'), schema)) \
    .select('company', 'service', 'partition_date', 'json.*') \
    .withColumnRenamed('date', 'file_date') \
    .withColumnRenamed('partition_date', 'date')

Example

With the following input files file_01.json and file_02.json under the directory prefix/company=abcd/service=xyz/date=2021-01-01:

file_01.json

{"id": "foo", "color": "blue", "date": "2021-12-12"}
{"id": "bar", "color": "red", "date": "2021-12-13"}
{"id": "kix", "color": "yellow", "date": "2021-12-14"}

file_02.json

{"id": "kaz", "color": "blue", "date": "2021-12-15"}
{"id": "dir", "color": "red", "date": "2021-12-16"}
{"id": "tux", "color": "yellow", "date": "2021-12-17"}

You get the following output df dataframe:

+-------+-------+----------+---+------+----------+
|company|service|      date| id| color| file_date|
+-------+-------+----------+---+------+----------+
|   abcd|    xyz|2021-01-01|kaz|  blue|2021-12-15|
|   abcd|    xyz|2021-01-01|dir|   red|2021-12-16|
|   abcd|    xyz|2021-01-01|tux|yellow|2021-12-17|
|   abcd|    xyz|2021-01-01|foo|  blue|2021-12-12|
|   abcd|    xyz|2021-01-01|bar|   red|2021-12-13|
|   abcd|    xyz|2021-01-01|kix|yellow|2021-12-14|
+-------+-------+----------+---+------+----------+

Upvotes: 0

blackbishop

Reputation: 32640

One way is to list the files under the prefix S3 path using, for example, the Hadoop FS API, then pass that list to spark.read. This way Spark won't detect them as partitions and you'll be able to rename the file columns if needed.

After you load the files into a dataframe, loop through the df columns and rename those which are also present in your partitions_cols list (by adding a file_ prefix, for example).

Finally, parse the partition values from input_file_name() using the regexp_extract function.

Here's an example:

from pyspark.sql import functions as F

Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()

s3_path = "s3://bucket/prefix"
file_cols = ["id", "color", "date"]
partitions_cols = ["company", "service", "date"]

# listing all files for input path
json_files = []
files = Path(s3_path).getFileSystem(conf).listFiles(Path(s3_path), True)

while files.hasNext():
    path = files.next().getPath()
    if path.getName().endswith(".json"):
        json_files.append(path.toString())

df = spark.read.json(json_files) # you can pass here the schema of the files without the partition columns

# rename file columns that also exist in the partition columns
df = df.select(*[
    F.col(c).alias(c) if c not in partitions_cols else F.col(c).alias(f"file_{c}")
    for c in df.columns
])

# parse partitions from filenames
for p in partitions_cols:
    df = df.withColumn(p, F.regexp_extract(F.input_file_name(), f"/{p}=([^/]+)/", 1))

df.show()

#+-----+----------+---+-------+-------+----------+
#|color| file_date| id|company|service|      date|
#+-----+----------+---+-------+-------+----------+
#|green|2021-08-08|baz|   abcd|    xyz|2021-01-01|
#| blue|2021-12-12|foo|   abcd|    xyz|2021-01-01|
#|  red|2021-10-10|bar|   abcd|    xyz|2021-01-01|
#+-----+----------+---+-------+-------+----------+

Upvotes: 3

Shane

Reputation: 626

Yes, we can read all the JSON files without the partition columns. Use the parent folder path with a wildcard directly, and it will load the data from all partitions into the data frame.

After reading the data frame, you can use the withColumnRenamed() function to rename the date field.

Something like the following should work

df = spark.read.json("s3://bucket/table/**/*.json")

renamedDF = df.withColumnRenamed("old column name", "new column name")

Upvotes: 0

ScootCork

Reputation: 3676

The easiest option would be to simply change the partition column name. You can then read in the data and rename the columns as you wish, and you won't lose the benefits of partitioning either.
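A minimal sketch of that first suggestion, assuming whoever writes the data can partition on a non-clashing column name; the partition_date name, the local path and the sample row are all illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical producer-side rewrite: partition on partition_date instead of
# date so it no longer clashes with the date field inside the files.
source_df = spark.createDataFrame(
    [("foo", "blue", "2021-12-12", "abcd", "xyz", "2021-01-01")],
    ["id", "color", "date", "company", "service", "partition_date"],
)
source_df.write.mode("overwrite") \
    .partitionBy("company", "service", "partition_date") \
    .json("/tmp/prefix_renamed")

# Consumer side: the two date columns stay distinct and can be renamed freely.
df = spark.read.json("/tmp/prefix_renamed") \
    .withColumnRenamed("date", "file_date") \
    .withColumnRenamed("partition_date", "date")
df.show()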

If that is not an option, you could read in the JSON files using a wildcard for the partitions, rename the date column to 'file_date', and then add the partition date by extracting it from the filename. You can get the filename from input_file_name in pyspark.sql.functions.

Edit: I missed that you have other partition columns before the date; you'd have to extract them from the filename as well, making this less ideal.
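A rough sketch of this wildcard approach, reusing the layout from the question (the bucket name is a placeholder, the glob depth must match your actual number of partitions, and an existing spark session is assumed):

from pyspark.sql import functions as F

# Globbing all the way down to the files should stop Spark from treating the
# company=/service=/date= directories as partition columns.
df = spark.read.json("s3://bucket/prefix/*/*/*/*.json")

# Rename the clashing file column, then recover each partition value
# from the file path.
df = df.withColumnRenamed("date", "file_date")
for p in ["company", "service", "date"]:
    df = df.withColumn(p, F.regexp_extract(F.input_file_name(), f"/{p}=([^/]+)/", 1))

df.show()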

Upvotes: 2
