Skip to content

Commit

Permalink
Merge pull request #303 from ScalefreeCOM/177-nh-link-add-union_strat…
Browse files Browse the repository at this point in the history
…egy-parameter

177 nh link add union strategy parameter
  • Loading branch information
tkiehn authored Jan 13, 2025
2 parents 27f64ae + 78d6364 commit 208139c
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 20 deletions.
16 changes: 14 additions & 2 deletions macros/tables/bigquery/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro default__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro default__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand All @@ -25,6 +25,18 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION DISTINCT' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}



{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
Expand Down Expand Up @@ -213,7 +225,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
15 changes: 13 additions & 2 deletions macros/tables/databricks/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro databricks__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro databricks__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand Down Expand Up @@ -26,6 +26,17 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION DISTINCT' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
{%- endif -%}
Expand Down Expand Up @@ -213,7 +224,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
15 changes: 13 additions & 2 deletions macros/tables/exasol/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro exasol__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro exasol__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand Down Expand Up @@ -26,6 +26,17 @@
{{ log('source_models: '~source_models, false) }}


{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
{%- endif -%}
Expand Down Expand Up @@ -213,7 +224,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
15 changes: 13 additions & 2 deletions macros/tables/fabric/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro fabric__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro fabric__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand Down Expand Up @@ -27,6 +27,17 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
{%- endif -%}
Expand Down Expand Up @@ -221,7 +232,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
11 changes: 9 additions & 2 deletions macros/tables/nh_link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
In the background a non-historized link uses exactly the same loading logic as a regular link, but adds the descriptive attributes as additional payload.
#}

{%- macro nh_link(yaml_metadata=none, link_hashkey=none, payload=none, source_models=none, foreign_hashkeys=none, src_ldts=none, src_rsrc=none, disable_hwm=false, source_is_single_batch=false) -%}
{%- macro nh_link(yaml_metadata=none, link_hashkey=none, payload=none, source_models=none, foreign_hashkeys=none, src_ldts=none, src_rsrc=none, disable_hwm=false, source_is_single_batch=false, union_strategy='all') -%}

{% set link_hashkey_description = "
link_hashkey::string Name of the non-historized link hashkey column inside the stage. Should get calculated out of all business keys inside
Expand Down Expand Up @@ -92,6 +92,11 @@
Needs to use the same column name as defined as alias inside the staging model.
" %}

{% set union_strategy_description = "
union_strategy::'all' | 'distinct' Defines how multiple sources should be unioned. 'all' will result in a UNION ALL and represents the default value. Should only be changed, if you have duplicates across
source systems, and don't want to deduplicate them upfront.
" %}

{%- set link_hashkey = datavault4dbt.yaml_metadata_parser(name='link_hashkey', yaml_metadata=yaml_metadata, parameter=link_hashkey, required=True, documentation=link_hashkey_description) -%}
{%- set payload = datavault4dbt.yaml_metadata_parser(name='payload', yaml_metadata=yaml_metadata, parameter=payload, required=True, documentation=payload_description) -%}
{%- set source_models = datavault4dbt.yaml_metadata_parser(name='source_models', yaml_metadata=yaml_metadata, parameter=source_models, required=True, documentation=source_models_description) -%}
Expand All @@ -100,6 +105,7 @@
{%- set rsrc = datavault4dbt.yaml_metadata_parser(name='rsrc', yaml_metadata=yaml_metadata, parameter=rsrc, required=False, documentation=rsrc_description) -%}
{%- set disable_hwm = datavault4dbt.yaml_metadata_parser(name='disable_hwm', yaml_metadata=yaml_metadata, parameter=disable_hwm, required=False, documentation='Whether the High Water Mark should be turned off. Optional, default False.') -%}
{%- set source_is_single_batch = datavault4dbt.yaml_metadata_parser(name='source_is_single_batch', yaml_metadata=yaml_metadata, parameter=source_is_single_batch, required=False, documentation='Whether the source contains only one batch. Optional, default False.') -%}
{%- set union_strategy = datavault4dbt.yaml_metadata_parser(name='union_strategy', yaml_metadata=yaml_metadata, parameter=union_strategy, required=False, documentation=union_strategy_description) -%}

{# Applying the default aliases as stored inside the global variables, if src_ldts and src_rsrc are not set. #}

Expand All @@ -113,6 +119,7 @@
src_rsrc=src_rsrc,
source_models=source_models,
disable_hwm=disable_hwm,
source_is_single_batch=source_is_single_batch) -}}
source_is_single_batch=source_is_single_batch,
union_strategy=union_strategy) -}}

{%- endmacro -%}
14 changes: 12 additions & 2 deletions macros/tables/oracle/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro oracle__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro oracle__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand Down Expand Up @@ -26,6 +26,16 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
Expand Down Expand Up @@ -214,7 +224,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
14 changes: 12 additions & 2 deletions macros/tables/postgres/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro postgres__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro postgres__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand Down Expand Up @@ -26,6 +26,16 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
Expand Down Expand Up @@ -214,7 +224,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
14 changes: 12 additions & 2 deletions macros/tables/redshift/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro redshift__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro redshift__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand All @@ -25,6 +25,16 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
Expand Down Expand Up @@ -213,7 +223,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
15 changes: 13 additions & 2 deletions macros/tables/snowflake/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro snowflake__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro snowflake__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand All @@ -25,6 +25,17 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
{%- endif -%}
Expand Down Expand Up @@ -212,7 +223,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down
14 changes: 12 additions & 2 deletions macros/tables/synapse/nh_link.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro synapse__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%}
{%- macro synapse__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch, union_strategy) -%}

{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%}

Expand Down Expand Up @@ -26,6 +26,16 @@
{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%}
{{ log('source_models: '~source_models, false) }}

{% if union_strategy|lower == 'all' %}
{% set union_command = 'UNION ALL' %}
{% elif union_strategy|lower == 'distinct' %}
{% set union_command = 'UNION' %}
{% else %}
{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: Parameter 'union_strategy' set to '" ~ union_strategy ~ "' which is not a supported choice. Set to 'all' or 'distinct' instead. UNION ALL is used now.") -%}
{% endif %}
{% set union_command = 'UNION ALL' %}
{% endif %}

{%- if not datavault4dbt.is_something(foreign_hashkeys) -%}
{%- set foreign_hashkeys = [] -%}
Expand Down Expand Up @@ -214,7 +224,7 @@ source_new_union AS (
FROM src_new_{{ source_number }}

{%- if not loop.last %}
UNION ALL
{{ union_command }}
{% endif -%}

{%- endfor -%}
Expand Down

0 comments on commit 208139c

Please sign in to comment.