Minura Punchihewa

Reputation: 2025

References to Streaming Delta Live Tables

It was my understanding that references to streaming delta live tables require the use of the function STREAM(), supplying the table name as an argument.

Given below is a code snippet that I found in one of the demo notebooks that Databricks provides. Here, STREAM() is used in the FROM clause, but it is not used in the LEFT JOIN, even though the joined table is also a streaming table. The query still works.

What exactly is the correct syntax here?

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
AS
  SELECT f.customer_id, f.customer_name, f.number_of_line_items, 
         timestamp(from_unixtime((cast(f.order_datetime as long)))) as order_datetime, 
         date(from_unixtime((cast(f.order_datetime as long)))) as order_date, 
         f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
  FROM STREAM(LIVE.sales_orders_raw) f
  LEFT JOIN LIVE.customers c
    ON c.customer_id = f.customer_id
    AND c.customer_name = f.customer_name

For reference, these are the other two tables that act as inputs to the above query:

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"))

CREATE OR REFRESH STREAMING LIVE TABLE customers
COMMENT "The customers buying finished products, ingested from /databricks-datasets."
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");

Upvotes: 1

Views: 1213

Answers (1)

Alex Ott

Reputation: 87144

Spark Structured Streaming supports different types of joins:

  • Stream-static join (see the Spark documentation on stream-static joins). This is exactly your case: STREAM(LIVE.sales_orders_raw) reads the orders as a stream, while LIVE.customers, referenced without STREAM(), is treated as static. It is re-read on each micro-batch and represents the state of the table at the moment of invocation. This is usually the right choice for this kind of functionality.

  • Stream-stream join. Here both sides use the STREAM(LIVE....) syntax, and the two streams may need to be aligned against each other because matching data can arrive late on either side. This may not be the best fit for you: both streams have to wait for late data, so you will need to define a watermark on each of them. See the Spark documentation on stream-stream joins for details.
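
To make the stream-static behaviour more concrete, here is a toy model in plain Python. It is not the Spark or DLT API, only a sketch of the semantics described above: each micro-batch of orders is left-joined against whatever the static side contains at that moment. The function names, the sample rows, and the simplification of joining on customer_id alone are all assumptions made for illustration.

```python
# Toy model of stream-static join semantics (plain Python, not Spark/DLT).

def left_join_batch(batch, customers_snapshot):
    """Left-join one micro-batch of orders against a customers snapshot."""
    joined = []
    for order in batch:
        customer = customers_snapshot.get(order["customer_id"])
        joined.append({
            "order_number": order["order_number"],
            "customer_id": order["customer_id"],
            # No match on the static side yields None, like NULL in a LEFT JOIN.
            "city": customer["city"] if customer else None,
        })
    return joined


def run_stream(micro_batches, read_customers):
    """Process batches in order, re-reading the static side before each one."""
    output = []
    for batch in micro_batches:
        snapshot = read_customers()  # state at the moment this batch runs
        output.extend(left_join_batch(batch, snapshot))
    return output


# Hypothetical data: customer 2 is added only after the first batch has run.
customers = {1: {"city": "Colombo"}}
batches = [
    [{"order_number": 100, "customer_id": 1},
     {"order_number": 101, "customer_id": 2}],
    [{"order_number": 102, "customer_id": 2}],
]


def batches_with_update():
    yield batches[0]
    customers[2] = {"city": "Kandy"}  # the static table changes between batches
    yield batches[1]


result = run_stream(batches_with_update(), lambda: dict(customers))
for row in result:
    print(row)
```

In this model, order 101 keeps its empty city even though customer 2 shows up later, while order 102 picks up the new row. Output that has already been emitted is not revisited, which is the main practical difference from a stream-stream join with watermarks.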

Upvotes: 1
