Jiew Meng

Reputation: 88189

AWS Glue pushdown predicate not working properly

I'm trying to optimize my Glue/PySpark job by using push down predicates.

from datetime import date
from pyspark.sql import Row

start = date(2019, 2, 13)
end = date(2019, 2, 27)
print(">>> Generate data frame for ", start, " to ", end, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=end)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s'
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

However, it appears that Glue still attempts to read data outside the specified date range:

INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-01/part-00045-6cdebbb1-562c-43fa-915d-93b125aeee61.c000.snappy.parquet' for reading
INFO FileScanRDD: Reading File path: s3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet, range: 0-11797922, partition values: [12191,17965]
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet' for reading
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.

Notice the querydatetime=2019-03-01 and querydatetime=2019-03-10 partitions; they are outside the specified range of 2019-02-13 to 2019-02-27. Is that why the next line says "aborting HTTP connection"? It goes on to say "This is likely an error and may result in sub-optimal behavior". Is something wrong?

I wonder if the problem is that BETWEEN or IN is not supported inside the predicate?
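In case it matters, the same filter can be written without BETWEEN or IN at all, to rule those operators out. Here is a minimal sketch (assuming arr is the same list of destination place IDs used above) that only uses >=, <= and OR:

# Equivalent predicate built only from comparisons and OR, to rule out
# BETWEEN/IN as the culprit. Assumes `arr` holds the destination IDs.
dest_filter = " OR ".join(
    "querydestinationplace = '%s'" % d for d in arr)
predicate = "querydatetime >= '%s' AND querydatetime <= '%s' AND (%s)" % (
    start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"), dest_filter)

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx", table_name="flights",
    transformation_ctx="flights", push_down_predicate=predicate)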


The table CREATE DDL:

CREATE EXTERNAL TABLE `flights`(
  `id` string, 
  `querytaskid` string, 
  `queryoriginplace` string, 
  `queryoutbounddate` string, 
  `queryinbounddate` string, 
  `querycabinclass` string, 
  `querycurrency` string, 
  `agent` string, 
  `quoteageinminutes` string, 
  `price` string, 
  `outboundlegid` string, 
  `inboundlegid` string, 
  `outdeparture` string, 
  `outarrival` string, 
  `outduration` string, 
  `outjourneymode` string, 
  `outstops` string, 
  `outcarriers` string, 
  `outoperatingcarriers` string, 
  `numberoutstops` string, 
  `numberoutcarriers` string, 
  `numberoutoperatingcarriers` string, 
  `indeparture` string, 
  `inarrival` string, 
  `induration` string, 
  `injourneymode` string, 
  `instops` string, 
  `incarriers` string, 
  `inoperatingcarriers` string, 
  `numberinstops` string, 
  `numberincarriers` string, 
  `numberinoperatingcarriers` string)
PARTITIONED BY ( 
  `querydestinationplace` string, 
  `querydatetime` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://pinfare-glue/flights/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='pinfare-parquet', 
  'averageRecordSize'='19', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='623609', 
  'recordCount'='4368434222', 
  'sizeKey'='86509997099', 
  'typeOfData'='file')

Upvotes: 2

Views: 6333

Answers (4)

sumitya

Reputation: 2681

Pushdown predicates in a Glue DynamicFrame work fine with both the BETWEEN and the IN clause, as long as the partition columns are specified in the same sequence in the table definition and in the query.

I have a table with three levels of partitions:

s3://bucket/flights/year=2018/month=01/day=01 -> 50 records
s3://bucket/flights/year=2018/month=02/day=02 -> 40 records
s3://bucket/flights/year=2018/month=03/day=03 -> 30 records

Read the data into a DynamicFrame:

ds = glueContext.create_dynamic_frame.from_catalog(
    database = "abc", table_name = "pqr", transformation_ctx = "flights",
    push_down_predicate = "(year == '2018' and month between '02' and '03' and day in ('03'))")
ds.count()

Output:

30 records

So you will get the correct results if the sequence of columns is specified correctly. Also note that each value in the IN clause needs quotes, i.e. IN ('%s').

Partition columns in table:

querydestinationplace string, 
querydatetime string

Data read into the DynamicFrame:

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx", table_name="flights", transformation_ctx="flights",
    push_down_predicate=
    """querydestinationplace IN ('%s') AND
       querydatetime BETWEEN '%s' AND '%s'
    """
    % ("','".join(map(str, arr)),  # quote each value: IN ('a','b',...)
       start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")))
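As a quick sanity check, you can print the rendered predicate before passing it in. A minimal sketch with hypothetical sample values:

arr = [12191, 13554]  # hypothetical sample destination IDs
predicate = ("""querydestinationplace IN ('%s') AND
   querydatetime BETWEEN '%s' AND '%s'
""" % ("','".join(map(str, arr)), "2019-02-13", "2019-02-27"))
print(predicate)
# querydestinationplace IN ('12191','13554') AND
#    querydatetime BETWEEN '2019-02-13' AND '2019-02-27'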

Upvotes: 1

Developer

Reputation: 966

Try building the predicate like this:

start = str(date(2019, 2, 13))
end = str(date(2019, 2, 27)) 
# Set your push_down_predicate variable

pd_predicate = "querydatetime >= '" + start + "' and querydatetime < '" + end + "'"
#pd_predicate = "querydatetime between '" + start + "' AND '" + end + "'" # Or this one?

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database = "xxx"
    , table_name = "flights"
    , transformation_ctx="flights"
    , push_down_predicate=pd_predicate)

pd_predicate will be a string that works as the push_down_predicate.

Here is a nice read about it, if you're interested:

https://aws.amazon.com/blogs/big-data/work-with-partitioned-data-in-aws-glue/

Upvotes: 0

skadya

Reputation: 4390

In order to push down your condition, you need to change the order of the columns in the PARTITIONED BY clause of your table definition.

A condition with an "in" predicate on the first partition column cannot be pushed down the way you are expecting.
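For illustration, a hypothetical reordered definition might look like the sketch below. Only the PARTITIONED BY clause differs from the original DDL; the table name and location here are made up, and the table would have to be recreated and its partitions re-added or re-crawled:

# Hypothetical reordered table, putting the range-filtered column first.
spark.sql("""
    CREATE EXTERNAL TABLE flights_reordered (
        id string
        -- ... remaining data columns as in the original DDL ...
    )
    PARTITIONED BY (
        querydatetime string,
        querydestinationplace string)
    STORED AS PARQUET
    LOCATION 's3://pinfare-glue/flights-reordered/'
""")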

Let me know if it helps.

Upvotes: 3

Harsh Bafna

Reputation: 2224

One issue I can see with the code is that you are using "today" instead of "end" in the BETWEEN clause. Though I don't see the today variable declared anywhere in your code, I am assuming it has been initialized with today's date.

In that case the range is wider than you intend, and the partitions being read by Glue/Spark are correct.
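If that is the case, a minimal sketch of the corrected call, simply swapping end in for today, would be:

# Same call as in the question, but bounding the range with `end`
# (2019-02-27) instead of the undeclared `today`.
flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx", table_name="flights", transformation_ctx="flights",
    push_down_predicate="""
        querydatetime BETWEEN '%s' AND '%s'
        AND querydestinationplace IN (%s)
    """ % (start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"),
           ",".join(map(str, arr))))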

Upvotes: 3
