Technical Shil

Reputation: 331

Pre-shuffle aggregation in Flink

We are migrating a Spark job to Flink. In Spark we used pre-shuffle aggregation. Is there a way to perform a similar operation in Flink? We are consuming data from Apache Kafka and aggregating it with a keyed tumbling window. We want to aggregate the data in Flink before the shuffle happens.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

Upvotes: 4

Views: 488

Answers (1)

Felipe

Reputation: 7563

Yes, it is possible, and I will describe three ways. The first is already built into the Flink Table API. For the second you build your own pre-aggregate operator. The third is a dynamic pre-aggregate operator that adjusts the number of events to pre-aggregate based on runtime conditions before the shuffle phase.

Flink Table API

As shown in the Table API performance-tuning documentation, you can enable MiniBatch Aggregation or Local-Global Aggregation; the second option is the better one, especially for skewed keys. You essentially tell Flink to buffer mini-batches of up to 5000 events (or whatever arrives within 5 seconds) and pre-aggregate them before the shuffle phase.

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
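To see what the TWO_PHASE strategy buys you, here is a minimal plain-Java sketch (no Flink dependencies; all class and method names here are illustrative, not Flink API) of local-global aggregation: each parallel instance first reduces its own elements per key, so only one partial result per key per instance crosses the shuffle, and the global phase merges the partials.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalGlobalSketch {
    // Local phase: each "subtask" pre-aggregates its own slice per key.
    static Map<String, Long> localAggregate(List<String> slice) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : slice) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: after the shuffle, merge the per-subtask partials.
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two parallel instances, each sees a skewed slice of the stream.
        Map<String, Long> p1 = localAggregate(List.of("a", "a", "a", "b"));
        Map<String, Long> p2 = localAggregate(List.of("a", "b", "b"));
        Map<String, Long> total = globalMerge(List.of(p1, p2));
        System.out.println(total.get("a") + " " + total.get("b")); // prints "4 3"
    }
}
```

With a hot key like "a", the shuffle carries one partial count per subtask instead of every raw event, which is exactly the load reduction the TWO_PHASE strategy gives you.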

Flink Stream API

This way is more cumbersome because you have to create your own operator by implementing OneInputStreamOperator and wire it in with transform(). Here is an example based on the BundleOperator classes:

public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = this.bundle.get(bundleKey);

        // get a new value after adding this element to the bundle
        final V newBundleValue = userFunction.addInput(bundleValue, input);

        // update the map bundle
        bundle.put(bundleKey, newBundleValue);

        numOfElements++;
        bundleTrigger.onElement(input);
    }

    @Override
    public void finishBundle() throws Exception {
        if (!bundle.isEmpty()) {
            numOfElements = 0;
            userFunction.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }
}
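The userFunction used above is a MapBundleFunction: it knows how to fold one element into the current per-key bundle value (addInput) and how to flush the whole bundle downstream (finishBundle). A minimal, Flink-free sketch of such a function that counts events per key (the interface is simplified here and the names are illustrative):

```java
import java.util.Map;
import java.util.function.BiConsumer;

// Simplified stand-in for Flink's MapBundleFunction<K, V, IN, OUT>.
public class CountBundleFunction {
    // addInput: fold one element into the current per-key bundle value.
    // bundleValue is null the first time a key is seen in this bundle.
    public Long addInput(Long bundleValue, String input) {
        return bundleValue == null ? 1L : bundleValue + 1L;
    }

    // finishBundle: emit one pre-aggregated record per key downstream.
    public void finishBundle(Map<String, Long> bundle, BiConsumer<String, Long> out) {
        bundle.forEach(out);
    }
}
```

Note that finishBundle emits one record per key in the bundle, not one per input event; that is the whole point of pre-aggregating before the shuffle.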

The callback interface defines when the pre-aggregate is triggered. Every time the bundle reaches its limit, at the if (count >= maxCount) check below, the operator emits the pre-aggregated events to the shuffle phase.

public class CountBundleTrigger<T> implements BundleTrigger<T> {
    private final long maxCount;
    private transient BundleTriggerCallback callback;
    private transient long count = 0;

    public CountBundleTrigger(long maxCount) {
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        this.maxCount = maxCount;
    }

    @Override
    public void registerCallback(BundleTriggerCallback callback) {
        this.callback = Preconditions.checkNotNull(callback, "callback is null");
    }

    @Override
    public void onElement(T element) throws Exception {
        count++;
        if (count >= maxCount) {
            callback.finishBundle();
            reset();
        }
    }

    @Override
    public void reset() {
        count = 0;
    }
}
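Putting the operator and the trigger together, their combined behavior can be simulated in plain Java (again no Flink dependencies; the class is hypothetical) to check that only per-key flushes, not individual events, reach the downstream shuffle:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PreAggregator {
    private final int maxCount;
    private final Map<String, Long> bundle = new HashMap<>();
    private final List<Map<String, Long>> flushed = new ArrayList<>();
    private int count = 0;

    public PreAggregator(int maxCount) {
        this.maxCount = maxCount;
    }

    // Mirrors processElement plus CountBundleTrigger.onElement:
    // fold the element into the bundle, flush when the limit is hit.
    public void processElement(String key) {
        bundle.merge(key, 1L, Long::sum);
        count++;
        if (count >= maxCount) {
            finishBundle();
        }
    }

    // Mirrors finishBundle: emit the pre-aggregated map, then reset.
    public void finishBundle() {
        if (!bundle.isEmpty()) {
            flushed.add(new HashMap<>(bundle));
            bundle.clear();
        }
        count = 0;
    }

    public List<Map<String, Long>> getFlushed() {
        return flushed;
    }
}
```

Feeding six events with maxCount = 3 produces two flushes of per-key counts instead of six individual records crossing the shuffle.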

Then you wire your operator into the pipeline using transform():

myStream.map(....)
    .transform(metricCombiner, info, // operator name and output TypeInformation
        new RichMapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector))
    .map(...)
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))

A dynamic pre-aggregation

In case you wish to have a dynamic pre-aggregate operator, check AdCom - Adaptive Combiner for stream aggregation. It adjusts the pre-aggregation batch size based on backpressure signals, so the shuffle phase is used as efficiently as possible.

Upvotes: 3
