Thagor

Reputation: 900

Efficient DataFrame lookup in Apache Spark

I want to efficiently look up many IDs. What I have is a dataframe that looks like the dataframe df_source below, but with a couple of million records distributed across 10 workers:

+-------+----------------+
|    URI|     Links_lists|
+-------+----------------+
|  URI_1|[URI_8,URI_9,...|
|  URI_2|[URI_6,URI_7,...|
|  URI_3|[URI_4,URI_1,...|
|  URI_4|[URI_1,URI_5,...|
|  URI_5|[URI_3,URI_2,...|
+-------+----------------+
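
To make this reproducible, a small toy version of df_source like the one above can be built like this (using the same sqlContext as further down; my real data is of course loaded differently):

# toy df_source: a URI column and an array column with the linked URIs
df_source = sqlContext.createDataFrame(
    [("URI_1", ["URI_8", "URI_9"]),
     ("URI_2", ["URI_6", "URI_7"]),
     ("URI_3", ["URI_4", "URI_1"]),
     ("URI_4", ["URI_1", "URI_5"]),
     ("URI_5", ["URI_3", "URI_2"])],
    ["URI", "Links_lists"])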

My first step would be to make an RDD out of df_source:

rdd_source = df_source.rdd

Out of rdd_source I want to create an RDD that contains only the URIs together with unique IDs. I do this like this:

rdd_index = rdd_source.map(lambda x: x[0]).zipWithUniqueId()

Now I also .flatMap() rdd_source into an RDD that contains all relations as (URI, LINK) pairs, which until now were only contained within the Links_lists column:

rdd_relations = rdd_source.flatMap(lambda x: [(x[0], link) for link in x[1]])
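
On the toy data above, a quick sanity check that the flattened pairs look right would be something like:

# peek at a few (URI, LINK) tuples, e.g. [('URI_1', 'URI_8'), ('URI_1', 'URI_9'), ...]
print(rdd_relations.take(3))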

Now I transform both rdd_index and rdd_relations back into dataframes, because I want to do joins and I think (I might be wrong on this) that joins on dataframes are faster.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_index = StructType([
    StructField("URI", StringType(), True),
    StructField("ID", IntegerType(), True)])

df_index = sqlContext.createDataFrame(rdd_index, schema=schema_index)

and

schema_relation = StructType([
    StructField("URI", StringType(), True),
    StructField("LINK", StringType(), True)])

df_relations = sqlContext.createDataFrame(rdd_relations, schema=schema_relation)

The resulting dataframes should look like these two:

df_index:
+-------+-------+
|    URI|     ID|
+-------+-------+
|  URI_1|      1|
|  URI_2|      2|
|  URI_3|      3|
|  URI_4|      4|
|  URI_5|      5|
+-------+-------+

df_relations:
+-------+-------+
|    URI|   LINK|
+-------+-------+
|  URI_1|  URI_5|
|  URI_1|  URI_8|
|  URI_1|  URI_9|
|  URI_2|  URI_3|
|  URI_2|  URI_4|
+-------+-------+

Now, to replace the long string URIs in df_relations, I will do joins on df_index. The first join:

from pyspark.sql.functions import col

df_relations =\
df_relations.join(df_index, df_relations.URI == df_index.URI, 'inner')\
            .select(col('ID').alias('URI_ID'), col('LINK'))

This should yield me a dataframe looking like this:

df_relations:
+-------+-------+
| URI_ID|   LINK|
+-------+-------+
|      1|  URI_5|
|      1|  URI_8|
|      1|  URI_9|
|      2|  URI_3|
|      2|  URI_4|
+-------+-------+

And the second join:

df_relations =\
df_relations.join(df_index, df_relations.LINK == df_index.URI, 'inner')\
            .select(col('URI_ID'), col('ID').alias('LINK_ID'))

This should result in the final dataframe, the one I need, looking like this:

df_relations:
+-------+-------+
| URI_ID|LINK_ID|
+-------+-------+
|      1|      5|
|      1|      8|
|      1|      9|
|      2|      3|
|      2|      4|
+-------+-------+

where all URIs are replaced with IDs from df_index.

Is this an efficient way to look up the IDs for all URIs in both columns of the relation table, or is there a more effective way of doing this?

I'm using Apache Spark 2.1.0 with Python 3.5.

Upvotes: 1

Views: 2509

Answers (1)

Assaf Mendelson

Reputation: 13001

First, you do not need to use RDDs for the operations you described; using RDDs can be very costly. Second, you do not need to do two joins, you can do just one:

import pyspark.sql.functions as f
# add a unique id for each URI
withID = df_source.withColumn("URI_ID", f.monotonically_increasing_id())
# create a single line from each element in the array
exploded = withID.select("URI_ID", f.explode("Links_lists").alias("LINK"))
# reuse the same ids as a lookup table for the link targets
linkID = withID.withColumnRenamed("URI_ID", "LINK_ID").drop("Links_lists")
joined = exploded.join(linkID, on=exploded.LINK == linkID.URI).drop("URI").drop("LINK")

Lastly, if linkID (which is basically df_source with a column replaced) is relatively small (i.e. it can fit entirely on a single worker), you can broadcast it. Add the following before the join:

linkID = f.broadcast(linkID)
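
After broadcasting, the join itself stays the same; just to illustrate on a small test dataset, the result (two columns, URI_ID and LINK_ID) could be checked with something like:

# same join as above, now against the broadcast linkID
joined = exploded.join(linkID, on=exploded.LINK == linkID.URI).drop("URI").drop("LINK")
joined.show()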

Upvotes: 2
