PKad

Reputation: 1

Curation process with Delta Lake libraries (without Databricks)

I am using AWS Glue with PySpark.

What libraries would I need to import, specifically in the Spark context, in order to create Delta tables?

I added delta-core_2.12-0.7.0.jar to the Glue dependent JARs path under "Security configuration, script libraries, and job parameters (optional)", but I get the error below:

File "script_2020-11-08-19-29-39.py", line 54, in fullload_str_metrics_df = spark.read.parquet('s3://rawbucket/.../fullload/.../STR_METRICS/LOAD00000001.parquet') File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 291, in parquet File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o74.parquet.

Upvotes: 0

Views: 418

Answers (1)

Have you tried Glue Bookmarks and Glue Crawlers?

Bookmarks can monitor an S3 directory and process only new files. They have a commit feature to persist new "offsets" once the job has finished successfully, and a rollback feature to return to a previous state of the offsets (for instance, to go back to the state before the execution with ID <job execution ID>).
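The rollback side is exposed through the ResetJobBookmark API; a minimal boto3 sketch, with the job name as a placeholder:

import boto3

glue = boto3.client("glue")

# Reset the bookmark so the next run reprocesses data the job has already seen.
# An optional RunId parameter can be passed to target the bookmark state of a specific run.
glue.reset_job_bookmark(JobName="<job_name>")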

You can enable bookmarks for an AWS Glue job from its configuration (see the boto3 sketch after the code below) and, once they are enabled, activate them for a specific source in the code by passing a transformation_ctx:

import sys

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# transformation_ctx gives this source a stable identity so the bookmark can
# track which files have already been processed.
data_frame = glue_context.create_dynamic_frame.from_catalog(
    database='<database>',
    table_name='<table_name>',
    transformation_ctx='<source_unique_id_for_this_job>'
).toDF()

# ... apply your transformations to data_frame here ...

# Convert back to a DynamicFrame and write the result to S3 as Parquet.
sink = DynamicFrame.fromDF(data_frame, glue_context, "<df_name>")

glue_context.write_dynamic_frame.from_options(
    frame=sink,
    connection_type="s3",
    connection_options={"path": "<s3_path>"},
    format="parquet"
)

# Commit the bookmark state once the run has finished successfully.
job.commit()
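The "enable it from the job configuration" step can also be done per run by passing the special --job-bookmark-option argument; a boto3 sketch, with the job name as a placeholder:

import boto3

glue = boto3.client("glue")

# Start a run with bookmarks enabled; the same argument can instead be set as a
# default job parameter in the job's configuration.
glue.start_job_run(
    JobName="<job_name>",
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)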

In this example, an AWS Glue Crawler is used to expose the S3 input directory as a table in the Glue Data Catalog, which is what from_catalog reads above.
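Such a crawler can be defined in a few lines of boto3; the names, role, and paths below are placeholders:

import boto3

glue = boto3.client("glue")

# Crawl the raw S3 prefix and register it as a table in the Glue Data Catalog,
# so the job can read it with create_dynamic_frame.from_catalog.
glue.create_crawler(
    Name="<crawler_name>",
    Role="<glue_service_role_arn>",
    DatabaseName="<database>",
    Targets={"S3Targets": [{"Path": "s3://<raw-bucket>/<prefix>/"}]},
)
glue.start_crawler(Name="<crawler_name>")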

I know this is far from real-time processing, but each batch execution processes only new data.

Upvotes: 0
