pay

Reputation: 173

S3Guard or s3committer for Google Cloud Storage

I am working with Dataproc and Parquet on Google Cloud Platform, with data on GCS, and writing lots of small to moderately sized files is a major hassle: it ends up a couple of times slower than what I would get with fewer, bigger files or with HDFS.

The Hadoop community has been working on S3Guard, which uses DynamoDB to give S3A consistent listings and metadata. Similarly, s3committer uses S3's multi-part upload API to provide a simple alternative committer that is much more efficient.

I am looking for similar solutions on GCS. The multi-part upload API is one of the few S3 features not offered by GCS's XML API, so it cannot be used as-is. Instead, GCS has a "compose" API, where you upload objects separately and then issue a compose request to stitch them together. This seems like it could be used to adapt s3committer's multi-part upload approach, but I am not quite sure.
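For reference, here is a rough sketch of driving compose from the google-cloud-storage Python client; the bucket and object names are made up for illustration, and this is not tied to any committer implementation:

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-dataproc-bucket")  # hypothetical bucket

    # Each task uploads its piece as an independent object...
    parts = []
    for i in range(3):
        part = bucket.blob(f"staging/part-{i:03d}")
        part.upload_from_string(b"...part data...")
        parts.append(part)

    # ...and one compose call stitches them into the final object
    # (up to 32 source objects per compose request).
    final = bucket.blob("output/final-object")
    final.compose(parts)

    # The intermediate objects can then be deleted.
    for part in parts:
        part.delete()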

I could not find any information about using S3Guard on GCS with an alternate key-value store (it is tied to the S3A connector, and I am not even sure S3A can be used against GCS's XML API).

Avoiding rename-based commits ("0-rename commits") seems to be a common concern with Hadoop and Apache Spark. What are the usual solutions to that on GCS, besides writing fewer, bigger files?

Upvotes: 2

Views: 1028

Answers (2)

Dennis Huo

Reputation: 10677

There are a few different things in play here. For the problem of enforcing list consistency, Dataproc traditionally relied on a per-cluster NFS mount to apply client-enforced list-after-write consistency; more recently, Google Cloud Storage has improved its list-after-write consistency semantics, and list operations are now strongly consistent immediately after writes. Dataproc is phasing out client-enforced consistency, and something like S3Guard on DynamoDB is no longer needed for GCS.

As for multipart upload: in theory it would be possible to use GCS Compose as you mention, but parallel multipart upload of a single large file is mostly helpful in a single-stream situation, whereas most Hadoop/Spark workloads already parallelize across tasks on each machine, so there's little benefit to multithreading each individual upload stream; aggregate throughput will be about the same with or without parallel multipart uploads.

So that leaves the question of using the multi-part API to perform conditional/atomic commits. The GCS connector for Hadoop currently uses "resumable uploads", where it's theoretically possible for one node to be responsible for "committing" an object that was uploaded by a completely different node; the client libraries just aren't currently structured to make this straightforward. At the same time, the "copy-and-delete" phase of a GCS "rename" also differs from S3 in that it is done as metadata operations rather than a true data "copy". This makes GCS amenable to using the vanilla Hadoop FileCommitters instead of needing to commit "directly" into the final location and skip the "_temporary" machinery. Having to "copy/delete" the metadata of each file rather than doing a true directory rename isn't ideal, but the cost is proportional only to the number of files, not to the underlying data size.
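To make the "metadata operation" point concrete, here is a rough sketch (not the connector's actual code) of what a per-file rename amounts to when done through the google-cloud-storage Python client; copy_blob is a server-side copy, so no object data flows through the client:

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-dataproc-bucket")  # hypothetical bucket name

    def rename_object(bucket, src_name, dst_name):
        # "Rename" one object: server-side copy, then delete the original.
        src = bucket.blob(src_name)
        bucket.copy_blob(src, bucket, dst_name)  # no bytes are downloaded or re-uploaded
        src.delete()

    # A directory "rename" is just this, repeated once per object under the prefix,
    # so the cost scales with the number of files rather than the bytes stored.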

Of course, all this still doesn't solve the fact that committing lots of small files is inefficient. It does, however, make it likely that the "direct commit" aspect isn't as much of a factor as you might think; more often the bigger issue is something like Hive not parallelizing file commits at completion time, especially when committing to lots of partition directories. Spark is much better at this, and Hive should be improving over time.

There is a recent performance improvement using a native SSL library in Dataproc 1.2 which you can try without having to "write fewer, bigger files", just by using Dataproc 1.2 out of the box.

Otherwise, real solutions really do involve writing fewer, bigger files, since even if you fix the write side, you'll suffer on the read side if you have too many small files. GCS is heavily optimized for throughput, so anything less than around 64MB or 128MB may spend more time on the overhead of spinning up a task and opening the stream than on actual computation (that much data should be readable in maybe 200ms-500ms).

In that vein, you'd want to make sure you set things like hive.merge.mapfiles, hive.merge.mapredfiles, or hive.merge.tezfiles if you're using those, or repartition your Spark dataframes before saving to GCS; merging into larger partitions is usually well worth it for keeping your files manageable and benefiting from faster reads down the line.

Edit: One thing I forgot to mention is that I've been loosely using the term repartition, but in this case, since we're strictly trying to bunch the data up into larger files, you may do better with coalesce instead; there's more discussion in another StackOverflow question about repartition vs coalesce.
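As a minimal illustration in PySpark (df and the target partition count are placeholders; pick a count so output files land around 64MB-128MB):

    # Sketch only: `df` is the DataFrame being written, 64 is an illustrative target.
    (df.coalesce(64)          # merge into fewer, larger output files without a full shuffle
       .write
       .mode("overwrite")
       .parquet("gs://my-bucket/output/table"))

repartition(64) would produce the same file count but with a full shuffle, which can still be worth it if the existing partitions are badly skewed.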

Upvotes: 4

stevel

Reputation: 13430

S3Guard (HADOOP-13345) retrofits consistency onto S3 by having DynamoDB store the listings. This makes it possible, for the first time, to reliably use S3A as a direct destination of work. Without it, execution time may seem like the problem, but the real one is that the rename-based committer may get an inconsistent listing and not even see which files it has to rename.

The S3Guard committer work, HADOOP-13786, will, when finished (as of Aug 2017 it is still a work in progress), provide two committers.

Staging committer

  1. workers write to local filesystem
  2. Task committer uploads to S3 but does not complete the operation. Instead it saves commit metainfo to HDFS.
  3. This commit metainfo is committed as normal task/job data in HDFS.
  4. In Job commit, the committer reads the data of pending commits from HDFS and completes them, then does cleanup of any outstanding commits.

Task commit is an upload of all data, time is O(data/bandwidth).

This is based on Ryan's s3committer at Netflix and is the one which is going to be safest to play with at first.
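To illustrate the "upload but don't complete" idea, here is a hand sketch with boto3 against hypothetical bucket/key names; it is not the committer's actual code, and real multipart parts (except the last) must be at least 5MB:

    import boto3

    s3 = boto3.client("s3")
    bucket, key = "dest-bucket", "table/part-000.parquet"   # hypothetical names

    # Task side: start a multipart upload and push the data, but do NOT complete it;
    # nothing becomes visible at `key` until completion.
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    with open("/tmp/part-000.parquet", "rb") as f:
        part = s3.upload_part(Bucket=bucket, Key=key, PartNumber=1,
                              UploadId=upload["UploadId"], Body=f)

    # The committer persists this small record (to HDFS, in the staging committer's case).
    pending = {"Bucket": bucket, "Key": key, "UploadId": upload["UploadId"],
               "Parts": [{"ETag": part["ETag"], "PartNumber": 1}]}

    # Job commit: read the pending records back and complete (or, on abort, cancel) each upload.
    s3.complete_multipart_upload(Bucket=pending["Bucket"], Key=pending["Key"],
                                 UploadId=pending["UploadId"],
                                 MultipartUpload={"Parts": pending["Parts"]})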

Magic committer

Called because it does "magic" inside the filesystem.

  1. the Filesystem itself recognises paths like s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy
  2. redirects the write to the final destination, s3a://dest/part-000.orc.snappy; doesn't complete the write in the stream close() call.
  3. saves the commit metainfo to s3a, here s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy.pending
  4. Task commit: loads all .pending files from that directory, aggregates them, saves the result elsewhere. Time is O(files); data size unimportant.
  5. Task abort: loads all .pending files and aborts those commits.
  6. Job commit: loads all pending files from committed tasks and completes them.

Because it is listing files in S3, it will need S3Guard to deliver consistency on AWS S3 (other S3 implementations are consistent out of the box, so don't need it).
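To make the path mapping in steps 1-2 concrete, here is a hypothetical helper (plain string manipulation, not the connector's code); everything under __magic/<job>/<task>/__base/ is committed relative to the directory that contains __magic:

    def final_destination(magic_path: str) -> str:
        # Illustration only: map a __magic path to the path it will commit to.
        dest_dir, _, rest = magic_path.partition("/__magic/")
        relative = rest.split("/__base/", 1)[1]
        return f"{dest_dir}/{relative}"

    print(final_destination(
        "s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy"))
    # -> s3a://dest/part-000.orc.snappy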

Both committers share the same codebase; job commit for both will be O(files/threads), as the completions are all short POST requests which don't take up bandwidth or much time.

In tests, the staging committer is faster than the magic one for small test-scale files, because the magic committer talks more to S3, which is slow...though S3Guard speeds up listing/getFileStatus calls. The more data you write, the longer task commits on the staging committer take, whereas task commit for the magic one is constant for the same number of files. Both are faster than using rename(), due to how rename() is mimicked by list + copy + delete.

GCS and Hadoop/Spark Commit algorithms

(I haven't looked at the GCS code here, so I reserve the right to be wrong. Treat Dennis Huo's statements as authoritative.)

If GCS does rename() more efficiently than the S3A copy-then-delete, it should be faster, more O(file) than O(data), depending on parallelisation in the code.

I don't know if they can go for a 0-rename committer. The changes in the mapreduce code under FileOutputFormat are designed to support different/pluggable committers for different filesystems, so they have the opportunity to do something here.

For now, make sure you are using the v2 MR commit algorithm, which, while less resilient to failures, does at least push the renames into task commit, rather than job commit.
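From Spark, for example, that property can be pushed into the Hadoop configuration via the spark.hadoop. prefix (a sketch; the property name is the standard Hadoop one, the rest is illustrative):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("gcs-writes")   # illustrative name
             # routed into the Hadoop Configuration used by the file output committers
             .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
             .getOrCreate())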

See also Spark and Object Stores.

Upvotes: 1
