DataDog

Reputation: 525

AWS push down predicate not working when reading HIVE partitions

I'm trying to test out some Glue functionality, but the push down predicate is not working on Avro files within S3 that were partitioned for use in Hive. Our partitions are as follows: YYYY-MM-DD.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

filterpred = "loaddate == '2019-08-08'"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hive", 
                                                            table_name = "stuff", 
                                                            pushDownPredicate = filterpred)
print('############################################')
print("COUNT: ", datasource0.count())
print('##############################################')

df = datasource0.toDF()
df.show(5)

job.commit()

However, I still see Glue pulling in dates way outside of the range:

Opening 's3://data/2018-11-29/part-00000-a58ee9cb-c82c-46e6-9657-85b4ead2927d-c000.avro' for reading
2019-09-13 13:47:47,071 INFO [Executor task launch worker for task 258] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1208)) -
Opening 's3://data/2017-09-28/part-00000-53c07db9-05d7-4032-aa73-01e239f509cf.avro' for reading

I tried using the examples in the following:

AWS Glue DynamicFrames and Push Down Predicate

AWS Glue pushdown predicate not working properly

Currently none of the proposed solutions are working for me. I tried adding the partition column (loaddate), taking it out, quoting, unquoting, etc., and it still grabs data outside of the date range.

Upvotes: 0

Views: 3159

Answers (3)

vaquar khan

Reputation: 11479

Make sure your data is partitioned properly and run a Glue crawler to create the partitioned table.
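
For example, with a Hive-style (key=value) layout the crawler picks up the partition key name automatically, while a plain date prefix like the one in the question gets a generic name such as partition_0 (illustrative paths, not taken from the original bucket):

    s3://data/loaddate=2019-08-08/part-00000.avro   <- Hive-style, key becomes "loaddate"
    s3://data/2019-08-08/part-00000.avro            <- plain prefix, crawler names it "partition_0"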

Run a query in Athena to repair your table:

 MSCK REPAIR TABLE tbl;

Run a query in Athena to check the partitions:

  SHOW PARTITIONS tbl;

In Scala you can use the following code.

Without predicate:

    val datasource0 = glueContext.getCatalogSource(database = "ny_taxi_db",
                                                   tableName = "taxi_tbl",
                                                   redshiftTmpDir = "",
                                                   transformationContext = "datasource0").getDynamicFrame()

    datasource0.toDF().count()

With predicate:

    val predicate = "(year == '2016' and year_month == '201601' and year_month_day == '20160114')"

    val datasource1 = glueContext.getCatalogSource(database = "ny_taxi_db",
                                                   tableName = "taxi_tbl",
                                                   transformationContext = "datasource1",
                                                   pushDownPredicate = predicate).getDynamicFrame()

    datasource1.toDF().count()

In Python you can use the following code.

Without predicate:

    ds = glueContext.create_dynamic_frame.from_catalog(database = "ny_taxi_db",
                                                       table_name = "taxi_data_by_vender",
                                                       transformation_ctx = "datasource0")

    ds.toDF().count()

With predicate:

    ds1 = glueContext.create_dynamic_frame.from_catalog(database = "ny_taxi_db",
                                                        table_name = "taxi_data_by_vender",
                                                        transformation_ctx = "datasource1",
                                                        push_down_predicate = "(vendorid == 1)")

    ds1.toDF().count()
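
The predicate is just a Spark SQL expression over the partition columns, so the compound filter from the Scala example can be written the same way in Python (a sketch against the same taxi_tbl table, assuming it is partitioned by year/year_month/year_month_day):

    ds2 = glueContext.create_dynamic_frame.from_catalog(database = "ny_taxi_db",
                                                        table_name = "taxi_tbl",
                                                        transformation_ctx = "datasource2",
                                                        push_down_predicate = "(year == '2016' and year_month == '201601')")

    # Only files under the matching year/year_month prefixes should be read
    ds2.toDF().count()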

Upvotes: 0

Harsh Bafna

Reputation: 2224

There is a syntax error in your code. The correct parameter to pass to the from_catalog function is "push_down_predicate", not "pushDownPredicate".

Sample snippet:

datasource0 = glueContext.create_dynamic_frame.from_catalog(
             database = "hive", 
             table_name = "stuff",
             push_down_predicate = filterpred)

Reference AWS documentation: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html

Upvotes: 2

Yuriy Bondaruk

Reputation: 4750

It seems like your partitions are not in the Hive naming style (key=value), so you have to use the default name partition_0 in the query. Also, as suggested in another answer, the parameter is called push_down_predicate:

filterpred = "partition_0 == '2019-08-08'"

datasource0 = glue_context.create_dynamic_frame.from_catalog(
    database = "hive",
    table_name = "stuff",
    push_down_predicate = filterpred)
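
If you are not sure what name Glue gave the partition column, one quick check (a sketch, assuming the same database and table as in the question) is to load the frame without a predicate and print the schema; the partition column shows up as a regular field:

check = glue_context.create_dynamic_frame.from_catalog(
    database = "hive",
    table_name = "stuff")

# Look for partition_0 (or the declared partition key) in the printed schema
check.toDF().printSchema()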

Upvotes: 0
