Mark

Reputation: 538

Try-parsing data types in PipelineDB and streaming failing records to a text-column errors table?

We are using PipelineDB to receive data into a streaming table and process it with two streaming views: one view filters out the records that would fail typecasting validation (keeping only the good ones), and the other filters in the records that do fail. Ideally, we want to separate good records from bad records and materialize each set into its own final table.

For example, a system was configured to receive data from a 3rd party in the format YYYY/MM/DD HH24:MI:SS, but for some reason values showed up with the day and month flipped. In PipelineDB, the Postgres expression "to_timestamp(mycolumn,'YYYY/MM/DD HH24:MI:SS')" throws a hard error if the text in "mycolumn" is something like '2019/15/05 13:10:24', and any records fed into the stream within that transaction are rolled back. (So if PG COPY is used, a single record failing in the materializing streaming view causes zero records to be inserted altogether. That is not an ideal scenario in data automation, where the 3rd-party automated system couldn't care less about our problems processing its data.)
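
To make the failure mode concrete, here is a minimal reproduction in plain Postgres (the exact error text can vary by version):

SELECT to_timestamp('2019/15/05 13:10:24', 'YYYY/MM/DD HH24:MI:SS');
-- ERROR:  date/time field value out of range: "2019/15/05 13:10:24"
-- Inside a COPY or a multi-row INSERT, this one failure aborts the whole transaction,
-- so none of the batch's rows are loaded.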

From what I can see:

- Postgres has no "native SQL" way of doing a "try-parse".
- PipelineDB does not support user-defined functions (otherwise we could write a function with two outputs: one returning the parsed value, the other returning a boolean "is_valid" column). (My assumption is that such a function resides on the server, while PipelineDB executes as a foreign server, which is a different thing altogether.)

Ideally, a function would return the typecast value plus a boolean flag indicating whether it was valid, and it could be used in the WHERE clauses of the streaming views to fork good records from bad records. But I can't seem to solve this. Any thoughts?
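
Something like the sketch below is what I have in mind; try_parse_timestamp is a hypothetical name (not a built-in), and it is only meant to illustrate the shape of the helper:

CREATE OR REPLACE FUNCTION try_parse_timestamp(
    p_value  text,
    p_format text,
    OUT parsed_value timestamp,
    OUT is_valid     boolean)
AS $$
BEGIN
    -- Attempt the cast; on success flag the row as valid
    parsed_value := to_timestamp(p_value, p_format);
    is_valid := true;
EXCEPTION WHEN others THEN
    -- Any parse error is swallowed and reported through the flag instead
    parsed_value := NULL;
    is_valid := false;
END;
$$ LANGUAGE plpgsql;

-- (try_parse_timestamp(mycolumn, 'YYYY/MM/DD HH24:MI:SS')).is_valid could then drive the
-- WHERE clauses of the "good" and "bad" streaming views, if PipelineDB allowed it.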

Upvotes: 0

Views: 45

Answers (1)

Mark

Reputation: 538

After lots of time, I found a solution to this problem. I don't like it, but it will work.

It dawned on me after realizing the entire problem is predicated on the following:

http://docs.pipelinedb.com/continuous-transforms.html "You can think of continuous transforms as being triggers on top of incoming streaming data where the trigger function is executed for each new row output by the continuous transform. Internally the function is executed as an AFTER INSERT FOR EACH ROW trigger so there is no OLD row and the NEW row contains the row output by the continuous transform."

I spent hours trying to figure out why the custom functions I wrote to "try-parse" data types for incoming data streams weren't working: nothing showed up in the materializing view or the output table, yet no hard errors were thrown by PipelineDB. After a few hours I realized the problem was not so much that PipelineDB couldn't handle user-defined functions, but rather that the transformation expressed as SQL in the continuous transform happens "AFTER THE ROW IS INSERTED". So, fundamentally, any typecasting of the data fields inside the materializing stream was failing before it even started.

The solution (which is not very elegant) is to:

1 - move the typecasting logic, or any SQL logic that may result in an error, into the trigger function;
2 - create an "EXCEPTION WHEN others THEN" section inside the trigger function;
3 - make sure that RETURN NEW; happens in both the successful and the failed transformation cases;
4 - make the continuous transformation a mere passthrough that applies no logic; it exists only to call the trigger. (Which, to some extent, negates the entire point of using PipelineDB for this initial data-staging problem. But I digress.)

With that, I created a table to capture the errors, and by ensuring that all of the steps listed above are implemented, we ensure that the transaction will be successful.

That is important because if it is not done, and you get an exception inside the exception handler, or you don't handle an exception gracefully, then no records will be loaded.
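
As a sketch of that guarding (condensed to a few columns; the full trigger function is in the POC below, and the function name here is just illustrative), a nested block inside the exception handler keeps a failure in the error path itself from aborting the batch:

CREATE OR REPLACE FUNCTION insert_into_t_guarded()
  RETURNS trigger AS
  $$
  BEGIN
    -- Primary path: the typecasting INSERT (shown in full in the POC below)
    INSERT INTO pdb.lis_final (edm___row_id, patient_id, testordered_dt)
    VALUES (NEW.edm___row_id, NEW.patient_id,
            to_timestamp(NEW.testordered_dt, 'YYYY/MM/DD HH24:MI:SS'));
    RETURN NEW;

  EXCEPTION WHEN others THEN
    BEGIN
      -- Error path: store the raw text row plus the error message
      INSERT INTO pdb.lis_errors (edm___row_id, edm___errormsg, patient_id, testordered_dt)
      VALUES (NEW.edm___row_id, SQLERRM, NEW.patient_id, NEW.testordered_dt);
    EXCEPTION WHEN others THEN
      -- "Exception in the exception": log it instead of failing the whole transaction
      RAISE WARNING 'could not write to pdb.lis_errors: %', SQLERRM;
    END;
    RETURN NEW;
  END;
  $$
  LANGUAGE plpgsql;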

This supports the strategic goal: we are just trying to make a data-processing "fork in the river", routing records that transform successfully into one table (or streaming table) one way, and records that fail their transformation into an errors table.

Below I show a POC where we process the records as a stream and materialize them into a physical table (it could just as well have been another stream). The keys to this are realizing that:

- The errors table uses text columns.
- The trigger function captures errors in the attempted transformation and writes them to the errors table, along with a basic description of the error from the system.

I mentioned that I don't "like" the solution, but it was the best I could find in a few hours to work around the limitation of PipelineDB doing things as a post-insert trigger: a failure on insert can't be caught, and PipelineDB has no intrinsic capability to handle:

- continuing to process the stream within a transaction after a failure;
- failing gracefully at the row level and providing an easier mechanism to route failed transformations to an errors table.

DROP SCHEMA IF EXISTS pdb CASCADE;
CREATE SCHEMA IF NOT EXISTS pdb;


-- Final (physical) table for records whose typecasts succeed
DROP TABLE IF EXISTS pdb.lis_final;
CREATE TABLE pdb.lis_final(
    edm___row_id bigint,
    edm___created_dtz timestamp with time zone DEFAULT current_timestamp,
    edm___updatedat_dtz timestamp with time zone DEFAULT current_timestamp,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt timestamp,
    samplereceived_dt timestamp,
    testperformed_dt timestamp,
    testresultsreleased_dt timestamp,
    extractedfromsourceat_dt timestamp,
    birthdate_d date
);

-- Errors table: same columns, but kept as text so any raw value can be stored, plus the error message
DROP TABLE IF EXISTS pdb.lis_errors;
CREATE TABLE pdb.lis_errors(
    edm___row_id bigint,
    edm___errorat_dtz timestamp with time zone default current_timestamp,
    edm___errormsg text,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
);


-- Stream (PipelineDB foreign table) that raw text records are written into
DROP FOREIGN TABLE IF EXISTS pdb.lis_streaming_table CASCADE;
CREATE FOREIGN TABLE pdb.lis_streaming_table (
    edm___row_id serial,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
)
SERVER pipelinedb;


-- Trigger function: attempt the typecasts; on any error, route the raw text row to the errors table
CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN

    INSERT INTO pdb.lis_final
    SELECT
        NEW.edm___row_id,
        current_timestamp as edm___created_dtz,
        current_timestamp as edm___updatedat_dtz,
        NEW.patient_id,
        NEW.encounter_id,
        NEW.order_id,
        NEW.sample_id,
        NEW.container_id,
        NEW.result_id,
        NEW.orderrequestcode,
        NEW.orderrequestname,
        NEW.testresultcode,
        NEW.testresultname,
        NEW.testresultcost,
        to_timestamp(NEW.testordered_dt,'YYYY/MM/DD HH24:MI:SS') as testordered_dt,
        to_timestamp(NEW.samplereceived_dt,'YYYY/MM/DD HH24:MI:SS') as samplereceived_dt,
        to_timestamp(NEW.testperformed_dt,'YYYY/MM/DD HH24:MI:SS') as testperformed_dt,
        to_timestamp(NEW.testresultsreleased_dt,'YYYY/MM/DD HH24:MI:SS') as testresultsreleased_dt,
        to_timestamp(NEW.extractedfromsourceat_dt,'YYYY/MM/DD HH24:MI:SS') as extractedfromsourceat_dt,
        to_date(NEW.birthdate_d,'YYYY/MM/DD') as birthdate_d;

    -- Return NEW; the insert above succeeded, so the transform completes normally
    RETURN NEW;

    EXCEPTION WHEN others THEN

        INSERT INTO pdb.lis_errors
        SELECT
            NEW.edm___row_id,
            current_timestamp as edm___errorat_dtz,
            SQLERRM as edm___errormsg,
            NEW.patient_id,
            NEW.encounter_id,
            NEW.order_id,
            NEW.sample_id,
            NEW.container_id,
            NEW.result_id,
            NEW.orderrequestcode,
            NEW.orderrequestname,
            NEW.testresultcode,
            NEW.testresultname,
            NEW.testresultcost,
            NEW.testordered_dt,
            NEW.samplereceived_dt,
            NEW.testperformed_dt,
            NEW.testresultsreleased_dt,
            NEW.extractedfromsourceat_dt,
            NEW.birthdate_d;

        -- Return new back to the streaming view as we don't want that process to error.  We already routed the record above to the errors table as text.
        RETURN NEW;

  END;
  $$
  LANGUAGE plpgsql;


-- Continuous transform: a pure passthrough whose only purpose is to fire the trigger for each incoming row
DROP VIEW IF EXISTS pdb.lis_tryparse CASCADE;
CREATE VIEW pdb.lis_tryparse WITH (action=transform, outputfunc=insert_into_t) AS
SELECT
    edm___row_id,
    patient_id,
    encounter_id,
    order_id,
    sample_id,
    container_id,
    result_id,
    orderrequestcode,
    orderrequestname,
    testresultcode,
    testresultname,
    testresultcost,
    testordered_dt,
    samplereceived_dt,
    testperformed_dt,
    testresultsreleased_dt,
    extractedfromsourceat_dt,
    birthdate_d
FROM pdb.lis_streaming_table as st;
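
To exercise the POC, insert one well-formed and one malformed row into the stream and check where each one lands (the values are made up, and the remaining columns are simply left NULL for this quick test):

INSERT INTO pdb.lis_streaming_table (patient_id, testordered_dt, birthdate_d)
VALUES ('P001', '2019/05/15 13:10:24', '1980/01/31'),   -- parses cleanly    -> pdb.lis_final
       ('P002', '2019/15/05 13:10:24', '1980/01/31');   -- month/day swapped -> pdb.lis_errors

SELECT edm___row_id, patient_id, testordered_dt FROM pdb.lis_final;
SELECT edm___row_id, patient_id, edm___errormsg FROM pdb.lis_errors;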

Upvotes: 0
