Reputation: 317
I have several dataframes as below. The goal is to find all the relevant information using product_name as the key. The problem is, sometimes it is called prod_name or other similar names. Also, for example, if product_name is linked to ser_no, then the information in that data frame belongs to the product as well. Examples are below; let me know if I can explain this better.
Could someone please help shed some light on this? I am trying to automate this process without explicitly joining the tables together, as there are many tables like this and I do not know all the exact table/column names. Basically, I am trying to pull out all the information relevant to a product_name from these Hive tables. It feels like connectedComponents in the Spark GraphX API, but not quite? Many thanks for your help.
df1:
prod_name       value
A               code1
B               code2
C               code3, code 33
D               code4

df2:
product_name    jvalue
A               indi3
B               indi4
C               indi5
D               indi6

df3:
product_name    ser_no
A               100
B               200
C               300
D               400

df4:
colour    ser_no
Amber     100
Blue      200
Orange    300
Green     400
expected output:
product_name    value             jvalue    ser_no    colour
A               code1             indi3     100       Amber
B               code2             indi4     200       Blue
C               code3, code 33    indi5     300       Orange
D               code4             indi6     400       Green
Upvotes: 0
Views: 506
Reputation: 652
It seems like there are three parts to your question:
1. Discovering all the tables that have the same logical column, even when the exact column names differ.
2. Discovering all foreign keys across a set of DataFrames.
3. Joining a collection of DataFrames together based on the appropriate join keys, even when the join key goes by a different column name in each DataFrame.
First we will need a method to check whether a column name matches our logical column; we will use this as a filter. A regex would be a reasonable approach, but in this case I will simply enumerate a set of column names that should be treated as the same.
val productNameColumns = Seq("prod_name", "product_name", "product")
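If the naming is less predictable, a regex could stand in for the explicit list. A minimal sketch, assuming a naming convention along the lines of prod_name / product_name / ProductName (the pattern itself is just a guess at your conventions):

// Hypothetical matcher: accepts e.g. "prod_name", "product_name", "ProductName".
def isProductNameColumn(columnName: String): Boolean =
  columnName.matches("(?i)prod(uct)?_?name")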
Since you have mentioned Hive, I will assume we can use the Spark Catalog to discover tables. We can grab all columns, in all tables, that match our logical column using code like the following.
import org.apache.spark.sql.functions.{col, first, lit}
import spark.implicits._

val allTables = spark.catalog.listTables().select("name").as[String].collect

val tableColumns = allTables
  .map { tableName =>
    // If you have many tables, this will start many Spark jobs. This may be too slow to be feasible.
    spark.catalog
      .listColumns(tableName)
      .where(col("name").isin(productNameColumns: _*))
      // or `.where(col("name").rlike(some-regex))` if using a regex
      .select(
        lit(tableName).as("table"),
        col("name").as("column")
      )
      // Make sure that we only take 1 column per table. This may not be needed depending on your data.
      .groupBy("table")
      .agg(first(col("column")).as("column"))
  }
  .reduce(_ union _)
  .as[(String, String)]
  .collect
  .toMap
The above code assumes you only care about the current database. You could iterate over all databases if needed, as sketched below.
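A rough sketch of collecting table names across every database (the db.name + "." + table.name qualification is an assumption; adapt it to however your catalog is organised):

// Sketch: fully-qualified table names from every database in the catalog.
val allTablesAllDbs: Seq[String] = spark.catalog
  .listDatabases()
  .collect
  .toSeq
  .flatMap { db =>
    spark.catalog.listTables(db.name).collect.map(t => s"${db.name}.${t.name}")
  }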
The tableColumns map is the answer to sub-problem 1. The keys are the table names and the values are the column names that correspond to "product name".
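For the example tables in the question (assuming they are registered in Hive as df1 through df4), the map would look something like the following. Note that df4 would not appear at all, since it has no product-name column and only links in via ser_no, which is where sub-problem 2 comes in.

// Hypothetical result for the question's example tables.
Map(
  "df1" -> "prod_name",
  "df2" -> "product_name",
  "df3" -> "product_name"
)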
Sub-problem 2 is a classic example of foreign key discovery. This is an entire research field in itself. I suggest you do some reading on this subject before going down this road. It may seem simple, but it is deceptively difficult. This is especially true when we start talking about keys that are made up of multiple columns.
Let's set this feature aside and move on to sub-problem 3.
The easiest solution to sub-problem 3 is to standardize the column names you will join on. This is easy because we already know the column name for each DataFrame after solving sub-problem 1. We can iterate over our map, rename the column we will join on, and then reduce with a join.
I have assumed a "full_outer"
join because that way we won't lose information about any record. Of course, no matter what join type you pick
there will potentially be an explosion of rows if the join key isn't a primary key (unique across rows) for all DataFrames. This will likely
be an expensive DataFrame to create regardless.
tableColumns
  .map { case (tableName, columnName) =>
    spark.table(tableName).withColumnRenamed(columnName, "__join_column")
  }
  .reduce { case (accDf, nextDf) =>
    accDf.join(nextDf, Seq("__join_column"), "full_outer")
  }
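To actually use the result you would typically assign it and rename the synthetic join column back to something readable; a small sketch (the final name product_name is just an assumption):

// Sketch: materialise the combined DataFrame and restore a readable key column name.
val combined = tableColumns
  .map { case (tableName, columnName) =>
    spark.table(tableName).withColumnRenamed(columnName, "__join_column")
  }
  .reduce(_.join(_, Seq("__join_column"), "full_outer"))
  .withColumnRenamed("__join_column", "product_name")

combined.show(truncate = false)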
Upvotes: 2