Reputation: 51
Here is the requirement: the data set is too large, so we need to partition the data, calculate a local result in each partition, and then merge. For example, if 1 million records are divided into 100 partitions, each partition holds only about 10,000 records. Since the partition count is a tuning parameter, it must be configurable. In addition, all data in a partition must be processed as one batch, not record by record.
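Independent of Flink, the intended flow can be sketched in plain Scala (a minimal illustration of the partition → local result → merge pattern; the names and the hash-based partitioning are mine, not from any API):

```scala
object PartitionMergeSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 100                       // tuning parameter: must be variable
    val data = (1 to 1000000).map(i => s"rec$i")  // 1 million records

    // Partition phase: tag each record with the partition it belongs to.
    val partitioned = data.map(r => (r.hashCode.abs % numPartitions, r))

    // Local phase: each partition is processed as one whole batch (here: a count).
    val localResults = partitioned.groupBy(_._1).mapValues(_.size)

    // Merge phase: combine the local results into the global result.
    val total = localResults.values.sum
    println(s"partitions=${localResults.size}, total=$total")
  }
}
```

The question is how to get the same three phases, with a configurable partition count, out of Flink's DataSet API.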
My implementation is as follows:
After the partition phase, each record carries a key that identifies the partition it belongs to, so the data looks like this: afterPartitionedData = [(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]
Next, use Flink's partitionCustom and mapPartition operators.
dataSet = env.fromCollection(afterPartitionedData)
dataSet
  .partitionCustom(new myPartitioner(), 0)
  .mapPartition(new myMapPartitionFunction[(Int, String), Int]())
…
…
class myPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int) = {
    println("numPartitions=" + numPartitions) // prints 6, the CPU count
    key // just return the partition ID as the target partition index
  }
}
However, the following error was reported:
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...
This seems to be because the default number of partitions of a Flink DataSet is the number of CPU cores, which is 6 on my machine, so java.lang.ArrayIndexOutOfBoundsException: 6 is thrown.
So my question is: is there a way to change the number of partitions at will? I found this parameter in the method partition(key: Int, numPartitions: Int) of the Partitioner API, but I do not know how to change it. Is there a way to change the number of DataSet partitions?
The Flink version is 1.6 and the test code is:
import java.lang
import java.util.function.Consumer

import org.apache.flink.api.common.functions.{MapPartitionFunction, Partitioner}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable

object SimpleFlinkFromBlog {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val afterPartitionedData = new mutable.MutableList[(Int, String)]
    afterPartitionedData += ((0, "0"))
    afterPartitionedData += ((1, "1"))
    afterPartitionedData += ((2, "2"))
    afterPartitionedData += ((2, "2"))
    afterPartitionedData += ((3, "3"))
    afterPartitionedData += ((3, "3"))
    afterPartitionedData += ((3, "3"))
    afterPartitionedData += ((4, "4"))
    afterPartitionedData += ((5, "5"))
    afterPartitionedData += ((5, "5"))
    afterPartitionedData += ((5, "5"))
    // Commenting out the next line makes the error go away:
    // java.lang.ArrayIndexOutOfBoundsException: 6
    afterPartitionedData += ((6, "will wrong"))
    val dataSet = env.fromCollection(afterPartitionedData)
    val localRes = dataSet
      .partitionCustom(new myPartitioner(), 0)
      .mapPartition(new MapPartitionFunction[(Int, String), Int] {
        override def mapPartition(values: lang.Iterable[(Int, String)], out: Collector[Int]): Unit = {
          // Count the records of this partition as one batch.
          var count = 0
          values.forEach(new Consumer[(Int, String)] {
            override def accept(t: (Int, String)): Unit = {
              count = count + 1
              print("current count is " + count + " tuple is " + t + "\n")
            }
          })
          out.collect(count)
        }
      })
    localRes.collect().foreach(println)
  }

  class myPartitioner extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int = {
      // println("numPartitions=" + numPartitions)
      key
    }
  }
}
Thank you!
Upvotes: 0
Views: 356
Reputation: 43697
The number of partitions is the parallelism, which you can set on the command line when you submit the job, or in flink-conf.yaml.
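Concretely, applied to the code in the question, a sketch of the options (the value 100 is only an example; since numPartitions is the parallelism of the operator the data is routed to, setting it per operator should also work, though I have not verified this on the asker's job):

```scala
// Option 1: per job, programmatically. With this, the numPartitions
// passed to the custom Partitioner becomes 100 instead of the CPU count.
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(100)

// Option 2: per operator, on the operator downstream of partitionCustom.
dataSet
  .partitionCustom(new myPartitioner(), 0)
  .mapPartition(new myMapPartitionFunction[(Int, String), Int]())
  .setParallelism(100)
```

On the command line the equivalent is `bin/flink run -p 100 <jar>`, and cluster-wide it is the `parallelism.default` key in flink-conf.yaml.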
Upvotes: 0