Reputation: 527
I have created a task and a stored procedure. The task checks whether the stream has any new inserts and, if so, calls the procedure. However, the stream is not cleared afterwards, even though the transaction in the procedure completes successfully.
Could you please help me understand how to clear the stream when the procedure is executed from the task?
CREATE OR REPLACE PROCEDURE bizapp.service_site_arclight_vmtperiodic_procedure()
RETURNS STRING
LANGUAGE JAVASCRIPT
VOLATILE
EXECUTE AS CALLER
AS
$$
var resultArray = [];
snowflake.execute({
  sqlText: "begin transaction"
});
try {
  // Read the new rows from the stream with a plain SELECT
  const streamServiceSiteMappingQuery = `
    SELECT column1, column2, column3
    FROM <stream>
    INNER JOIN <table1>
    WHERE <condition>`;
  const streamServiceSiteData = snowflake.execute({
    sqlText: streamServiceSiteMappingQuery
  });
  // Insert one row into <table2> per row read from the stream
  while (streamServiceSiteData.next()) {
    calculated_column = <did some calculation with column>
    const insertQuery = `
      INSERT INTO <table2> (
        <column1>,
        <column2>,
        <column3>,
        <calculated_column>) VALUES (:1, :2, :3, :4);`;
    snowflake.execute({
      sqlText: insertQuery,
      binds: [column1, column2, column3, calculated_column]
    });
  }
  // Added dummy ip validation to clear the stream table rows
  snowflake.execute({sqlText: "commit"});
} catch (err) {
  // Undo any partial inserts and report the error to the caller
  snowflake.execute({sqlText: "rollback"});
  resultArray.push("ERROR: " + err);
}
return resultArray.join("\n");
$$
The above is my procedure. It runs the insertQuery shown; shouldn't that be enough to clear the stream?
This is how I am calling the procedure from the task:
CREATE OR REPLACE TASK <my_task_name>
WAREHOUSE = &task_warehouse
SCHEDULE = '1 minute'
WHEN SYSTEM$STREAM_HAS_DATA('<stream_name>')
AS
CALL <procedure>();
Thanks in advance.
Upvotes: 0
Views: 282
Reputation: 6269
You can't just run a select on a stream to clear it. You need to run some DML that uses the stream as a source in order to clear it. This part of the documentation explains it pretty well:
Querying a stream alone does not advance its offset, even within an explicit transaction; the stream contents must be consumed in a DML statement.
Here is an example:
-- Create a source table & stream
create or replace table source_table(id varchar,name string);
create or replace stream source_table_stream on table source_table;
-- Create a target table
create or replace table target_table(id varchar,name string);
-- Insert some data into the table. This data will show up in the stream
insert into source_table
values (1, 'Mohammed Yasin'),
(1, 'Simon Darr')
;
-- See what is in the stream. This won't clear the stream.
select * from source_table_stream;
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- |ID|NAME |METADATA$ACTION|METADATA$ISUPDATE|METADATA$ROW_ID |
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- |1 |Mohammed Yasin|INSERT |false |c0ef6686b54ba0efb4bd8ab1917b203111f0816a|
-- |1 |Simon Darr |INSERT |false |5643b2645856dd78fc505bfc72abc80bdb108634|
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- The data is still in the stream...
select * from source_table_stream;
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- |ID|NAME |METADATA$ACTION|METADATA$ISUPDATE|METADATA$ROW_ID |
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- |1 |Mohammed Yasin|INSERT |false |c0ef6686b54ba0efb4bd8ab1917b203111f0816a|
-- |1 |Simon Darr |INSERT |false |5643b2645856dd78fc505bfc72abc80bdb108634|
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- Run some DML to insert from the stream into the target_table.
-- This will clear the stream.
insert into target_table(id, name)
select id, name from source_table_stream;
-- Stream is now cleared:
select * from source_table_stream;
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- |ID|NAME |METADATA$ACTION|METADATA$ISUPDATE|METADATA$ROW_ID |
-- +--+--------------+---------------+-----------------+----------------------------------------+
-- +--+--------------+---------------+-----------------+----------------------------------------+
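Applied to the procedure in the question, the same idea could look roughly like the sketch below. It is only a sketch under some assumptions: the function name consumeStream is hypothetical, the table and stream names are the ones from the example above rather than the asker's real objects, and it assumes the per-row calculation can be moved into the SELECT list. The snowflake object is passed in as a parameter only to keep the snippet self-contained; inside a stored procedure body you would use the snowflake object that is already available there.

```javascript
// Hypothetical helper, not part of the original post.
// `snowflake` stands for the object available inside a Snowflake
// JavaScript stored procedure; it only needs an execute({sqlText}) method.
function consumeStream(snowflake) {
  snowflake.execute({ sqlText: "BEGIN TRANSACTION" });
  try {
    // One DML statement that reads FROM the stream. Because the stream is
    // the source of the INSERT, its offset advances when the transaction
    // commits. Any calculated column would go into the SELECT list.
    snowflake.execute({
      sqlText: `
        INSERT INTO target_table (id, name)
        SELECT id, name
        FROM source_table_stream
        WHERE METADATA$ACTION = 'INSERT'`
    });
    snowflake.execute({ sqlText: "COMMIT" });
    return "OK";
  } catch (err) {
    // Roll back so a failed run leaves the stream contents untouched.
    snowflake.execute({ sqlText: "ROLLBACK" });
    return "ERROR: " + err;
  }
}
```

The difference from the procedure in the question is that the stream itself appears in the FROM clause of the INSERT, instead of being read with a SELECT and then written row by row with INSERT ... VALUES. If the calculation really has to happen in JavaScript, one option is to first copy the stream rows into a staging table with an INSERT ... SELECT like the one above and then work from that table.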
Upvotes: 2