Reputation: 900
I want to efficiently look up many IDs. I have a dataframe, df_source, that looks like the one below, but with a couple of million records distributed across 10 workers:
+-------+----------------+
| URI| Links_lists|
+-------+----------------+
| URI_1|[URI_8,URI_9,...|
| URI_2|[URI_6,URI_7,...|
| URI_3|[URI_4,URI_1,...|
| URI_4|[URI_1,URI_5,...|
| URI_5|[URI_3,URI_2,...|
+-------+----------------+
My first step would be to make an RDD out of df_source:
rdd_source = df_source.rdd
Out of rdd_source I want to create an RDD that contains only the URIs, each paired with an ID. I do it like this:
rdd_index = rdd_source.map(lambda x: x[0]).zipWithUniqueId()
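To make the resulting IDs less of a black box, here is a plain-Python model of the scheme the PySpark documentation describes for zipWithUniqueId(): with n partitions, the items in partition k get the IDs k, n+k, 2n+k, and so on. The function name and the partition layout are made up for illustration; this is not Spark code.

```python
# Plain-Python model of the ID scheme documented for RDD.zipWithUniqueId():
# with n partitions, items in partition k get the IDs k, n + k, 2*n + k, ...
def zip_with_unique_id(partitions):
    n = len(partitions)
    return [
        (item, k + i * n)
        for k, part in enumerate(partitions)
        for i, item in enumerate(part)
    ]

# Hypothetical layout: five URIs spread over three partitions.
partitions = [["URI_1"], ["URI_2", "URI_3"], ["URI_4", "URI_5"]]
indexed = zip_with_unique_id(partitions)
print(indexed)
```

The IDs are unique but neither consecutive nor ordered by URI, so the real df_index will probably not look as tidy as a 1..N numbering.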
Now I also .flatMap() rdd_source into an RDD that contains all relations, which until now were only contained within the Links_lists column.
# emit one (URI, LINK) pair per entry of the Links_lists column
rdd_relations = rdd_source.flatMap(lambda x: [(x[0], link) for link in x[1]])
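The flattening step has to produce one (URI, LINK) pair per link, which an identity function passed to flatMap would not do. A plain-Python sketch of the difference, on made-up rows (the helper name is hypothetical, and list comprehensions stand in for the RDD operation):

```python
# Two made-up rows in the shape of df_source: (URI, Links_lists).
rows = [
    ("URI_1", ["URI_8", "URI_9"]),
    ("URI_2", ["URI_6", "URI_7"]),
]

def flatten_relations(rows):
    """Emit one (URI, LINK) pair for every link of every row."""
    return [(uri, link) for uri, links in rows for link in links]

relations = flatten_relations(rows)

# What flattening with an identity function would give instead:
# the URI and the whole list as separate elements, not pairs.
identity_flattened = [element for row in rows for element in row]

print(relations)
print(identity_flattened)
```

In Spark terms, the first variant would correspond to something like rdd_source.flatMap(lambda row: [(row[0], link) for link in row[1]]).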
Now I transform both rdd_index and rdd_relations back into dataframes, because I want to do joins and I think (I might be wrong on this) joins on dataframes are faster.
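from pyspark.sql.types import StructType, StructField, StringType, LongType
schema_index = StructType([
    StructField("URI", StringType(), True),
    # LongType avoids overflow if the generated IDs grow past the 32-bit range
    StructField("ID", LongType(), True)])
df_index = sqlContext.createDataFrame(rdd_index, schema=schema_index)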
and
schema_relation = StructType([
    StructField("URI", StringType(), True),
    StructField("LINK", StringType(), True)])
df_relations = sqlContext.createDataFrame(rdd_relations, schema=schema_relation)
The resulting dataframes should look like these two:
df_index:
+-------+-------+
| URI| ID|
+-------+-------+
| URI_1| 1|
| URI_2| 2|
| URI_3| 3|
| URI_4| 4|
| URI_5| 5|
+-------+-------+
df_relations:
+-------+-------+
| URI| LINK|
+-------+-------+
| URI_1| URI_5|
| URI_1| URI_8|
| URI_1| URI_9|
| URI_2| URI_3|
| URI_2| URI_4|
+-------+-------+
Now, to replace the long string URIs in df_relations, I will do two joins on df_index. The first join:
from pyspark.sql.functions import col
df_relations =\
df_relations.join(df_index, df_relations.URI == df_index.URI, 'inner')\
.select(col('ID').alias('URI_ID'), col('LINK'))
This should yield a dataframe looking like this:
df_relations:
+-------+-------+
| URI_ID| LINK|
+-------+-------+
| 1| URI_5|
| 1| URI_8|
| 1| URI_9|
| 2| URI_3|
| 2| URI_4|
+-------+-------+
And the second join:
df_relations =\
df_relations.join(df_index, df_relations.LINK == df_index.URI, 'inner')\
.select(col('URI_ID'), col('ID').alias('LINK_ID'))
This should result in the final dataframe, the one I need, looking like this:
df_relations:
+-------+-------+
| URI_ID|LINK_ID|
+-------+-------+
| 1| 5|
| 1| 8|
| 1| 9|
| 2| 3|
| 2| 4|
+-------+-------+
where all URIs are replaced with IDs from df_index.
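As a sanity check on what the two inner joins compute, here is a plain-Python sketch in which a dict plays the role of df_index. The data and the function name are made up for illustration. Note the inner-join behaviour: a relation whose URI or LINK has no entry in the index is dropped.

```python
# A dict standing in for df_index (URI -> ID); sample values only.
index = {"URI_1": 1, "URI_2": 2, "URI_3": 3, "URI_4": 4, "URI_5": 5}

# A list of (URI, LINK) pairs standing in for df_relations.
relations = [
    ("URI_1", "URI_5"),
    ("URI_1", "URI_8"),  # URI_8 is not in this sample index
    ("URI_2", "URI_3"),
    ("URI_2", "URI_4"),
]

def replace_with_ids(relations, index):
    """Model of two inner joins: keep a pair only if both sides are indexed."""
    return [
        (index[uri], index[link])
        for uri, link in relations
        if uri in index and link in index
    ]

id_relations = replace_with_ids(relations, index)
print(id_relations)
```

If links can point to URIs that never appear in the URI column, those relations would disappear from the result, so it may be worth checking row counts before and after the joins.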
Is this an efficient way to look up the IDs for all URIs in both columns of the relation table, or is there a more effective way of doing this?
I'm using Apache Spark 2.1.0 with Python 3.5
Upvotes: 1
Views: 2509
Reputation: 13001
First, you do not need RDDs for the operations you described. In PySpark, going through RDDs can be very costly because every row has to be serialized between the JVM and Python. Second, you do not need two joins; one is enough:
import pyspark.sql.functions as f
# add a unique id for each URI
withID = df_source.withColumn("URI_ID", f.monotonically_increasing_id())
# create one row per element of the Links_lists array
exploded = withID.select("URI_ID", f.explode("Links_lists").alias("LINK"))
# reuse the same IDs as a lookup table keyed by URI
linkID = withID.withColumnRenamed("URI_ID", "LINK_ID").drop("Links_lists")
# a single join replaces each LINK with its LINK_ID
joined = exploded.join(linkID, on=exploded.LINK == linkID.URI).drop("URI").drop("LINK")
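One thing to keep in mind with monotonically_increasing_id() is that the IDs are unique and increasing but not consecutive. The Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. A plain-Python model of that layout (the function name and partition sizes are made up, and this is not Spark code):

```python
# Model of the documented layout of monotonically_increasing_id():
# partition index in the upper bits, row offset in the lower 33 bits.
def monotonic_ids(rows_per_partition):
    return [
        (partition << 33) + offset
        for partition, count in enumerate(rows_per_partition)
        for offset in range(count)
    ]

# Hypothetical dataframe with two partitions of three rows each.
ids = monotonic_ids([3, 3])
print(ids)
```

Already the second partition starts above the 32-bit integer range, so the ID columns need a 64-bit type if they are stored or cast later.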
Lastly, if linkID (which is basically df_source with a column replaced) is relatively small (i.e. it fits entirely in the memory of a single worker), you can broadcast it. Add the following before the join:
linkID = f.broadcast(linkID)
Upvotes: 2