Reputation: 11
I am new to Glue and PySpark. I'm trying to create a job that crawls a large volume of small files and consolidates them into larger files. I understand this would be a non-issue with pushdown predicates or partition keys, but the only available partition key in this data is a date column, and the files are already separated by that date. Some of the dates (and their corresponding files) are very small (usually 15KB-5MB). The files are organized in S3 into directories named for their corresponding tables, and some of these directories are large (40GB+) and made up of thousands of these small files. The file format is Parquet. As a requirement, all columns need to be cast to string (don't worry about why; just know that it is a hard and fast requirement).
Some of the S3 top-level keys (table directories) process just fine, but only the smaller ones. The larger ones error out every time. I'm assuming the executors run out of memory, mainly because the large top-level keys do manage to output some consolidated files, but only about 13.8GB worth regardless of how many files are processed. I'm running the job on G.1X workers (16GB of RAM each) and can only assume it ran out of memory, though I've seen no specific OOM error in the CloudWatch logs. G.2X jobs process more files but eventually error out as well, which further suggests this is a RAM issue. I have tried the G.1X job with anywhere from 10 to 150 DPUs, with the same result.
The job script is in Python and I'm having a heck of a time trying to keep memory down. I've tried reading all files into one PySpark DataFrame with the schema I need, repartitioning it into roughly 128MB partitions (my desired end file size), converting that DataFrame to a DynamicFrame, then writing the result to the Glue Catalog and writing the files out. That fails for what I feel are obvious reasons.
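For reference, a stripped-down sketch of that first approach (the bucket, table, and catalog names here are placeholders, not my real values):
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.types import StringType, StructType, StructField

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_path = "s3://test-datastore/SomeTable/"

# Sample the existing schema and build an all-string version of it
# (hard requirement: every column cast to string)
sampled = spark.read.parquet(source_path)
string_schema = StructType(
    [StructField(f.name, StringType(), f.nullable) for f in sampled.schema.fields]
)

# Read everything at once and aim for ~128MB output partitions
# (320 is a rough guess: ~40GB / 128MB)
df = spark.read.schema(string_schema).parquet(source_path).repartition(320)

dyf = DynamicFrame.fromDF(df, glueContext, "consolidator")
sink = glueContext.getSink(
    path="s3://test-unified-datastore/SomeTable/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="consolidator_sink",
)
sink.setCatalogInfo(catalogDatabase="test_db", catalogTableName="sometable")
sink.setFormat("glueparquet")
sink.writeFrame(dyf)
job.commit()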
My latest iteration reads the list of file objects from a top-level S3 directory, gets their sizes, and builds a list of lists, where each inner list contains the keys of S3 objects adding up to roughly 1.28GB. Then we read each chunk into a PySpark DataFrame, repartition that DataFrame into 10 partitions (for 10 ~128MB files), convert it to a DynamicFrame, and write those partitions out. I've used this chunking method with traditional functions, which failed. I then created generators that would produce both the PySpark DataFrame and the DynamicFrame. For whatever reason, Python won't release the memory even when generators are involved. That led me to try the ol' "run the function in a separate process" trick, but that fails because the Glue workers are not picklable. My goal is to be able to run this one job to iterate over all top-level S3 keys, but as you can see in the sample code, at this point I'm just trying one of the larger ones.
I'm nearly certain I'm just so green with Glue that I'm not even approaching this the way Glue wants me to. I've been searching SO, YouTube, and random internet guides (including AWS documentation on Glue) but nothing has helped. I am fully aware this code contains a number of techniques that are less than ideal. At this point, I'm just trying to make some headway, so please try to keep your feedback to the question at hand. And that question is: How do I merge a number of small files into a series of larger files of a fixed size in a Glue job if the total size of the data being processed is greater than the amount of memory in a Glue job? Below is the code I am trying now and the error I am getting. Additional details available on request. Thanks so much for any help you can provide!
Code
import sys
import math
import boto3
from concurrent.futures import ProcessPoolExecutor
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.types import StringType, StructType, StructField
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
s3_resource = boto3.resource('s3')
client = boto3.client('s3')
input_bucket = "test-datastore"
output_bucket = "test-unified-datastore"
num_of_partitions = 10
def get_s3_dirs(input_bucket):
paginator = client.get_paginator('list_objects')
result = paginator.paginate(Bucket=input_bucket, Delimiter='/')
dirs = [prefix.get('Prefix').strip('/') for prefix in result.search('CommonPrefixes')]
return dirs
def get_file_meta(input_bucket, input_key):
bucket_obj = s3_resource.Bucket(input_bucket)
for obj in bucket_obj.objects.filter(Prefix=input_key):
file_meta = {'path': f's3://{input_bucket}/{obj.key}', 'size': obj.size}
yield file_meta
def get_chunk_size():
return math.ceil(num_of_partitions * 128000000)
def create_chunks(input_key):
sized_chunks = []
target_chunk_size = get_chunk_size()
chunks_sized = False
file_meta = get_file_meta(input_bucket, input_key)
while not chunks_sized:
current_chunk = []
current_chunk_size = 0
while current_chunk_size < target_chunk_size:
try:
meta = next(file_meta)
file_path = meta['path']
file_size = meta['size']
current_chunk_size += file_size
current_chunk.append(file_path)
except StopIteration:
if current_chunk:
# if the current chunk is not the only chunk and its smaller than
# 100MB, merge it with the last chunk in sized_chunks
if sized_chunks and current_chunk_size < (1048576 * 100):
current_chunk.extend(sized_chunks.pop(-1))
chunks_sized = True
break
sized_chunks.append(current_chunk)
return sized_chunks
def gen_create_spark_df(schema, chunk, num_of_partitions):
spark_df = spark.read.schema(schema).parquet(*chunk).repartition(num_of_partitions)
yield spark_df
def gen_process_chunks(chunks):
for chunk in chunks:
sample_file = chunk[0]
sample_df = spark.read.parquet(sample_file)
schema = sample_df.schema
struct_fields = []
for field in schema.fields:
schema_field = StructField(
field.name,
StringType(),
field.nullable
)
struct_fields.append(schema_field)
schema = StructType(struct_fields)
output_dynamic = DynamicFrame.fromDF(
next(gen_create_spark_df(schema, chunk, num_of_partitions)),
glueContext, "amazonS3_consolidator_3"
)
yield output_dynamic
def merge_table(input_key):
chunks = create_chunks(input_key)
dfs = gen_process_chunks(chunks)
for df in dfs:
AmazonS3_node1670429555763 = glueContext.getSink(
path=f"s3://{output_bucket}/{input_key}/",
connection_type="s3",
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=[],
enableUpdateCatalog=True,
transformation_ctx="AmazonS3_node1670429555763",
)
AmazonS3_node1670429555763.setCatalogInfo(
catalogDatabase="test_db", catalogTableName=input_key.lower()
)
AmazonS3_node1670429555763.setFormat("glueparquet")
AmazonS3_node1670429555763.writeFrame(df)
s3_dirs = get_s3_dirs(input_bucket)[7:8]
for input_key in s3_dirs:
merge_table(input_key)
Error
Py4JJavaError: An error occurred while calling o200.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 24 in stage 26.0 failed 4 times, most recent failure: Lost task 24.3 in stage 26.0 (TID 1928, 172.35.39.79, executor 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at com.amazonaws.services.glue.sinks.GlueParquetHadoopWriter.doParquetWrite(GlueParquetHadoopWriter.scala:178)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1$$anonfun$4.apply$mcV$sp(HadoopDataSink.scala:243)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1$$anonfun$4.apply(HadoopDataSink.scala:235)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1$$anonfun$4.apply(HadoopDataSink.scala:235)
at scala.util.Try$.apply(Try.scala:192)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:235)
at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:149)
at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)
at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:82)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:89)
at com.amazonaws.services.glue.sinks.HadoopDataSink.writeDynamicFrame(HadoopDataSink.scala:148)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
I've tried reading the whole top-level key into a DynamicFrame, reading the files in the top-level key into chunks and processing each chunk as its own DynamicFrame, chunking the files into a DynamicFrame created by a generator function, and chunking the files into a DynamicFrame created in a separate Python process. I've followed a number of AWS Glue guides from AWS and third parties, in both blog and YouTube video format.
I've followed this guide from AWS: https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/
And reviewed this SO question: AWS Glue job running out of memory
And found no relevant help.
I expected the glue jobs to consolidate the large array of small files into a smaller subset of larger files.
Regardless of the technique used, I received an error telling me that the RPC client disassociated due to containers exceeding thresholds or network issues.
Upvotes: 0
Views: 1089
Reputation: 71
1. Create a new Glue Crawler that reads the small Parquet files and creates a corresponding Glue Data Catalog table (a minimal boto3 sketch for this step follows the list).
2. Create a Glue ETL (extract, transform, and load) job that reads the data from the small Parquet files, performs any necessary transformations, and writes the data to larger Parquet files in a different location in S3.
3. Set up a Glue trigger to run the ETL job on a schedule or in response to certain events.
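For step 1, the crawler can also be created programmatically. A minimal boto3 sketch; the crawler name, IAM role, database, and S3 path are placeholders:
import boto3

glue_client = boto3.client('glue')

# Create a crawler over the prefix that holds the small Parquet files
glue_client.create_crawler(
    Name='small-parquet-crawler',
    Role='my-glue-service-role',
    DatabaseName='my_database',
    Targets={'S3Targets': [{'Path': 's3://my-input-bucket/my-table/'}]},
)

# Run it once to register the table in the Glue Data Catalog
glue_client.start_crawler(Name='small-parquet-crawler')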
Here is some sample Python code that demonstrates how you can use the Glue API to create and run a Glue ETL job that merges small Parquet files into larger ones:
import boto3
# Create a Glue client
glue_client = boto3.client('glue')
# Set the name of the Glue ETL job and the IAM role it runs with
job_name = 'my-etl-job'
glue_role = 'my-glue-service-role'
# Set the name of the output S3 bucket and prefix where the large Parquet files will be written
output_bucket = 'my-output-bucket'
output_prefix = 'output/'
# Create the Glue ETL job; the input and output locations are passed to the
# script as default job arguments
glue_client.create_job(
    Name=job_name,
    Role=glue_role,
    ExecutionProperty={'MaxConcurrentRuns': 1},
    Command={
        'Name': 'pythonshell',
        'PythonVersion': '3',
        'ScriptLocation': 's3://my-scripts/merge_parquet.py',
    },
    DefaultArguments={
        '--input_path': 's3://my-input-bucket/my-table/',
        '--output_path': f's3://{output_bucket}/{output_prefix}',
    },
)
# Run the Glue ETL job
glue_client.start_job_run(JobName=job_name)
The merge_parquet.py script specified in the ScriptLocation field is a Python script that reads the small Parquet files, performs any necessary transformations, and writes the data out as larger Parquet files. You will need to implement this script yourself, for example using the pandas and/or pyarrow libraries to read, merge, and rewrite the Parquet files.
Some examples:
import pyarrow as pa
import pyarrow.parquet as pq
# Set the input and output filenames
input_filenames = ['file1.parquet', 'file2.parquet', 'file3.parquet']
output_filename = 'merged.parquet'
# Read each input file, concatenate the tables (assumes the files share a schema),
# and write a single merged file
tables = [pq.read_table(f) for f in input_filenames]
pq.write_table(pa.concat_tables(tables), output_filename)
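A pandas-only version of the same merge might look like this (the filenames are placeholders, and the files are assumed to share a schema):
import pandas as pd

# Read each small file into a DataFrame, concatenate, and write one larger file
input_filenames = ['file1.parquet', 'file2.parquet', 'file3.parquet']
merged = pd.concat((pd.read_parquet(f) for f in input_filenames), ignore_index=True)
merged.to_parquet('merged.parquet', index=False)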
or
import pyarrow.parquet as pq
from pyarrow import fs
# S3 filesystem (assumes AWS credentials are available in the environment)
s3 = fs.S3FileSystem()
# Set the input and output S3 locations (bucket/key form, no "s3://" scheme)
input_path = 'my-bucket/input/'
output_path = 'my-bucket/output/merged.parquet'
# Read all of the input files as one dataset
dataset = pq.ParquetDataset(input_path, filesystem=s3)
# Write a single merged file
pq.write_table(dataset.read(), output_path, filesystem=s3)
Or you can simply create a Spark job in an Athena notebook, or on EMR, to have better memory/CPU control and use the code below:
import boto3
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Set the input and output paths
input_path = 's3://my-bucket/input/'
output_path = 's3://my-bucket/output/'
# Read the small Parquet files and merge them into a single DataFrame
df = spark.read.option("mergeSchema", "true").parquet(input_path)
# Estimate the number of ~128MB output partitions from the total size of the input
# objects (Spark's schema does not expose a size in bytes, so list them with boto3)
s3 = boto3.client('s3')
pages = s3.get_paginator('list_objects_v2').paginate(Bucket='my-bucket', Prefix='input/')
total_bytes = sum(obj['Size'] for page in pages for obj in page.get('Contents', []))
num_partitions = max(1, int(total_bytes // (128 * 1024 * 1024)))
# Repartition the DataFrame so each partition is roughly 128MB
df_repartitioned = df.repartition(num_partitions)
# Write the repartitioned DataFrame to the output location
df_repartitioned.write.mode('overwrite').parquet(output_path)
Upvotes: 3