Reputation: 575
I'm running into an issue migrating a SQL stored procedure to Spark SQL. I am getting this error in Databricks:
Correlated column is not allowed in predicate
I have the following type of query:
WITH CTE_1 AS (
    SELECT id, Date, ......
    FROM table
),
CTE_2 AS (
    SELECT Date, SUM(blablabla) AS SUM
    FROM CTE_1
    WHERE (SELECT COUNT(1)
           FROM CTE_1 AS prev
           WHERE MONTHS_BETWEEN(LEFT(prev.Date, 7), LEFT(CTE_1.Date, 7)) = 1
             AND CTE_1.id = prev.id
          ) = 0
      AND CTE_1.Date != (SELECT MIN(Date) FROM CTE_1)
    GROUP BY Date
)
SELECT *
FROM CTE_2
What would be the best way to set this up in Spark SQL?
Upvotes: 2
Views: 3609
Reputation: 24478
It's because the correlated subquery effectively creates a loop: for every id, each Date is compared against every other Date in CTE_1, and Spark SQL does not allow a correlated column in a predicate like that.
You should be able to rewrite it with window functions.
I tried to do so, but without example data I'm not sure I understood the logic correctly. This is what I came up with:
WITH CTE_1 AS (
    SELECT id, Date, blablabla,
           LEAD(Date) OVER (PARTITION BY id ORDER BY Date) AS Lead
    FROM table
),
CTE_2 AS (
    SELECT Date, SUM(blablabla) AS SUM
    FROM CTE_1
    WHERE MONTHS_BETWEEN(LEFT(Lead, 7), LEFT(Date, 7)) != 1
       OR Lead IS NULL
    GROUP BY Date
)
SELECT *
FROM CTE_2
Testing in PySpark:
df = spark.createDataFrame(
    [("1", '2022-01-07', 5),
     ("1", '2022-02-07', 5),
     ("1", '2022-03-07', 5),
     ("1", '2022-07-07', 5),
     ("2", '2022-06-07', 5),
     ("2", '2022-07-07', 5),
     ("3", '2022-07-07', 5)],
    ["id", "Date", "blablabla"])
df.createOrReplaceTempView("table")

spark.sql(
    """
    WITH CTE_1 AS (
        SELECT id, Date, blablabla,
               LEAD(Date) OVER (PARTITION BY id ORDER BY Date) AS Lead
        FROM table
    ),
    CTE_2 AS (
        SELECT Date, SUM(blablabla) AS SUM
        FROM CTE_1
        WHERE MONTHS_BETWEEN(LEFT(Lead, 7), LEFT(Date, 7)) != 1
           OR Lead IS NULL
        GROUP BY Date
    )
    SELECT *
    FROM CTE_2
    """
).show()
# +----------+---+
# |      Date|SUM|
# +----------+---+
# |2022-07-07| 15|
# |2022-03-07|  5|
# +----------+---+
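For completeness, here is a minimal sketch of the same logic with the PySpark DataFrame API instead of SQL, assuming the same df as above (months_between implicitly casts the 'yyyy-MM' substrings to timestamps, just like the SQL version):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# next Date per id, in ascending Date order
w = Window.partitionBy("id").orderBy("Date")

result = (
    df.withColumn("Lead", F.lead("Date").over(w))
      # keep rows whose next Date is not exactly one month later,
      # or which have no next Date at all
      .where(
          (F.months_between(F.substring("Lead", 1, 7),
                            F.substring("Date", 1, 7)) != 1)
          | F.col("Lead").isNull())
      .groupBy("Date")
      .agg(F.sum("blablabla").alias("SUM"))
)
result.show()
This should produce the same result as the SQL query above.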
Upvotes: 1