user3735871

Reputation: 317

Spark - how to get all relevant columns based on ambiguous names

I have several DataFrames, shown below. The goal is to find all the relevant information using product_name as the key. The problem is that the column is sometimes called prod_name or something similar. Also, if product_name is linked to ser_no, for example, then the information in any DataFrame keyed by ser_no belongs to that product as well. Examples are below; let me know if I can explain this better.

Could someone please shed some light on this? I am trying to automate this process without explicitly joining the tables together, as there are many tables like this and I do not know all the exact table or column names. Basically, I am trying to pull out all the information relevant to a product_name from these Hive tables. It feels like connected components in the Spark GraphX API, but not quite. Many thanks for your help.

df1:

prod_name    value
    A        code1
    B        code2
    C        code3, code 33
    D        code4

df2:

product_name    jvalue
    A           indi3
    B           indi4
    C           indi5
    D           indi6

df3:

product_name    ser_no
    A           100
    B           200
    C           300
    D           400

df4:

colour    ser_no
Amber     100
Blue      200
Orange    300
Green     400

expected output:

product_no    value             jvalue    ser_no    colour
    A         code1             indi3     100       Amber
    B         code2             indi4     200       Blue
    C         code3, code 33    indi5     300       Orange
    D         code4             indi6     400       Green

Upvotes: 0

Views: 506

Answers (1)

Erp12

Reputation: 652

It seems like there are 3 parts to your question.

  1. Discovering all the tables that have the same logical column, even when the exact column names differ.

  2. Discovering all foreign keys across a set of DataFrames.

  3. Joining a collection of DataFrames together based on appropriate join keys, even when the join key is under a different column name for each DataFrame.

Sub-problem 1

First we will need a method to check whether a column name matches our logical column. We will use this as a filter. A regex would be a reasonable approach, but in this case I will simply enumerate a set of column names that should be treated the same.

val productNameColumns = Seq("prod_name", "product_name", "product")
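For comparison, the regex approach mentioned above could look something like the following. This is only a sketch: the pattern and the helper name isProductNameColumn are made up for illustration, and the pattern would need tuning against the column names that actually occur in your tables.

```scala
// Hypothetical pattern (an assumption, not taken from real table metadata):
// case-insensitive, accepts "prod", "product", "prod_name" and "product_name".
val productNamePattern = "(?i)prod(uct)?(_name)?".r

// True only when the whole (trimmed) column name matches the pattern.
def isProductNameColumn(name: String): Boolean =
  productNamePattern.pattern.matcher(name.trim).matches()
```

If you pass a pattern like this to `rlike` on a Spark column instead, keep in mind that `rlike` matches anywhere in the string, so you would probably want to anchor it with `^` and `$`.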

Since you have mentioned Hive, I will assume we can use the Spark Catalog to discover tables. We can grab all columns, in all tables, that match our logical column using code like the following.

import org.apache.spark.sql.functions.{col, first, lit}
import spark.implicits._ // needed for .as[...] when not running in spark-shell

val allTables = spark.catalog.listTables().select("name").as[String].collect

val tableColumns = allTables
    .map { tableName => 
        // If you have many tables, this will start many spark jobs. This may be too slow to be feasible.
        spark.catalog
            .listColumns(tableName)
            .where(col("name").isin(productNameColumns: _*))
            // or `.where(col("name").rlike(some-regex))` if using a regex
            .select(
                lit(tableName).as("table"),
                col("name").as("column")
            )
            // Make sure that we only take 1 column per table. This may not be needed depending on your data.
            .groupBy("table")
            .agg(
                first(col("column")).as("column")
            )
    }
    .reduce(_ union _)
    .as[(String, String)]
    .collect
    .toMap

The above code only looks at the current database. You could iterate over all databases if needed.

The tableColumns map is the answer to sub-problem 1. The keys are the table names and the values are the column names that correspond to "product name".
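As a rough sketch of what iterating over all databases might look like, the helper below walks the catalog and returns (database, table, column) triples. The name productColumnsAllDatabases is hypothetical, and I am assuming the catalog is small enough to list on the driver. Temporary views are skipped because they do not belong to a database.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: scans every database in the catalog and returns
// (database, table, column) for each column whose name is in `names`.
def productColumnsAllDatabases(
    spark: SparkSession,
    names: Seq[String]
): Seq[(String, String, String)] = {
  val databases = spark.catalog.listDatabases().collect().map(_.name).toSeq
  for {
    db <- databases
    // listTables(db) also returns temporary views, so filter those out.
    table <- spark.catalog.listTables(db).collect().filterNot(_.isTemporary).map(_.name).toSeq
    column <- spark.catalog.listColumns(db, table).collect().map(_.name).toSeq
    if names.contains(column)
  } yield (db, table, column)
}
```

Like the per-table version, this issues one catalog call per table, so it may be slow on a large metastore.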

Sub-problem 2

Sub-problem 2 is a classic example of foreign key discovery. This is an entire research field in itself. I suggest you do some reading on this subject before going down this road. It may seem simple, but it is deceptively difficult. This is especially true when we start talking about keys that are made up of multiple columns.
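To give a feel for why it is harder than it looks, here is a deliberately naive sketch of one common starting point: treat column X as a candidate foreign key to column Y if every value of X also appears in Y. It uses plain in-memory maps as a stand-in for DataFrames, the name candidateForeignKeys is made up, and the sample values come from df3 and df4 in the question.

```scala
// Each table is modelled as column name -> values (a stand-in for a DataFrame).
// Returns ((childTable, childColumn), (parentTable, parentColumn)) pairs where
// every child value also occurs in the parent column.
def candidateForeignKeys(
    tables: Map[String, Map[String, Seq[String]]]
): Seq[((String, String), (String, String))] =
  for {
    (childName, child) <- tables.toSeq
    (childCol, childVals) <- child.toSeq
    (parentName, parent) <- tables.toSeq
    if parentName != childName
    (parentCol, parentVals) <- parent.toSeq
    if childVals.nonEmpty && childVals.toSet.subsetOf(parentVals.toSet)
  } yield ((childName, childCol), (parentName, parentCol))

val tables: Map[String, Map[String, Seq[String]]] = Map(
  "df3" -> Map(
    "product_name" -> Seq("A", "B", "C", "D"),
    "ser_no" -> Seq("100", "200", "300", "400")
  ),
  "df4" -> Map(
    "colour" -> Seq("Amber", "Blue", "Orange", "Green"),
    "ser_no" -> Seq("100", "200", "300", "400")
  )
)

// On this sample, the only candidates link df3.ser_no and df4.ser_no (in both directions).
val candidates = candidateForeignKeys(tables)
```

On real data a check like this produces many false positives (any two columns of small integers tend to "match"), and it compares every column against every other column, which is part of why this is a research topic rather than a one-liner.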

Let's set this feature aside and move on to sub-problem 3.

Sub-problem 3

The easiest solution to sub-problem 3 is to standardize the column names you will join on. This is easy because we already know the column name for each DataFrame after solving sub-problem 1. We can iterate over our map, rename the join column in each DataFrame, and then reduce the DataFrames with a join.

I have assumed a "full_outer" join because that way we won't lose information about any record. Of course, no matter what join type you pick there will potentially be an explosion of rows if the join key isn't a primary key (unique across rows) for all DataFrames. This will likely be an expensive DataFrame to create regardless.

tableColumns
    .map { case (tableName, columnName) =>
        spark.table(tableName).withColumnRenamed(columnName, "__join_column")
    }
    .reduce { case (accDf, nextDf) =>
        accDf.join(nextDf, Seq("__join_column"), "full_outer")
    }
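To make this concrete, here is a small sketch that runs the same rename-and-reduce on the sample data from the question. The local SparkSession and the `frames` sequence (a stand-in for the tableColumns map) are assumptions for experimentation only. Because sub-problem 2 was set aside, the link from ser_no to df4 is supplied by hand in the last step rather than discovered.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: a throwaway local session, not a Hive-backed one.
val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(("A", "code1"), ("B", "code2"), ("C", "code3, code 33"), ("D", "code4"))
  .toDF("prod_name", "value")
val df2 = Seq(("A", "indi3"), ("B", "indi4"), ("C", "indi5"), ("D", "indi6"))
  .toDF("product_name", "jvalue")
val df3 = Seq(("A", 100), ("B", 200), ("C", 300), ("D", 400))
  .toDF("product_name", "ser_no")
val df4 = Seq(("Amber", 100), ("Blue", 200), ("Orange", 300), ("Green", 400))
  .toDF("colour", "ser_no")

// Stand-in for tableColumns: each DataFrame paired with its "product name" column.
val frames: Seq[(DataFrame, String)] =
  Seq(df1 -> "prod_name", df2 -> "product_name", df3 -> "product_name")

val byProduct = frames
  .map { case (df, columnName) => df.withColumnRenamed(columnName, "__join_column") }
  .reduce((accDf, nextDf) => accDf.join(nextDf, Seq("__join_column"), "full_outer"))

// Hand-written second hop: df4 has no product column, so attach it through ser_no.
val result = byProduct.join(df4, Seq("ser_no"), "left")
```

The result has one row per product with the value, jvalue, ser_no and colour columns, which matches the shape of the expected output in the question (with __join_column in place of product_no).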

Upvotes: 2
