Reputation: 379
Joining two PCollections with the CoGroupByKey approach is taking hours to execute over 8+ million records. I noted from another Stack Overflow post the warning that "CoGbkResult has more than 10000 elements, reiteration (which may be slow) is required."
Any suggestions to improve the performance of this approach?
Here is the code snippet:
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;

// Key both collections by the value of the "KEYNAME" field.
WithKeys<String, TableRow> withKeyValue =
    WithKeys.of((TableRow row) -> String.format("%s", row.get("KEYNAME")))
        .withKeyType(TypeDescriptors.strings());

// Distinct transform names avoid duplicate-name warnings in the pipeline.
PCollection<KV<String, TableRow>> keyed_pc1 = pc1.apply("WithKeys1", withKeyValue);
PCollection<KV<String, TableRow>> keyed_pc2 = pc2.apply("WithKeys2", withKeyValue);

// Inner join via org.apache.beam.sdk.extensions.joinlibrary.Join
PCollection<KV<String, KV<TableRow, TableRow>>> joinedCollection =
    Join.innerJoin(keyed_pc1, keyed_pc2);
Upvotes: 4
Views: 1278
Reputation: 106
My understanding is that your join has a hot key: a key with so many entries that the grouped result does not fit in a worker's memory. When you iterate over that result later, the worker may have to re-fetch the data, which can slow things down considerably.
Join.innerJoin still uses CoGroupByKey internally to perform the join, so switching to that library alone will not necessarily be more efficient. The order in which you iterate over the two collections can matter, though (see the sketch below).
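For instance, if you write the CoGroupByKey join yourself, you can materialize the presumed smaller iterable once and then stream the larger one, so only one side risks the slow re-iteration. A minimal sketch, reusing keyed_pc1/keyed_pc2 from the question and assuming keyed_pc2 is the smaller side per key:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<TableRow> tag1 = new TupleTag<TableRow>() {};
final TupleTag<TableRow> tag2 = new TupleTag<TableRow>() {};

PCollection<KV<String, CoGbkResult>> grouped =
    KeyedPCollectionTuple.of(tag1, keyed_pc1)
        .and(tag2, keyed_pc2)
        .apply(CoGroupByKey.create());

PCollection<KV<String, KV<TableRow, TableRow>>> joined =
    grouped.apply("ManualInnerJoin",
        ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, KV<TableRow, TableRow>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Materialize the (presumed) smaller side once...
            List<TableRow> smallSide = new ArrayList<>();
            for (TableRow row : c.element().getValue().getAll(tag2)) {
              smallSide.add(row);
            }
            // ...then stream the larger side exactly one time.
            for (TableRow left : c.element().getValue().getAll(tag1)) {
              for (TableRow right : smallSide) {
                c.output(KV.of(c.element().getKey(), KV.of(left, right)));
              }
            }
          }
        }));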
If the PCollection on one side is small (fits into memory), you can join with a lookup table instead. See JoinAsLookup for reference; the idea is sketched below.
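A minimal sketch of that lookup-table join using a side input, assuming keyed_pc2 is the small side and its keys are unique (use View.asMultimap() if a key can repeat); the names lookup and joined are illustrative:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

// Materialize the small side as an in-memory map, broadcast to all workers.
final PCollectionView<Map<String, TableRow>> lookup =
    keyed_pc2.apply("SmallSideAsMap", View.asMap());

PCollection<KV<String, KV<TableRow, TableRow>>> joined =
    keyed_pc1.apply("LookupJoin",
        ParDo.of(new DoFn<KV<String, TableRow>, KV<String, KV<TableRow, TableRow>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            TableRow match = c.sideInput(lookup).get(c.element().getKey());
            if (match != null) { // inner-join semantics: drop unmatched keys
              c.output(KV.of(c.element().getKey(),
                             KV.of(c.element().getValue(), match)));
            }
          }
        }).withSideInputs(lookup));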
If you have some way of knowing which keys are hot, you can split them into smaller sub-keys before the join, but that requires more engineering work and some foreknowledge of the data. One hypothetical way to do that ("salting") is sketched below.
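A hypothetical sketch of that splitting: HOT_KEY and NUM_SHARDS are illustrative placeholders, not from the original post. The hot key is scattered over NUM_SHARDS salted sub-keys on the large side, and its rows are replicated across all sub-keys on the small side so every salted partition still finds its matches:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

final String HOT_KEY = "the-known-hot-key"; // placeholder
final int NUM_SHARDS = 10;                  // illustrative tuning knob

// Large side: scatter the hot key across NUM_SHARDS salted sub-keys.
PCollection<KV<String, TableRow>> saltedLarge =
    keyed_pc1.apply("SaltHotKey",
        ParDo.of(new DoFn<KV<String, TableRow>, KV<String, TableRow>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<String, TableRow> kv = c.element();
            if (HOT_KEY.equals(kv.getKey())) {
              int salt = ThreadLocalRandom.current().nextInt(NUM_SHARDS);
              c.output(KV.of(kv.getKey() + "#" + salt, kv.getValue()));
            } else {
              c.output(kv);
            }
          }
        }));

// Small side: replicate the hot key's rows to every salted sub-key so each
// salted partition of the large side still joins correctly.
PCollection<KV<String, TableRow>> replicatedSmall =
    keyed_pc2.apply("ReplicateHotKey",
        ParDo.of(new DoFn<KV<String, TableRow>, KV<String, TableRow>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<String, TableRow> kv = c.element();
            if (HOT_KEY.equals(kv.getKey())) {
              for (int salt = 0; salt < NUM_SHARDS; salt++) {
                c.output(KV.of(kv.getKey() + "#" + salt, kv.getValue()));
              }
            } else {
              c.output(kv);
            }
          }
        }));

// Join as before; strip the "#<salt>" suffix from result keys afterwards.
PCollection<KV<String, KV<TableRow, TableRow>>> saltedJoin =
    Join.innerJoin(saltedLarge, replicatedSmall);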
Upvotes: 0
Reputation: 918
The Apache Beam model doesn't specify how a join is executed, and you can't hand-write an inner join that is faster than what the SDK already provides. The answer to this question therefore depends on what is executing the join, i.e. which runner. I don't know the Flink or Spark runners, so this answer is specific to the Dataflow runner.
If you haven't already, take a look at the blog post on this topic. It describes the Dataflow Shuffle Service, which can be enabled manually. The service is a better implementation than the current default and generally leads to much faster execution, especially for joins.
To enable the Dataflow Shuffle Service, pass the following flags:
--experiments=shuffle_mode=service
--region=<allowed region>
Where the allowed regions for shuffle are "us-central1", "europe-west1", "europe-west4", and "asia-northeast1". A sketch of setting the same options programmatically is shown below.
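For example, a minimal sketch of enabling the service via DataflowPipelineOptions instead of command-line flags (the project ID is a placeholder):

import java.util.Arrays;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");     // placeholder
options.setRegion("us-central1");     // must be one of the allowed shuffle regions
options.setExperiments(Arrays.asList("shuffle_mode=service"));
Pipeline pipeline = Pipeline.create(options);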
Upvotes: 1