Reputation: 2333
I have a list of data, the value is basically a bson document (think json), each json ranges from 5k to 20k in size. It either can be in bson object format or can be converted to json directly:
Key, Value
--------
K1, JSON1
K1, JSON2
K2, JSON3
K2, JSON4
I expect the groupByKey would produce:
K1, (JSON1, JSON2)
K2, (JSON3, JSON4)
so that when I do:
val data = [...].map(x => (x.Key, x.Value))
val groupedData = data.groupByKey()
groupedData.foreachRDD { rdd =>
//the elements in the rdd here are not really grouped by the Key
}
I am so confused the the behaviour of the RDD. I read many articles in the internet including the official website from Spark: https://spark.apache.org/docs/0.9.1/scala-programming-guide.html
Still couldn't achieve what I want.
-------- UPDATED ---------------------
Basically I really need it to be grouped by the key, the key is the index to be used in Elasticsearch, so that I can perform batch process based on the key via Elasticsearch for Hadoop:
EsSpark.saveToEs(rdd);
I can't do per partition because Elasticsearch only accept RDD. I tried to use sc.MakeRDD or sc.parallize, both telling me it is not serializable.
I tried to use:
EsSpark.saveToEs(rdd, Map(
"es.resource.write" -> "{TheKeyFromTheObjectAbove}",
"es.batch.size.bytes" -> "5000000")
Documentation of the config is here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
But it is VERY slow comparing to not using the configuration to define dynamic index based on the value of individual document, I suspect it is parsing every json to fetch the value dynamically.
Upvotes: 4
Views: 1324
Reputation: 443
Here is the example.
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Test extends App {
val session: SparkSession = SparkSession
.builder.appName("Example")
.config(new SparkConf().setMaster("local[*]"))
.getOrCreate()
val sc = session.sparkContext
import session.implicits._
case class Message(key: String, value: String)
val input: Seq[Message] =
Seq(Message("K1", "foo1"),
Message("K1", "foo2"),
Message("K2", "foo3"),
Message("K2", "foo4"))
val inputRdd: RDD[Message] = sc.parallelize(input)
val intermediate: RDD[(String, String)] =
inputRdd.map(x => (x.key, x.value))
intermediate.toDF().show()
// +---+----+
// | _1| _2|
// +---+----+
// | K1|foo1|
// | K1|foo2|
// | K2|foo3|
// | K2|foo4|
// +---+----+
val output: RDD[(String, List[String])] =
intermediate.groupByKey().map(x => (x._1, x._2.toList))
output.toDF().show()
// +---+------------+
// | _1| _2|
// +---+------------+
// | K1|[foo1, foo2]|
// | K2|[foo3, foo4]|
// +---+------------+
output.foreachPartition(rdd => if (rdd.nonEmpty) {
println(rdd.toList)
})
// List((K1,List(foo1, foo2)))
// List((K2,List(foo3, foo4)))
}
Upvotes: 3