Reputation: 1
I am using AWS Glue.
What libraries would I need to import, specifically in the Spark context, in order to create Delta tables?
I added delta-core_2.12-0.7.0.jar to the Glue dependent JARs path under "Security configuration, script libraries, and job parameters (optional)", but I get the error below:
File "script_2020-11-08-19-29-39.py", line 54, in fullload_str_metrics_df = spark.read.parquet('s3://rawbucket/.../fullload/.../STR_METRICS/LOAD00000001.parquet') File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 291, in parquet File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o74.parquet.
Upvotes: 0
Views: 418
Reputation: 350
Have you tried Glue Bookmarks and Glue Crawlers?
Bookmarks can monitor an S3 directory and process only new files. They have a commit feature to record new "offsets" once a run has finished successfully, and a rollback feature to go back to previous states of the offsets (for instance, to go back to the state before the execution with ID <job execution ID>).
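As a minimal sketch of that rollback/reset side (the job name and run ID below are placeholders, not values from your job), the bookmark can be manipulated through the Glue API, for example with boto3:

import boto3

glue = boto3.client("glue")

# Full reset: the next run reprocesses everything in the source again.
glue.reset_job_bookmark(JobName="<your_glue_job_name>")

# I believe a RunId can also be passed to rewind relative to a specific run
# (placeholder run ID below):
glue.reset_job_bookmark(JobName="<your_glue_job_name>", RunId="<job execution ID>")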
You can enable bookmarks for an AWS Glue job from its configuration, and once they are enabled, you can turn the bookmark on for a specific source in the code by passing a transformation_ctx:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())

# The Job object tracks the bookmark state for this run.
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# transformation_ctx is the unique identifier the bookmark uses for this source.
data_frame = glue_context.create_dynamic_frame.from_catalog(database='<database>', table_name='<table_name>', transformation_ctx='<source_unique_id_for_this_job>').toDF()

# ... some transformations ...

sink = DynamicFrame.fromDF(data_frame, glue_context, "<df_name>")
glue_context.write_dynamic_frame.from_options(frame=sink, connection_type="s3", connection_options={"path": "<s3_path>"}, format="parquet")

# Commit the bookmark so already-processed data is skipped on the next run.
job.commit()
For this example we are using an AWS Glue Crawler to interpret the S3 input directory as a table in the Data Catalog.
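As a sketch, defining such a crawler with boto3 could look like this (the crawler name, IAM role, database and S3 path are placeholders):

import boto3

glue = boto3.client("glue")

# Crawl the raw input prefix and register it as a table in the Data Catalog database.
glue.create_crawler(
    Name="<raw_input_crawler>",
    Role="<glue_crawler_iam_role_arn>",
    DatabaseName="<database>",
    Targets={"S3Targets": [{"Path": "s3://<input_bucket>/<input_prefix>/"}]},
)
glue.start_crawler(Name="<raw_input_crawler>")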
I know this is far from real-time processing, but each batch execution processes only new data.
Upvotes: 0