Reputation: 83
I want to partition the data by ID, and within each partition I want to
- apply a set of operations
- take distinct
Doing the distinct within each partition will avoid shuffling.
import org.apache.spark.HashPartitioner

val rowRDD = sc.textFile("flatten_test_data")
  .filter(_.nonEmpty)
  .map { l =>
    val arr = l.split("\u0001")   // fields are delimited by the \u0001 control character
    val id = arr(0)
    val value = arr(1)
    (id, value)
  }
  .partitionBy(new HashPartitioner(4))   // group records by id into 4 partitions
  .persist()
Now I do something like:
rowRDD.foreachPartition { records => applyOpers(records) }
applyOpers(dataset) should do something like:
dataset.withColumn(udf1).withColumn(udf2).distinct
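To make the intent concrete, a minimal sketch of applyOpers working directly on a partition's Iterator could look like this (oper1 and oper2 are placeholders for the actual UDF logic, not real functions):

def applyOpers(records: Iterator[(String, String)]): Iterator[(String, String)] =
  records
    .map { case (id, value) => (id, oper1(value)) }   // placeholder for udf1
    .map { case (id, value) => (id, oper2(value)) }   // placeholder for udf2
    .toSet                                            // distinct, limited to this partition
    .iterator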
Upvotes: 2
Views: 2252
Reputation: 7386
foreachPartition gets executed on the executors, so you cannot access the SparkContext/SparkSession inside it.

You can use mapPartitions() as an alternative to map() and foreach(). mapPartitions() is called once per partition, unlike map() and foreach(), which are called once for every element in the RDD. The main advantage is that you can do initialization on a per-partition basis instead of a per-element basis.

mapPartitions() receives an Iterator as its argument, through which you can iterate over all the elements in the partition.
For example (this one is in Java, but it should give you the idea):
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

// Runs once per partition: sums the partition's elements into a single AvgCount.
// AvgCount is a simple serializable holder with int fields total_ and num_.
FlatMapFunction<Iterator<Integer>, AvgCount> setup = new FlatMapFunction<Iterator<Integer>, AvgCount>() {
    @Override
    public Iterable<AvgCount> call(Iterator<Integer> input) {
        AvgCount a = new AvgCount(0, 0);
        while (input.hasNext()) {
            a.total_ += input.next();
            a.num_ += 1;
        }
        ArrayList<AvgCount> ret = new ArrayList<AvgCount>();
        ret.add(a);
        return ret;   // one AvgCount per partition (Spark 1.x API returns an Iterable)
    }
};

// Combines the per-partition results into a single AvgCount.
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
    @Override
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
    }
};

AvgCount result = rdd.mapPartitions(setup).reduce(combine);
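Since the question uses Scala, a rough Scala equivalent of the same per-partition pattern (using a plain (sum, count) tuple instead of the AvgCount helper) might look like this:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

// mapPartitions emits one (sum, count) pair per partition; reduce combines them.
val (total, count) = rdd
  .mapPartitions { nums =>
    var sum = 0
    var cnt = 0
    nums.foreach { n => sum += n; cnt += 1 }
    Iterator((sum, cnt))
  }
  .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }

val avg = total.toDouble / count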
Upvotes: 1