WolfgangM
WolfgangM

Reputation: 253

Force Glue Crawler to create separate tables

I am continuously add parquet data sets to an S3 folder with a structure like this:

s3:::my-bucket/public/data/set1
s3:::my-bucket/public/data/set2
s3:::my-bucket/public/data/set3

At the beginning I only have set1 and my crawler is configured to run on the whole bucket s3:::my-bucket. This leads to the creation of a partitioned tabled named my-bucket with partitions named public, data and set1. What I actually want is to have a table named set1 without any partitions. I see the reasons why this happens, as it is explained under How Does a Crawler Determine When to Create Partitions?. But when a new data set is uploaded (e.g. set2) I don't want it to be another partition (because it is completely different data with a different schema). How can I force the Glue crawler to NOT create partitions? I know I could define the crawler path as s3:::my-bucket/public/data/ but unfortunately I don't know where the new data sets will be created (e.g. could also be s3:::my-bucket/other/folder/set2).

Any ideas how to solve this?

Upvotes: 4

Views: 2904

Answers (2)

WolfgangM
WolfgangM

Reputation: 253

My solution was to manually add the specific paths to the Glue crawler. The big picture is that I am using a Glue job to transform data from one S3 bucket and write it to another one. I now ended up to initially configure the Glue crawler to crawl the whole bucket. But every time the Glue transformation job runs it also updates the Glue crawler: it removes the initial full bucket location (if it still exists) and then adds the new path to the S3 targets.

In Python it looks something like this:

def update_target_paths(crawler):
    """
    Remove initial include path (whole bucket) from paths and
    add folder for current files to include paths.
    """

    def path_is(c, p):
        return c["Path"] == p

    # get S3 targets and remove initial bucket target
    s3_targets = list(
        filter(
            lambda c: not path_is(c, f"s3://{bucket_name}"),
            crawler["Targets"]["S3Targets"],
        )
    )
    # add new target path if not in targets yet
    if not any(filter(lambda c: path_is(c, output_loc), s3_targets)):
        s3_targets.append({"Path": output_loc})
        logging.info("Appending path '%s' to Glue crawler include path.", output_loc)
    crawler["Targets"]["S3Targets"] = s3_targets
    return crawler


def remove_excessive_keys(crawler):
    """Remove keys from Glue crawler dict that are not needed/allowed to update the crawler"""
    for k in ["State", "CrawlElapsedTime", "CreationTime", "LastUpdated", "LastCrawl", "Version"]:
        try:
            del crawler[k]
        except KeyError:
            logging.warning(f"Key '{k}' not in crawler result dictionary.")
    return crawler


if __name__ == "__main__":
    logging.info(f"Transforming from {input_loc} to {output_loc}.")
    if prefix_exists(curated_zone_bucket_name, curated_zone_key):
        logging.info("Target object already exists, appending.")
    else:
        logging.info("Target object doesn't exist, writing to new one.")
    transform() # do data transformation and write to output bucket
    while True:
        try:
            crawler = get_crawler(CRAWLER_NAME)
            crawler = update_target_paths(crawler)
            crawler = remove_excessive_keys(crawler)

            # Update Glue crawler with updated include paths
            glue_client.update_crawler(**crawler)

            glue_client.start_crawler(Name=CRAWLER_NAME)
            logging.info("Started Glue crawler '%s'.", CRAWLER_NAME)
            break
        except (
            glue_client.exceptions.CrawlerRunningException,
            glue_client.exceptions.InvalidInputException,
        ):
            logging.warning("Crawler still running...")
            time.sleep(10)

Variables defined defined globally: input_loc, output_loc, CRAWLER_NAME, bucket_name.

For every new data set a new path is added to the Glue crawler. No partitions will be created.

Upvotes: 1

Robert Kossendey
Robert Kossendey

Reputation: 7028

You can use the TableLevelConfiguration to specify in which folder level the crawler should look for tables.

More information on that here.

Upvotes: 3

Related Questions