Jean Wisser

Spark RDD recursive operations on simple collection

I have user information in an RDD:

(Id: 10, Name: bla, Address: 50, ...)

I also have another collection containing the successive identity changes we gathered for each user:

(lastId, newId)
    (10, 43)
    (85, 90)
    (43, 50)

I need to get the final identity for each user's ID. In this example:

getFinalIdentity(10) = 50     (10 -> 43 -> 50)

For a while I used a broadcast variable containing these identities and iterated over the collection to get the final ID. Everything worked fine until the reference data became too big to fit in a broadcast variable.
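For reference, the broadcast approach boils down to following links in an in-memory map. Below is a minimal sketch, assuming the whole change table fits on one machine as a plain Scala `Map[Long, Long]` (in a Spark job this would be the value of the broadcast variable, read with `.value`). `resolveFinalId` is a hypothetical helper name, not code from the question.

```scala
object BroadcastLookupSketch {

  // Hypothetical helper: follow lastId -> newId links until an ID has no further change.
  // The `seen` set guards against accidental cycles in the change table.
  def resolveFinalId(changes: Map[Long, Long], id: Long): Long = {
    @annotation.tailrec
    def loop(current: Long, seen: Set[Long]): Long =
      changes.get(current) match {
        case Some(next) if !seen.contains(next) => loop(next, seen + current)
        case _                                  => current
      }
    loop(id, Set.empty)
  }

  def main(args: Array[String]): Unit = {
    // (lastId, newId) pairs from the question
    val changes = Map(10L -> 43L, 85L -> 90L, 43L -> 50L)
    println(resolveFinalId(changes, 10L)) // prints 50 (10 -> 43 -> 50)
  }
}
```

An ID with no recorded change simply resolves to itself.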

I came up with a solution that stores the identities in an RDD and iterates over it recursively, but it is not very fast and looks overly complex to me.
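The recursive RDD solution is not shown, so this is only one possible shape for such an iteration, not necessarily what was written. The sketch uses "pointer jumping": each round rewrites every mapping a -> b to a -> next(b) when b itself changed, so the resolved chain length roughly doubles per round. With RDDs, one round would correspond to joining the (lastId, newId) RDD with itself on newId = lastId. Here it is modelled with an in-memory `Map`, and `compressChains` is a hypothetical name. It assumes the change table has no cycles; with a cycle the loop may never terminate.

```scala
object PointerJumpingSketch {

  // Hypothetical helper: repeat "a -> b becomes a -> next(b)" until nothing changes.
  // Assumes no cycles in `changes`; a cycle can make this loop run forever.
  def compressChains(changes: Map[Long, Long]): Map[Long, Long] = {
    var current = changes
    var changed = true
    while (changed) {
      // One round: jump every pointer one step further along its chain
      val next = current.map { case (from, to) => from -> current.getOrElse(to, to) }
      changed = next != current
      current = next
    }
    current
  }

  def main(args: Array[String]): Unit = {
    // (lastId, newId) pairs from the question; every key ends up mapped to its final ID
    println(compressChains(Map(10L -> 43L, 85L -> 90L, 43L -> 50L)))
  }
}
```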

Is there an elegant and fast way to do this?

Answers (1)

Vladislav Varslavans

Have you thought about graphs?

You could build a graph from the list of (lastId, newId) edges. In that graph, a node with no outgoing edges is the final ID for the chain that starts at a node with no incoming edges.
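To make the graph view concrete with the question's pairs: nodes without incoming edges are original IDs, and nodes without outgoing edges are final IDs. The plain-Scala sketch below works on an in-memory `Seq` rather than Spark; `chainEnds` is a hypothetical name. (GraphX exposes `inDegrees` and `outDegrees` on a graph, but those only list vertices with a non-zero degree, so this sketch sticks to set differences.)

```scala
object ChainEndsSketch {

  // Given (lastId, newId) edges, return (chain starts, chain ends):
  // starts never appear as a newId, ends never appear as a lastId.
  def chainEnds(edges: Seq[(Long, Long)]): (Set[Long], Set[Long]) = {
    val sources = edges.map(_._1).toSet
    val targets = edges.map(_._2).toSet
    (sources -- targets, targets -- sources)
  }

  def main(args: Array[String]): Unit = {
    val (starts, ends) = chainEnds(Seq(10L -> 43L, 85L -> 90L, 43L -> 50L))
    println(s"starts=$starts ends=$ends")
  }
}
```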

This can be done in Spark with GraphX.

Below is an example. For each ID it computes the first ID of its chain. For example, for the chain of ID changes 1 -> 2 -> 3 the result is (1, 1), (2, 1), (3, 1).

import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  // Case classes for the Pregel operation, defined at object level
  case class Value(originId: VertexId)      // vertex value
  case class Message(value: VertexId)       // message sent from one vertex to another

  // How a vertex processes a received message
  def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
    // The initial message has value 0. In that case the vertex initializes its value to its own ID
    // (this assumes 0 is never used as a real ID)
    if (msg.value == 0) Value(vertexId)
    // Otherwise the received value is the origin ID
    else Value(msg.value)
  }

  // How vertices send messages
  def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
    // For each triplet a single message is sent to the destination vertex;
    // its payload is the source vertex's origin ID
    Iterator((triplet.dstId, Message(triplet.srcAttr.originId)))
  }

  // How incoming messages to one vertex are merged
  def mergeMsg(msg1: Message, msg2: Message): Message = {
    // In general this case is an error,
    // because one ID can't have 2 different origin IDs
    msg2    // Just return any of the incoming messages
  }

  def main(args: Array[String]): Unit = {
    // Create the SparkContext inside main, so that initializing Main on an executor
    // (the functions above are referenced from closures) never builds a second context
    val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    // RDD of pairs (oldId, newId)
    val changedIds = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (10L, 20L), (20L, 31L), (30L, 40L), (100L, 200L), (200L, 300L)))

    // Create a graph from the ID pairs; every vertex starts with Value(0)
    val graph = Graph.fromEdgeTuples(changedIds, Value(0))

    // The initial message is sent to all vertices at the start
    val initialMsg = Message(0)

    // Kick off the Pregel computation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    // Print results
    res.vertices.collect().foreach(println)

    sc.stop()
  }
}

Output, as (id, Value(firstId)) pairs:

(100,Value(100))
(4,Value(1))
(300,Value(100))
(200,Value(100))
(40,Value(30))
(20,Value(10))
(1,Value(1))
(30,Value(30))
(10,Value(10))
(2,Value(1))
(3,Value(1))
(31,Value(10))
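This program returns the first ID of each chain, while the question asks for the final one. One way to get the final ID, offered as a hedged suggestion rather than tested GraphX code, is to run the same Pregel program on the reversed graph (GraphX provides `Graph.reverse`). The sketch below does not use Spark: it is a hypothetical single-machine model of the Pregel supersteps above (`propagate` is an illustrative name), handy for checking the logic on small inputs. It assumes the chains contain no cycles.

```scala
object PregelModelSketch {

  // Single-machine model of the answer's Pregel program (not GraphX code):
  // every vertex starts with its own ID, then each superstep forwards the source's value
  // along edges whose source received a message in the previous superstep (EdgeDirection.Out).
  def propagate(edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    var values = vertices.map(v => v -> v).toMap // onMsgReceive with the initial message
    var active = vertices.toSet                  // every vertex receives the initial message
    while (active.nonEmpty) {
      // sendMsg: one message per edge with an active source; mergeMsg: toMap keeps the last one
      val msgs = edges.collect { case (s, d) if active(s) => d -> values(s) }.toMap
      values = values ++ msgs                    // onMsgReceive for a normal message
      active = msgs.keySet                       // only receivers send in the next superstep
    }
    values
  }

  def main(args: Array[String]): Unit = {
    val changedIds = Seq(1L -> 2L, 2L -> 3L, 3L -> 4L, 10L -> 20L, 20L -> 31L, 30L -> 40L, 100L -> 200L, 200L -> 300L)
    // Forward edges: each ID mapped to the first ID of its chain
    println(propagate(changedIds))
    // Reversed edges: each ID mapped to the final ID of its chain
    println(propagate(changedIds.map(_.swap)))
  }
}
```

With forward edges the model reproduces the pairs listed above (for example 4 -> 1 and 31 -> 10). With reversed edges, 1, 2 and 3 all map to 4 and 10 maps to 31, which is the lookup `getFinalIdentity` needs.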
