marie20
marie20

Reputation: 803

Correlated Subquery in Spark SQL

I have the following 2 tables for which I have to check the existence of values between them using a correlated sub-query.

The requirement is - for each record in the orders table check if the corresponding custid is present in the customer table, and then output a field (named FLAG) with value Y if the custid exists, otherwise N if it doesn't.

orders:

orderid | custid
12345   | XYZ
34566   | XYZ
68790   | MNP
59876   | QRS
15620   | UVW

customer:

id | custid
1  | XYZ
2  | UVW

Expected Output:

orderid | custid  | FLAG
12345   | XYZ     | Y
34566   | XYZ     | Y 
68790   | MNP     | N
59876   | QRS     | N
15620   | UVW     | Y

I tried something like the following but couldn't get it to work -

select 
o.orderid,
o.custid,
case when o.custid EXISTS (select 1 from customer c on c.custid = o.custid)
     then 'Y'
     else 'N'
end as flag
from orders o

Can this be solved with a correlated scalar sub-query ? If not what is the best way to implement this requirement ?

Please advise.

Note: using Spark SQL query v2.4.0

Thanks.

Upvotes: 1

Views: 4465

Answers (1)

Lars Skaug
Lars Skaug

Reputation: 1386

IN/EXISTS predicate sub-queries can only be used in a filter in Spark.

The following works in a locally recreated copy of your data:

select orderid, custid, case when existing_customer is null then 'N' else 'Y' end existing_customer
          from (select o.orderid, o.custid, c.custid existing_customer
                from orders o
                left join customer c
                 on c.custid = o.custid)

Here's how it works with recreated data:

def textToView(csv: String, viewName: String) = {
   spark.read
  .option("ignoreLeadingWhiteSpace", "true")
  .option("ignoreTrailingWhiteSpace", "true")
  .option("delimiter", "|")
  .option("header", "true")
  .csv(spark.sparkContext.parallelize(csv.split("\n")).toDS)
  .createOrReplaceTempView(viewName)
}

textToView("""id | custid
              1  | XYZ
              2  | UVW""", "customer")

textToView("""orderid | custid
              12345   | XYZ
              34566   | XYZ
              68790   | MNP
              59876   | QRS
              15620   | UVW""", "orders")

spark.sql("""
          select orderid, custid, case when existing_customer is null then 'N' else 'Y' end existing_customer
          from (select o.orderid, o.custid, c.custid existing_customer
                from orders o
                left join customer c
                 on c.custid = o.custid)""").show

Which returns:

+-------+------+-----------------+
|orderid|custid|existing_customer|
+-------+------+-----------------+
|  59876|   QRS|                N|
|  12345|   XYZ|                Y|
|  34566|   XYZ|                Y|
|  68790|   MNP|                N|
|  15620|   UVW|                Y|
+-------+------+-----------------+

Upvotes: 3

Related Questions