Vadym B.

Reputation: 681

combineByKey on a Dstream throws an error

I have a DStream of (String, Int) tuples.

When I try combineByKey, it tells me to specify a parameter: Partitioner

my_dstream.combineByKey(
      (v) => (v,1),
      (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )

However, when I use it on an RDD, it works correctly:

 my_dstream.foreachRDD( rdd =>
      rdd.combineByKey(
        (v) => (v,1),
        (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1),
        (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
      ))

Where can I get this partitioner?

Upvotes: 1

Views: 217

Answers (1)

Yuval Itzchakov

Reputation: 149518

Where can I get this partitioner?

You can create it yourself. Spark ships with two partitioners out of the box: HashPartitioner and RangePartitioner. The default is the former. You can instantiate it via its constructor, passing the desired number of partitions:

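import org.apache.spark.HashPartitioner

val numOfPartitions = 4 // example value only; specify the number of partitions you want
val hashPartitioner = new HashPartitioner(numOfPartitions)

my_dstream.combineByKey(
  // createCombiner: start a (sum, count) pair from the first value of a key
  (v: Int) => (v, 1),
  // mergeValue: fold another value into an existing (sum, count) pair
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  // mergeCombiner: merge two (sum, count) pairs
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),
  hashPartitioner
)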
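
To give a rough idea of what a hash partitioner does with your keys, here is a minimal plain-Scala sketch with no Spark dependency. It is not Spark's source code: the names HashPartitionSketch and partitionFor are hypothetical, and it only illustrates the general idea of taking the key's hashCode modulo the number of partitions and keeping the result non-negative.

```scala
// Illustrative sketch only; HashPartitionSketch and partitionFor are
// hypothetical names, not part of the Spark API.
object HashPartitionSketch {

  // Map a key to a partition index in the range [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (key == null) {
      0 // assumption for this sketch: null keys go to partition 0
    } else {
      val rawMod = key.hashCode % numPartitions
      // % yields a negative result for negative hash codes, so shift it back into range
      if (rawMod < 0) rawMod + numPartitions else rawMod
    }
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("a", "b", "c", "a")
    // Equal keys always land in the same partition
    keys.foreach(k => println(s"$k -> partition ${partitionFor(k, 4)}"))
  }
}
```

Because equal keys always map to the same partition, all values for a given key end up together, which is what combineByKey needs in order to merge them.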

Upvotes: 1
