mrc

Reputation: 3173

Parquet data to AWS Redshift slow

I want to insert data from S3 parquet files to Redshift.

The parquet files come from a process that reads JSON files, flattens them, and stores the result as parquet. We use pandas DataFrames for this.
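For context, the flattening step is roughly like this (a minimal sketch; the records and paths are made up for illustration):

```python
import pandas as pd

# A batch of nested JSON records, as read from the S3 JSON path
# (illustrative data, not the real schema).
records = [
    {"id": 1, "meta": {"provider": "A", "ts": "2020-11-10T11:00:00"}},
    {"id": 2, "meta": {"provider": "A", "ts": "2020-11-10T11:05:00"}},
]

# json_normalize flattens nested dicts into flat columns joined by sep.
df = pd.json_normalize(records, sep="_")
print(list(df.columns))  # ['id', 'meta_provider', 'meta_ts']

# The real process then writes the flat frame to the S3 parquet path, e.g.:
# df.to_parquet("s3://parquet/provider/A/2020/11/10/11/part-0.parquet")
```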

To load the data, I tried two different approaches. The first one:

COPY schema.table
FROM 's3://parquet/provider/A/2020/11/10/11/'
IAM_ROLE 'arn:aws:iam::XXXX'
FORMAT AS PARQUET;

It returned:

Invalid operation: Spectrum Scan Error
error:  Spectrum Scan Error
code:      15001
context:   Unmatched number of columns between table and file. Table columns: 54, Data columns: 41

I understand the error, but I don't have an easy way to fix it. If we have to reload data from 2 months ago, the file will only have, say, 40 columns, because at that time we only needed that data, while the table has since grown to 50 columns. So we need something automatic, or at least a way to specify the columns.

Then I tried another option: a SELECT through AWS Redshift Spectrum. We know the table's columns from the system tables, and we know the file's structure by loading it again into a pandas DataFrame. I can then combine both to build an identical structure and do the INSERT.
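The column-alignment step can be sketched like this (assuming `table_cols` was already fetched from the system tables; all names and values here are illustrative):

```python
import pandas as pd

# Columns currently defined in the Redshift table, e.g. fetched from
# SVV_COLUMNS or pg_table_def (illustrative values).
table_cols = ["id", "provider", "amount", "new_col"]

# DataFrame loaded from an older parquet file that predates new_col.
df = pd.DataFrame({"id": [1], "provider": ["A"], "amount": [9.5]})

# reindex adds any table columns missing from the file (filled with NaN,
# which becomes NULL on insert) and orders them to match the table.
aligned = df.reindex(columns=table_cols)
print(list(aligned.columns))  # ['id', 'provider', 'amount', 'new_col']
```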

It works fine but it is slow.

The select looks like:

SELECT fields
FROM schema.table
WHERE partition_0 = 'A'
  AND partition_1 = '2020'
  AND partition_2 = '11'
  AND partition_3 = '10'
  AND partition_4 = '11';

The partitions are already added as I checked using:

select *
from SVV_EXTERNAL_PARTITIONS
where tablename = 'table'
  and schemaname = 'schema'
  and values = '["A","2020","11","10","11"]'
limit 1;

I have around 170 files per hour, both as JSON and as parquet. The process lists all files in the S3 JSON path, processes them, and stores them in the S3 parquet path.

I don't know how to improve the execution time, as the INSERT from parquet takes 2 minutes for each partition_0 value. I ran the SELECT alone to make sure it's not an INSERT issue, and it takes 1:50. So the problem is reading the data from S3.

If I select a non-existent value for partition_0, it again takes around 2 minutes, so there seems to be some problem accessing the data. I don't know whether the partition_0 naming (and the others) counts as Hive partitioning format.

Edit: AWS Glue Crawler table specification (screenshot)

Edit: Add SVL_S3QUERY_SUMMARY results

step:1
starttime: 2020-12-13 07:13:16.267437
endtime: 2020-12-13 07:13:19.644975
elapsed: 3377538
aborted: 0
external_table_name: S3 Scan schema_table
file_format: Parquet         
is_partitioned: t
is_rrscan: f
is_nested: f
s3_scanned_rows: 1132
s3_scanned_bytes: 4131968
s3query_returned_rows: 1132
s3query_returned_bytes: 346923
files: 169
files_max: 34
files_avg: 28
splits: 169
splits_max: 34
splits_avg: 28
total_split_size: 3181587
max_split_size: 30811
avg_split_size: 18825
total_retries: 0
max_retries: 0
max_request_duration: 360496
avg_request_duration: 172371
max_request_parallelism: 10
avg_request_parallelism: 8.4
total_slowdown_count: 0
max_slowdown_count: 0

Edit: Add query checks

Query: 37005074 (SELECT from localhost using PyCharm)
Query: 37005081 (INSERT from Airflow on AWS ECS)

STL_QUERY shows that both queries take around 2 minutes:

select * from STL_QUERY where query=37005081 OR query=37005074 order by query asc;

Query: 37005074 2020-12-14 07:44:57.164336,2020-12-14 07:46:36.094645,0,0,24
Query: 37005081 2020-12-14 07:45:04.551428,2020-12-14 07:46:44.834257,0,0,3

STL_WLM_QUERY shows no queue time; all of it is exec time:

select * from STL_WLM_QUERY where query=37005081 OR query=37005074;

Query: 37005074 Queue time 0 Exec time: 98924036 est_peak_mem:0
Query: 37005081 Queue time 0 Exec time: 100279214 est_peak_mem:2097152

SVL_S3QUERY_SUMMARY shows the query takes 3-4 seconds in S3:

select * from SVL_S3QUERY_SUMMARY where query=37005081 OR query=37005074 order by endtime desc;

Query: 37005074 2020-12-14 07:46:33.179352,2020-12-14 07:46:36.091295
Query: 37005081 2020-12-14 07:46:41.869487,2020-12-14 07:46:44.807106

stl_return, comparing the min start to the max end for each query: 3-4 seconds, as SVL_S3QUERY_SUMMARY says:

select * from stl_return where query=37005081 OR query=37005074 order by query asc;

Query:37005074  2020-12-14 07:46:33.175320 2020-12-14 07:46:36.091295
Query:37005081  2020-12-14 07:46:44.817680 2020-12-14 07:46:44.832649

I don't understand why SVL_S3QUERY_SUMMARY shows just 3-4 seconds to run the query in Spectrum while STL_WLM_QUERY says the execution time is around 2 minutes, which matches what I see in both my localhost and production environments. Nor do I know how to improve it, since stl_return shows the query returns little data.

EXPLAIN

XN Partition Loop  (cost=0.00..400000022.50 rows=10000000000 width=19608)
  ->  XN Seq Scan PartitionInfo of parquet.table  (cost=0.00..22.50 rows=1 width=0)
        Filter: (((partition_0)::text = 'A'::text) AND ((partition_1)::text = '2020'::text) AND ((partition_2)::text = '12'::text) AND ((partition_3)::text = '10'::text) AND ((partition_4)::text = '12'::text))
  ->  XN S3 Query Scan parquet  (cost=0.00..200000000.00 rows=10000000000 width=19608)
        ->  S3 Seq Scan parquet.table location:"s3://parquet" format:PARQUET  (cost=0.00..100000000.00 rows=10000000000 width=19608)

svl_query_report

select * from svl_query_report where query=37005074 order by segment, step, elapsed_time, rows;

(results screenshot omitted)

Upvotes: 1

Views: 1395

Answers (1)

Bill Weiner

Reputation: 11102

Just like in your other question, you need to change the keypaths on your objects. It is not enough to have just "A" in the keypath; it needs to be "partition_0=A". That is how Spectrum knows whether an object is in a partition.

Also, make sure your objects are of reasonable size, or scanning many of them will be slow. It takes time to open each object, and with many small objects the time spent opening them can exceed the time spent scanning them. This only matters when you need to scan a very large number of files.
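A Hive-style keypath as described above would look like s3://parquet/partition_0=A/partition_1=2020/..., with each partition column named in the key. Building such prefixes can be sketched as follows (the helper name and bucket are made up for illustration):

```python
# Build a Hive-style S3 key prefix from partition name/value pairs, so
# Spectrum can prune partitions by keypath. In Python 3.7+ keyword
# arguments preserve their order, so the segments come out in sequence.
def hive_prefix(base, **partitions):
    parts = [f"{name}={value}" for name, value in partitions.items()]
    return "/".join([base] + parts)

key = hive_prefix(
    "s3://parquet",
    partition_0="A", partition_1="2020", partition_2="11",
    partition_3="10", partition_4="11",
)
print(key)
# s3://parquet/partition_0=A/partition_1=2020/partition_2=11/partition_3=10/partition_4=11
```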

Upvotes: 1
