I have user information in an RDD:
(Id:10, Name:bla, Address:50, ...)
I also have another collection containing the successive identity changes we gathered for each user:
(lastId, newId)
(10, 43)
(85, 90)
(43, 50)
I need to get the final identity for each user ID, for example:
getFinalIdentity(10) = 50 (10 -> 43 -> 50)
For a while I used a broadcast variable containing these identities and iterated over the collection to get the final ID. Everything worked fine until the reference data became too big to fit in a broadcast variable.
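For reference, the per-ID lookup that the broadcast approach performs can be sketched as a plain Scala function over an in-memory `Map[Long, Long]` of lastId -> newId. This is only an illustration: the `FinalIdentity` object and the cycle check are assumptions of this sketch, not the code from the question, and it still requires the whole map to fit in memory.

```scala
object FinalIdentity {
  // Follows lastId -> newId links until an ID with no further change is reached.
  // Throws if the links form a cycle (e.g. 1 -> 2 -> 1), which would otherwise loop forever.
  def getFinalIdentity(changes: Map[Long, Long], id: Long): Long = {
    @annotation.tailrec
    def loop(current: Long, seen: Set[Long]): Long = changes.get(current) match {
      case None => current // no recorded change: this is the final identity
      case Some(next) if seen.contains(next) =>
        throw new IllegalArgumentException(s"Cycle in identity changes at ID $next")
      case Some(next) => loop(next, seen + current)
    }
    loop(id, Set.empty)
  }

  def main(args: Array[String]): Unit = {
    // Sample data from the question
    val changes = Map(10L -> 43L, 85L -> 90L, 43L -> 50L)
    println(getFinalIdentity(changes, 10L)) // 50
    println(getFinalIdentity(changes, 85L)) // 90
    println(getFinalIdentity(changes, 7L))  // 7 (no change recorded)
  }
}
```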
I came up with a solution that stores the identities in an RDD and iterates over it recursively, but it is not very fast and looks overly complex to me.
Is there an elegant and fast way to do this?
Have you thought about graphs?
You could create a graph from the list of edges (lastId, newId). In that graph, a node with no outgoing edges is the final ID for every node that leads to it, and a node with no incoming edges is the first ID of its chain.
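As a quick, Spark-free illustration of that observation on the question's sample data: IDs that appear only as a newId have no outgoing edge, so they are final IDs, and IDs that appear only as a lastId have no incoming edge, so they start a chain. `ChainEnds`, `finalIds` and `firstIds` are hypothetical names used only for this sketch.

```scala
object ChainEnds {
  // Nodes with no outgoing edge: they appear as newId but never as lastId.
  def finalIds(changes: Seq[(Long, Long)]): Set[Long] =
    changes.map(_._2).toSet -- changes.map(_._1)

  // Nodes with no incoming edge: they appear as lastId but never as newId.
  def firstIds(changes: Seq[(Long, Long)]): Set[Long] =
    changes.map(_._1).toSet -- changes.map(_._2)

  def main(args: Array[String]): Unit = {
    val changes = Seq((10L, 43L), (85L, 90L), (43L, 50L))
    println(finalIds(changes).toList.sorted) // List(50, 90)
    println(firstIds(changes).toList.sorted) // List(10, 85)
  }
}
```

IDs that never changed do not appear in the edge list at all, so they are simply their own final identity.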
This can be done in Spark with GraphX.
Below is an example using Pregel. For each ID it computes the first ID of its chain, so for the ID changes (1 -> 2 -> 3) the result is (1, 1), (2, 1), (3, 1).
```scala
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  // Case classes for the Pregel operation, defined at object level so Spark can serialize them
  case class Value(originId: VertexId) // vertex value
  case class Message(value: VertexId)  // message sent from one vertex to another

  def main(args: Array[String]): Unit = {
    // Create the SparkContext inside main, so that initializing the Main object
    // on an executor (to call the helper functions below) does not create another one
    val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    // RDD of pairs (oldId, newId)
    val changedIds = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (10L, 20L), (20L, 31L), (30L, 40L), (100L, 200L), (200L, 300L)))

    // Create the graph from the ID pairs; every vertex starts with Value(0)
    val graph = Graph.fromEdgeTuples(changedIds, Value(0))

    // The initial message is sent to all vertices at the start.
    // 0 is used as a sentinel, so this assumes no real ID is 0.
    val initialMsg = Message(0)

    // How a vertex processes a received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      // The initial message has value 0: the vertex initializes its value to its own ID
      if (msg.value == 0) Value(vertexId)
      // Otherwise the received value is the origin ID
      else Value(msg.value)
    }

    // How vertices send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // For each triplet a single message is sent to the destination vertex;
      // its payload is the source vertex's origin ID
      Iterator((triplet.dstId, Message(triplet.srcAttr.originId)))
    }

    // How incoming messages to one vertex are merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // Two messages arrive only if two different old IDs were changed to the same new ID,
      // which is generally an error for this data: one ID can't have two origin IDs
      msg2 // just keep one of the incoming messages
    }

    // Run the Pregel computation; with EdgeDirection.Out, only vertices that
    // received a message in the previous step send along their outgoing edges
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    // Print results
    res.vertices.collect().foreach(println)
    sc.stop()
  }
}
```
Output, as (vertexId, Value(firstId)):
(100,Value(100))
(4,Value(1))
(300,Value(100))
(200,Value(100))
(40,Value(30))
(20,Value(10))
(1,Value(1))
(30,Value(30))
(10,Value(10))
(2,Value(1))
(3,Value(1))
(31,Value(10))
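The program above labels every ID with the first ID of its chain, while the question asks for the final one. One way to get the final ID with the same code (not part of the original answer) is to build the graph from reversed pairs, `Graph.fromEdgeTuples(changedIds.map(_.swap), Value(0))`, so the label travels from the newest ID back to the older ones. The sketch below is a local, Spark-free simulation of that propagation on plain Scala collections, using the question's sample data; `FinalIdPropagation` and `finalIdsFor` are hypothetical names, and it assumes the chains contain no cycles.

```scala
object FinalIdPropagation {
  // Simulates the vertex-centric propagation on reversed edges (newId -> lastId),
  // so every ID ends up labelled with the LAST ID of its chain.
  def finalIdsFor(changes: Seq[(Long, Long)]): Map[Long, Long] = {
    val reversed = changes.map(_.swap) // edges now point from newId back to lastId
    val vertices = changes.flatMap { case (a, b) => Seq(a, b) }.distinct
    // Superstep 0: every vertex is labelled with its own ID (like Value(vertexId))
    val initial = vertices.map(v => v -> v).toMap

    @annotation.tailrec
    def superstep(labels: Map[Long, Long], remaining: Int): Map[Long, Long] = {
      // Each edge src -> dst sends the source's current label to the destination;
      // if a vertex receives several messages, the last one wins (like mergeMsg)
      val updated = labels ++ reversed.map { case (src, dst) => dst -> labels(src) }
      if (updated == labels || remaining == 0) updated
      else superstep(updated, remaining - 1)
    }

    // An acyclic chain converges in fewer supersteps than it has IDs;
    // the cap only stops the loop if the input contains a cycle
    superstep(initial, vertices.size)
  }

  def main(args: Array[String]): Unit = {
    val changes = Seq((10L, 43L), (85L, 90L), (43L, 50L))
    finalIdsFor(changes).toSeq.sorted.foreach(println)
  }
}
```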