Reputation: 665
I am building a model where I am dynamically referencing the table name and schema name based on the results of a query.
{%- set query %}
select * from master_data.event_metadata where game='{{game_name}}'
{% endset -%}
{%- set results = run_query(query) -%}
{%- if execute %}
{%- set record = results.rows[0] -%}
{% else %}
{%- set record = [] -%}
{% endif -%}
Two of the values are in record.SCHEMA_NAME and record.TABLE_NAME. I can use something like
select
*
from
{{record.SCHEMA_NAME}}.{{record.TABLE_NAME}}
but I'd rather use the source() function instead so that my documentation and DAG will be clean. How can I pass record.SCHEMA_NAME and record.TABLE_NAME as string arguments? I need to have something like
select
*
from
{{ source(record.SCHEMA_NAME, record.TABLE_NAME) }}
When I try to run the above I get the below error:
Server error: Compilation Error in rpc request (from remote system)
The source name (first) argument to source() must be a string, got <class 'jinja2.runtime.Undefined'>
Upvotes: 3
Views: 11181
Reputation: 11
So basically this can't be done. It goes against the intent of dbt sources and refs which are meant to be static. Best workaround I've found that does not compromise DAG integrity is to create a wrapper model for the dynamic content/table. I use this pattern for dynamic mapping tables that are married to specific sources.
Ex:
import snowflake.snowpark as snowpark

def model(dbt, session):
    # The table name comes from a job variable
    df_business_operations = session.table(dbt.config.get("job_or_env_var_key_for_dynamic_value"))
    return df_business_operations
So we are using a traditional Snowflake session call here under the hood, with the table set by a job var, i.e. dbt run --vars google_this_yourself. Instead of sourcing a dynamic business_operations_for_source_12345.xyz, we ref a general/shared business_operations model. That leaves only one ref, which can be static in downstream models without breaking principles, and our DAG remains intact and navigable when traversing upstream or downstream.
You'll have to hard code the dynamic values in the top of your pipeline one time to supply static DAG friendly/expected values. Something like:
table = dbt.config.get("job_or_env_var_key_for_dynamic_value")
if table == "ALASKA_RAW":  # my dynamic value
    upstream_source = dbt.source("RAW_SCHEMA", "ALASKA_RAW")  # MUST USE LITERAL HERE FOR DAG.
Be sure to use dbt.config.get("job_or_env_var_key_for_dynamic_value") consistently in all models to maintain independence and idempotency when running +downstream only. Do not update the config and then pass the value to the next model.
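A fuller version of that dispatch might look like the sketch below. It assumes the config key from the snippet above; the second table, HAWAII_RAW, is a hypothetical name added only to show the pattern, and the error handling is my own addition. Every dbt.source() call keeps literal arguments, which is the point of the workaround.

```python
def model(dbt, session):
    # Dynamic value read from config, using the same key as the snippet above.
    table = dbt.config.get("job_or_env_var_key_for_dynamic_value")

    # Each branch passes literal strings to dbt.source() so the calls stay
    # static and DAG friendly.
    if table == "ALASKA_RAW":
        upstream_source = dbt.source("RAW_SCHEMA", "ALASKA_RAW")
    elif table == "HAWAII_RAW":  # hypothetical second table, for illustration
        upstream_source = dbt.source("RAW_SCHEMA", "HAWAII_RAW")
    else:
        # Fail loudly instead of silently selecting nothing.
        raise ValueError(f"Unexpected dynamic table value: {table!r}")

    return upstream_source
```

Each new table needs its own branch, which is the one-time hard coding mentioned above.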
Upvotes: 0
Reputation: 662
You might already have found a workaround or a solution for this, but just in case someone else comes to the same situation...
To convert the values to strings you can use the |string filter. For instance:
record.SCHEMA_NAME|string
record.TABLE_NAME|string
So your query would look something like this:
select * from {{ source(record.SCHEMA_NAME|string|lower, record.TABLE_NAME|string|lower) }}
Note that depending on the output of your query and how you defined the source file, you might have to lower or upper your values to match your source.
Your record variable is the result of an execution (run_query(query)). When you run dbt compile/run, dbt performs a series of operations: it reads all the files of your project, generates a "manifest.json" file, and uses the ref/source calls to generate the DAG. At this point no SQL is executed; in other words, execute == False.
In your example, even if you do record.SCHEMA_NAME|string, you will not be able to retrieve the value of that variable because nothing was executed. Since you set record = [] when execute is false, you will get the message ... depends on a source named '.' which was not found, because at that point record is empty.
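The parse-time behaviour can be mimicked outside dbt. The following is a toy Python simulation, not dbt's implementation: Undefined, FakeRow, source and render_model are hypothetical stand-ins, and the schema and table values are made up. It only illustrates why record has nothing to offer while execute is false, reproducing the error from the question.

```python
class Undefined:
    """Simplified stand-in for jinja2.runtime.Undefined (hypothetical)."""


class FakeRow:
    """Stand-in for one row returned by run_query (values are made up)."""
    SCHEMA_NAME = "raw_schema"
    TABLE_NAME = "events"


def source(source_name, table_name):
    # Mimics the type check behind the error message in the question.
    if not isinstance(source_name, str):
        raise TypeError(
            "The source name (first) argument to source() must be a string, "
            f"got {type(source_name)}"
        )
    return f"{source_name}.{table_name}"


def render_model(execute):
    # Mirrors the question's template: a real row only when execute is true.
    record = FakeRow() if execute else []
    # A list has no SCHEMA_NAME attribute, so the lookup falls back to Undefined.
    schema = getattr(record, "SCHEMA_NAME", Undefined())
    table = getattr(record, "TABLE_NAME", Undefined())
    return "select * from " + source(schema, table)
```

Calling render_model(execute=True) returns a query, while render_model(execute=False) raises, because the empty record yields an Undefined placeholder instead of a string.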
A workaround would be to wrap your model's query in an if execute block, something like this:
{% if execute %}
select * from {{ source(record.SCHEMA_NAME|string|lower, record.TABLE_NAME|string|lower) }}
{% endif %}
With that approach, you will be able to dynamically set the source of your model.
But unfortunately, this will not work as you expect because it will not generate the DAG for that model. Using an if execute block to wrap your model's query prevents dbt from generating the DAG for the model.
In the end, this would be the same as your first attempt of having the schema and table declared without the source function.
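To see why the guarded query leaves no trace in the DAG, here is another toy Python simulation, again not dbt's actual code. parse_model, static_model and guarded_model are hypothetical names, and the parse pass is reduced to "run the template with execute set to false and record every source() call".

```python
def parse_model(template):
    """Toy parse pass: run the template with execute=False and collect source() calls."""
    dependencies = []

    def source(source_name, table_name):
        # Recording the call is what would put an edge in the DAG.
        dependencies.append((source_name, table_name))
        return f"{source_name}.{table_name}"

    template(execute=False, source=source)
    return dependencies


def static_model(execute, source):
    # source() is called unconditionally with literal strings.
    return "select * from " + source("raw_schema", "events")


def guarded_model(execute, source):
    # Equivalent of wrapping the whole query in {% if execute %} ... {% endif %}.
    if execute:
        return "select * from " + source("raw_schema", "events")
    return ""
```

Here parse_model(static_model) records one dependency and parse_model(guarded_model) records none, which matches the missing DAG edge described above.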
For more details, you can check the dbt documentation about the execute mode:
https://docs.getdbt.com/reference/dbt-jinja-functions/execute
Upvotes: 1
Reputation: 3097
I think you need to convert those two objects into their string representation first before passing them to the source macro.
Try this
select
*
from
{{ source(record.SCHEMA_NAME|string, record.TABLE_NAME|string) }}
Upvotes: 0