Rishabh Dixit

Reputation: 115

How to Include the Value of Partitioned Column in a Spark data frame or Spark SQL Temp Table in AWS Glue?

I am using python 3, Glue 1.0 for this code.

I have partitioned data in S3, partitioned on the year, month, day and extra_field_name columns.

When I load the data into a data frame, its schema contains every column except the partitioned ones.

Here is the code and output

glueContext.create_dynamic_frame_from_options(
    connection_type = "s3",
    connection_options = {
        "paths": path_list,
        "recurse": True,
        "groupFiles": "inPartition"
    },
    format = "parquet"
).toDF().registerTempTable(final_arguement_list["read_table_" + str(i + 1)])

The path_list variable contains a list of path strings that need to be loaded into the data frame. I print the schema using the command below:

glueContext.create_dynamic_frame_from_options(
    connection_type = "s3",
    connection_options = {
        "paths": path_list,
        "recurse": True
    },
    format = "parquet"
).toDF().printSchema()

The schema printed to the CloudWatch logs does not contain any of the partitioned columns. Please note that I have already tried loading the data by providing paths down to the year, month, day and extra_field_name levels separately, but I still only get the columns that are present in the parquet files themselves.

Upvotes: 2

Views: 3349

Answers (3)

vivekveeramani

Reputation: 145

I was able to do this with an additional step: having a crawler crawl the directory on S3 and then using the resulting Glue Data Catalog table as the source for the Glue ETL job.

Once a crawler has run over the location s3://path/to/source/data/, the year, month and day directories are automatically treated as partition columns. You can then try the following in your Glue ETL script:

data_dyf = glueContext.create_dynamic_frame.from_catalog(
    database = db_name,
    table_name = tbl_name,
    push_down_predicate = "(year=='2018' and month=='05')"
)
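
If the crawler picked the partitions up correctly, they appear in the schema of the resulting frame; a quick check:

data_dyf.printSchema()  # year, month and day should be listed alongside the file columns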

You can find more details in the AWS Glue documentation on pushdown predicates.

Upvotes: 1

Oliver W.

Reputation: 13459

Try passing the basePath to the connection_options argument:

glueContext.create_dynamic_frame_from_options(
    connection_type = "s3", 
    connection_options = {
        "paths": path_list,
        "recurse" : True,
        "basePath": "s3://path/to/source/data/"
    },
    format = "parquet").toDF().printSchema()

This way, partition discovery will pick up the partition directories that sit above the paths you pass in. According to the documentation, these options are passed through to the Spark SQL DataSource.
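
For comparison, plain Spark takes the same option on the DataFrameReader. A minimal sketch, assuming a SparkSession named spark and the example bucket layout from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Leaf directories are read directly, but basePath tells Spark where the
# partitioned table starts, so year/month/day are recovered as columns.
df = (spark.read
      .option("basePath", "s3://path/to/source/data/")
      .parquet("s3://path/to/source/data/year=2018/month=1/day=4/",
               "s3://path/to/source/data/year=2018/month=1/day=5/"))
df.printSchema()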

Edit: given that your experiment shows it doesn't work, have you considered passing the top-level directory and filtering from there for the dates of interest? The reader will only read the relevant Hive partitions, as the filter gets "pushed down" to the file system.

from pyspark.sql.functions import col

(glueContext.create_dynamic_frame_from_options(
      connection_type = "s3",
      connection_options = {
          "paths": ["s3://path/to/source/data/"],
          "recurse": True,
      },
      format = "parquet")
    .toDF()
    .filter(
        (col("year") == 2018)
        & (col("month") == 1)
        & (col("day").between(4, 6))
    )
    .printSchema())

Upvotes: 0

Rishabh Dixit

Reputation: 115

As a workaround, I have created duplicate columns in the data frame itself, named year_2, month_2, day_2 and extra_field_name_2, as copies of year, month, day and extra_field_name.

During the data ingestion phase, I partition the data frame on year, month, day and extra_field_name and store it in S3, which retains the values of year_2, month_2, day_2 and extra_field_name_2 in the parquet files themselves.
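
A minimal sketch of that ingestion step (assuming a DataFrame named df holding the original columns; the names are illustrative):

from pyspark.sql.functions import col

# Duplicate the partition columns so their values survive inside the files;
# partitionBy moves the originals into the directory structure and drops
# them from the file contents.
df_out = (df
    .withColumn("year_2", col("year"))
    .withColumn("month_2", col("month"))
    .withColumn("day_2", col("day"))
    .withColumn("extra_field_name_2", col("extra_field_name")))

(df_out.write
    .partitionBy("year", "month", "day", "extra_field_name")
    .mode("append")
    .parquet("s3://path/to/source/data/"))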

While performing data manipulation, I load the data into a dynamic frame by providing the list of paths in the following manner:
['s3://path/to/source/data/year=2018/month=1/day=4/', 's3://path/to/source/data/year=2018/month=1/day=5/', 's3://path/to/source/data/year=2018/month=1/day=6/']

This gives me year_2, month_2, day_2 and extra_field_name_2 in the dynamic frame that I can further use for data manipulation.
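
Reading those paths back then looks roughly like this (a sketch following the options format from the question):

path_list = [
    "s3://path/to/source/data/year=2018/month=1/day=4/",
    "s3://path/to/source/data/year=2018/month=1/day=5/",
    "s3://path/to/source/data/year=2018/month=1/day=6/",
]

df = glueContext.create_dynamic_frame_from_options(
    connection_type = "s3",
    connection_options = {"paths": path_list, "recurse": True},
    format = "parquet"
).toDF()

# year_2, month_2 and day_2 are ordinary columns stored in the files,
# so they survive even though the year=/month=/day= directories do not.
df.printSchema()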

Upvotes: 0
