Reputation: 21
I am trying to automate an ETL pipeline that outputs data from AWS RDS MySQL to AWS S3, and I am currently using AWS Glue to do the job. When I do an initial load from RDS to S3, it captures all the data in the file, which is exactly what I want. However, when I add new data to the MySQL database and run the Glue job again, I get an empty file instead of the added rows. Any help would be MUCH appreciated.
Upvotes: 2
Views: 1362
Reputation: 9715
Bookmarks are definitely at fault here, but it's strange they are not mentioned in the question at all, even though they need to be explicitly enabled. If bookmarks are in place, Glue will try its best to figure out what changed in the relational source, but without you giving it a hand it will be limited.
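As a quick sanity check: bookmarks only advance when the job script itself cooperates, i.e. it calls job.init() and job.commit() and every source has a transformation_ctx. A minimal skeleton (placeholder names, nothing specific to your job); Glue Studio generates this boilerplate, but hand-written scripts sometimes miss the commit:
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)  # restores the bookmark state for this run

# ... create dynamic frames with a transformation_ctx and write them out ...

job.commit()  # persists the new bookmark; without it nothing is checkpointed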
For JDBC sources, the following rules apply:
1) For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.
2) AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).
3) You can specify the columns to use as bookmark keys in your AWS Glue script (see the catalog-based sketch after this list). For more information about using job bookmarks in AWS Glue scripts, see Using job bookmarks.
4) AWS Glue doesn't support using columns with case-sensitive names as job bookmark keys.
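For a source that goes through the Data Catalog, the bookmark keys can be passed in additional_options; a sketch with hypothetical database/table names:
datasource = glueContext.create_dynamic_frame.from_catalog(
    database = "my_database",    # hypothetical catalog database
    table_name = "items",        # hypothetical catalog table
    additional_options = {
        "jobBookmarkKeys": ["item_id", "modified"],
        "jobBookmarkKeysSortOrder": "asc",
    },
    transformation_ctx = "datasource",
)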
You can specify your own query keys as suggested in this post. Note that for a direct JDBC connection (not via the Data Catalog), you will have to specify the bookmark params in the connection options, like so:
RelationalDB_node1710411919513 = glueContext.create_dynamic_frame.from_options(
    connection_type = "postgresql",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "items",
        "connectionName": "Aurora connection",
        "jobBookmarkKeys": ["item_id", "modified"],
        "jobBookmarkKeysSortOrder": "asc",
    },
    transformation_ctx = "RelationalDB_node1710411919513",
)
Notice that I want to query by a composite key: item_id and a modified timestamp. If I didn't add modified, Glue would only query for new items. However, since I've also added modified, it will be able to fetch updates, as long as I change modified every time a record in the db is updated.
Note that if there are no changes in the db and bookmarks are used, Glue will still generate an empty file on every run. If playing with query/bookmark parameters doesn't work, you can always check CloudWatch for the logs: as long as CloudWatch logs are enabled, the queries will be logged. Here's my example:
24/03/17 14:35:54 INFO JDBCJobBookmarkUtil$: JDBC Bookmark: querying the last row in db: (select item_id, modified from items order by item_id DESC, modified DESC LIMIT 1) as items
24/03/17 14:47:53 INFO JDBCRDD: Querying jdbc source with sql: SELECT * FROM (select item_id, modified from items order by item_id DESC, modified DESC LIMIT 1) as items
24/03/17 14:47:58 INFO JDBCRDD: Querying jdbc source with sql: SELECT * FROM (select * from items WHERE ((item_id > '152000022') or (item_id = '152000022' and modified > '2023-06-26 16:42:03.13')) and ((item_id < '152000022') or (item_id = '152000022' and modified <= '2023-06-26 16:42:03.13'))) as items
Note that I've run the job twice without any changes in the database, but in a real-life scenario, with changes happening in the db all the time, the semantics of the fetch query would be:
((item_id > 'PREV_BOOKMARK') or (item_id = 'PREV_BOOKMARK' and modified > 'PREV_BOOKMARK'))
and ((item_id < 'NEW_BOOKMARK') or (item_id = 'NEW_BOOKMARK' and modified <= 'NEW_BOOKMARK'))
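If the empty files themselves are the nuisance, one workaround (a sketch; the bucket path and node names are placeholders) is to count the incremental frame and skip the S3 write when nothing came back:
# count() forces an extra evaluation, but lets us avoid writing empty output
if RelationalDB_node1710411919513.count() > 0:
    glueContext.write_dynamic_frame.from_options(
        frame = RelationalDB_node1710411919513,
        connection_type = "s3",
        connection_options = {"path": "s3://my-bucket/items/"},  # placeholder path
        format = "parquet",
        transformation_ctx = "S3Output_node1",
    )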
Upvotes: 0
Reputation: 45
For anybody who is still struggling with this (it drove me mad, because I thought my Spark code was wrong): disable bookmarking in the job details.
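If you'd rather keep bookmarking configured on the job but switch it off for a particular run, the --job-bookmark-option parameter can also be overridden per run; a sketch with a placeholder job name:
import boto3

glue = boto3.client("glue")
glue.start_job_run(
    JobName="my-etl-job",  # placeholder
    Arguments={"--job-bookmark-option": "job-bookmark-disable"},
)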
Upvotes: 2
Reputation: 10393
Bookmarking rules for JDBC sources are here. The important point to remember for JDBC sources is that the key values have to be in increasing or decreasing order, and Glue only processes new data from the last checkpoint.
Typically, either an autogenerated sequence number or a datetime is used as the bookmark key.
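If the checkpoint has moved past data you still need, the stored bookmark can be reset so the next run reprocesses everything; a sketch, with a placeholder job name:
import boto3

glue = boto3.client("glue")
glue.reset_job_bookmark(JobName="my-etl-job")  # next run starts from scratch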
Upvotes: 2