alukard990

Reputation: 841

Scala-Spark: Convert Dataframe to RDD[Edge]

I have a dataframe that represents the edges of a graph. This is its schema:

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- relationship: struct (nullable = false)
 |    |-- business_id: string (nullable = true)
 |    |-- normalized_influence: double (nullable = true)

I want to convert it to an RDD[Edge] to work with the Pregel API, and my difficulty is with the "relationship" attribute. How can I convert it?

Upvotes: 1

Views: 517

Answers (1)

Miguel

Reputation: 1211

Edge is a parameterized class, so besides the source and destination ids you can store whatever attribute you like in each edge. In your case, that would probably be an Edge[Relationship]. You can use case classes to map the dataframe to a typed Dataset and then to an RDD[Edge[Relationship]]:

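import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD
import scala.util.hashing.MurmurHash3
import spark.implicits._ // needed for .as[MyEdge]; assumes your SparkSession is named `spark`

// Case classes mirroring the dataframe schema
case class Relationship(business_id: String, normalized_influence: Double)
case class MyEdge(src: String, dst: String, relationship: Relationship)

val edges: RDD[Edge[Relationship]] = df.as[MyEdge].rdd.map { edge =>
    Edge(
        // VertexId is a Long, so the string ids are hashed. stringHash is 32-bit,
        // so distinct strings can collide on large graphs.
        MurmurHash3.stringHash(edge.src).toLong,
        MurmurHash3.stringHash(edge.dst).toLong,
        edge.relationship
    )
}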
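
Since the goal is the Pregel API, you will eventually need a Graph and not just the edge RDD. Here is a minimal sketch of that step, assuming a local SparkSession and a couple of made-up sample rows in place of your real dataframe. The object name EdgeGraphSketch, the vertexId helper, and the 0.0 default vertex attribute are all placeholders of mine, not anything from your code:

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.util.hashing.MurmurHash3

// Case classes mirroring the schema from the question.
case class Relationship(business_id: String, normalized_influence: Double)
case class MyEdge(src: String, dst: String, relationship: Relationship)

object EdgeGraphSketch {
  // Same hashing as above: a 32-bit hash widened to a Long VertexId.
  def vertexId(name: String): Long = MurmurHash3.stringHash(name).toLong

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("edge-graph-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for the real dataframe.
    val df = Seq(
      MyEdge("a", "b", Relationship("biz1", 0.5)),
      MyEdge("b", "c", Relationship("biz2", 0.25))
    ).toDF()

    val edges: RDD[Edge[Relationship]] = df.as[MyEdge].rdd.map { e =>
      Edge(vertexId(e.src), vertexId(e.dst), e.relationship)
    }

    // Graph.fromEdges creates every vertex referenced by an edge
    // and assigns it the given default attribute (0.0 here).
    val graph: Graph[Double, Relationship] = Graph.fromEdges(edges, 0.0)

    println(s"vertices=${graph.numVertices}, edges=${graph.numEdges}")
    spark.stop()
  }
}
```

Note that the original string ids are not kept on the vertices here. If you need them later, one option is to build a vertex RDD of (vertexId(name), name) pairs and use Graph(vertices, edges) instead of Graph.fromEdges.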

Upvotes: 2
