tensai

Reputation: 1696

How to stream output from a continuous view in PipelineDB?

I have set up PipelineDB and it works great! I would like to know if it's possible to stream data out of a continuous view after a value in the view has been updated. That is, I want some external process to act on changes to a view.

I wish to stream metrics generated from the views into a dashboard, and I do not want to poll the DB to achieve this.

Upvotes: 2

Views: 459

Answers (3)

brettlaforge

Reputation: 3001

As of 0.9.5, continuous triggers have been removed in favour of output streams and continuous transforms (as DidacticTactic first suggested). The output of a continuous view is essentially a stream, which means you can create continuous views or transforms based on it.

Simple Example:

  1. First create a stream and continuous view.
CREATE STREAM s (
    x int
);

CREATE CONTINUOUS VIEW hourly_cv AS
    SELECT
        hour(arrival_timestamp) AS ts,
        SUM(x) AS sum
    FROM s GROUP BY ts;
  2. Every continuous view now has an output stream. You can create a transform based on the output of the view using output_of. In the transform you have access to the tuples old and new, which represent the old and new values respectively (0.9.7 adds a third, delta). So you can create a transform that uses the output of 'hourly_cv' like so:
CREATE CONTINUOUS TRANSFORM hourly_ct AS
    SELECT
        (new).sum
    FROM output_of('hourly_cv')
    THEN EXECUTE PROCEDURE update();
  3. In this example I'm calling update, which we still need to define. It needs to be a function that returns trigger.
CREATE OR REPLACE FUNCTION update()
    RETURNS trigger AS
    $$
    BEGIN
        -- Do anything you want here (PL/pgSQL comments use --, not //).
        RETURN NEW;
    END;
    $$
    LANGUAGE plpgsql;
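Since the original goal is to push changes to a dashboard without polling, one option is to make the trigger function publish each transformed row on a PostgreSQL NOTIFY channel. This is a minimal sketch, assuming the hourly_cv view from the steps above and a PipelineDB version with output_of (0.9.5+). The names notify_hourly, hourly_notify_ct and the channel hourly_updates are hypothetical; pg_notify and row_to_json are standard PostgreSQL functions.

```sql
-- Hypothetical trigger function: publishes each row emitted by the
-- transform on the (assumed) 'hourly_updates' NOTIFY channel.
CREATE OR REPLACE FUNCTION notify_hourly()
    RETURNS trigger AS
    $$
    BEGIN
        -- NEW is the row produced by the transform's SELECT list (ts, sum).
        PERFORM pg_notify('hourly_updates', row_to_json(NEW)::text);
        RETURN NEW;
    END;
    $$
    LANGUAGE plpgsql;

-- Hypothetical transform over the output stream of hourly_cv.
CREATE CONTINUOUS TRANSFORM hourly_notify_ct AS
    SELECT
        (new).ts,
        (new).sum
    FROM output_of('hourly_cv')
    THEN EXECUTE PROCEDURE notify_hourly();

-- Run this in the dashboard's own database connection.
LISTEN hourly_updates;
```

Each notification carries a JSON object with the ts and sum of the bucket that changed. Notifications are only delivered once the writing transaction commits, and the client picks them up through its driver's LISTEN/NOTIFY support rather than by re-querying the view.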

I found the 0.9.5 release notes blog post helpful for understanding output streams and why continuous triggers were removed.

Upvotes: 3

tensai

Reputation: 1696

I feel like a bit of an idiot for not working out the answer from the tools DidacticTactic pointed to. Maybe I am blind, but I still have not found a way. I found version 0.9.3 of the DB, which included continuous triggers, but they have since been removed and I don't want to switch to an older version of the DB.

This is a bit sad, but I suppose it was moved out of the open-source version of the project to make room for the real-time analytics dashboard product that the same company provides.

Either way, I solved this issue with a stored procedure. It's probably slightly less efficient than a built-in mechanism would be, but I am hitting the DB a few thousand times a minute and my VM's CPU and RAM just yawn at me.

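CREATE OR REPLACE FUNCTION all_insert(text, text)
  RETURNS void AS
$BODY$
DECLARE
    result text;
BEGIN
    -- Insert the event into all_in ($1 = stream id, $2 = general input).
    INSERT INTO all_in (streamid, generalinput) VALUES ($1, $2);
    -- Serialise every row of totals as a JSON array of
    -- {"streamId": ..., "total": ...} objects.
    SELECT array_to_json(array_agg(json_build_object('streamId', streamid, 'total', count)))::text
      INTO result
      FROM totals;
    -- Broadcast the snapshot to every client listening on the 'totals' channel.
    PERFORM pg_notify('totals', result);
END
$BODY$
LANGUAGE plpgsql;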

So my insert and notify are both done by calling this single stored procedure. My application then simply listens for PostgreSQL NOTIFY events on the totals channel and handles them appropriately. In the example above, the application receives a JSON array containing one object per stream id, each with the total associated with it.
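To try the round trip by hand, one psql session can subscribe to the channel while another calls the procedure. This sketch assumes the all_in stream and totals view exist as in the answer; the arguments 'stream-1' and 'hello' are made-up example values.

```sql
-- Session 1 (stands in for the dashboard): subscribe to the channel
-- that all_insert() publishes on.
LISTEN totals;

-- Session 2 (the producer): insert one event and broadcast the current
-- contents of totals. 'stream-1' and 'hello' are hypothetical inputs.
SELECT all_insert('stream-1', 'hello');
```

psql reports an asynchronous notification in the listening session the next time that session runs a command. One design caveat: in PostgreSQL's default configuration a NOTIFY payload must be shorter than 8000 bytes, so if totals grows to many stream ids, sending only the changed stream's total may be safer than sending the whole view.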

Upvotes: 1

DidacticTactic

Reputation: 94

Check out the sections in our technical docs on output streams and continuous transforms for help on how to do this, and feel free to ping us in our Gitter channel if you need help beyond what you find in the docs.

Upvotes: 2
