Skip to content

Commit

Permalink
Merge pull request #234 from ScalefreeCOM/redshift-patch
Browse files Browse the repository at this point in the history
Redshift patch into DEV
  • Loading branch information
tkirschke authored Aug 21, 2024
2 parents b001cba + b095042 commit f995393
Show file tree
Hide file tree
Showing 16 changed files with 72 additions and 72 deletions.
2 changes: 1 addition & 1 deletion macros/internal/helpers/stage_processing_macros.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,4 @@
{%- endif %}
{%- endfor -%}

{%- endmacro -%}
{%- endmacro -%}
8 changes: 4 additions & 4 deletions macros/staging/redshift/stage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ unknown_values AS (
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%},
{# Generating Ghost Records for all source columns, except the ldts, rsrc & edwSequence column and derived_columns #}
{%- for column in columns_without_excluded_columns %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown') }}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.data_type, ghost_record_type='unknown') }}
{%- if not loop.last %},{% endif -%}
{%- endfor -%}

Expand Down Expand Up @@ -470,7 +470,7 @@ unknown_values AS (

{% if column.name|lower == vals['bk']|lower -%}
{{ log('column found? yes, for column :' ~ column.name , false) }}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown', alias=col) }}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.data_type, ghost_record_type='unknown', alias=col) }}
{%- endif -%}

{%- endfor -%}
Expand Down Expand Up @@ -509,7 +509,7 @@ error_values AS (
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%},
{# Generating Ghost Records for Source Columns #}
{%- for column in columns_without_excluded_columns %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error') }}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.data_type, ghost_record_type='error') }}
{%- if not loop.last %},{% endif -%}
{%- endfor -%}

Expand Down Expand Up @@ -537,7 +537,7 @@ error_values AS (

{% for column in pj_relation_columns -%}
{% if column.name|lower == vals['bk']|lower -%}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error', alias=col) -}}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.data_type, ghost_record_type='error', alias=col) -}}
{%- endif -%}
{%- endfor -%}
{%- if not loop.last -%},{%- endif %}
Expand Down
75 changes: 37 additions & 38 deletions macros/supporting/ghost_record_per_datatype.sql
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@
{%- set alias = datavault4dbt.escape_column_names(alias) -%}

{%- if ghost_record_type == 'unknown' -%}
{%- if datatype in ['TIMESTAMP_NTZ','TIMESTAMP'] %}{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} AS {{ alias }}
{%- elif datatype == 'DATE'-%} TO_DATE('{{ beginning_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }}
{%- elif datatype in ['STRING', 'VARCHAR'] %}'{{ unknown_value__STRING }}' AS {{ alias }}
{%- elif datatype == 'CHAR' %}CAST('{{ unknown_value_alt__STRING }}' as {{ datatype }} ) as {{ alias }}
{%- if datatype in ['TIMESTAMP_NTZ','TIMESTAMP'] %}{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} AS "{{ alias }}"
{%- elif datatype == 'DATE'-%} TO_DATE('{{ beginning_of_all_times_date }}', '{{ date_format }}' ) as "{{ alias }}"
{%- elif datatype in ['STRING', 'VARCHAR'] %}'{{ unknown_value__STRING }}' AS "{{ alias }}"
{%- elif datatype == 'CHAR' %}CAST('{{ unknown_value_alt__STRING }}' as {{ datatype }} ) as "{{ alias }}"
{%- elif datatype.upper().startswith('VARCHAR(') or datatype.upper().startswith('CHAR(') -%}
{%- if col_size is not none -%}
{%- set unknown_dtype_length = col_size | int -%}
Expand All @@ -180,15 +180,15 @@
{%- else -%}
CAST('{{ unknown_value__STRING }}' as {{ datatype }} ) as {{ alias }}
{%- endif -%}
{%- elif datatype in ['NUMBER','INT','FLOAT','DECIMAL'] %}0 AS {{ alias }}
{%- elif datatype == 'BOOLEAN' %}CAST('FALSE' AS BOOLEAN) AS {{ alias }}
{%- else %}NULL AS {{ alias }}
{%- elif datatype in ['NUMBER','INT','FLOAT','DECIMAL'] %}0 AS "{{ alias }}"
{%- elif datatype == 'BOOLEAN' %}CAST('FALSE' AS BOOLEAN) AS "{{ alias }}"
{%- else %}NULL AS "{{ alias }}"
{% endif %}
{%- elif ghost_record_type == 'error' -%}
{%- if datatype in ['TIMESTAMP_NTZ','TIMESTAMP'] %}{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} AS {{ alias }}
{%- elif datatype == 'DATE'-%} TO_DATE('{{ end_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }}
{%- elif datatype in ['STRING','VARCHAR'] %}'{{ error_value__STRING }}' AS {{ alias }}
{%- elif datatype == 'CHAR' %}CAST('{{ error_value_alt__STRING }}' as {{ datatype }} ) as {{ alias }}
{%- if datatype in ['TIMESTAMP_NTZ','TIMESTAMP'] %}{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} AS "{{ alias }}"
{%- elif datatype == 'DATE'-%} TO_DATE('{{ end_of_all_times_date }}', '{{ date_format }}' ) as "{{ alias }}"
{%- elif datatype in ['STRING','VARCHAR'] %}'{{ error_value__STRING }}' AS "{{ alias }}"
{%- elif datatype == 'CHAR' %}CAST('{{ error_value_alt__STRING }}' as {{ datatype }} ) as "{{ alias }}"
{%- elif datatype.upper().startswith('VARCHAR(') or datatype.upper().startswith('CHAR(') -%}
{%- if col_size is not none -%}
{%- set error_dtype_length = col_size | int -%}
Expand All @@ -205,9 +205,9 @@
{%- else -%}
CAST('{{ error_value__STRING }}' as {{ datatype }} ) as {{ alias }}
{%- endif -%}
{% elif datatype in ['NUMBER','INT','FLOAT','DECIMAL'] %}-1 AS {{ alias }}
{% elif datatype == 'BOOLEAN' %}CAST('FALSE' AS BOOLEAN) AS {{ alias }}
{% else %}NULL AS {{ alias }}
{% elif datatype in ['NUMBER','INT','FLOAT','DECIMAL'] %}-1 AS "{{ alias }}"
{% elif datatype == 'BOOLEAN' %}CAST('FALSE' AS BOOLEAN) AS "{{ alias }}"
{% else %}NULL AS "{{ alias }}"
{% endif %}
{%- else -%}
{%- if execute -%}
Expand Down Expand Up @@ -349,43 +349,42 @@
{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%}
{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%}
{%- set timestamp_format = datavault4dbt.timestamp_format() -%}

{%- set beginning_of_all_times_date = var('datavault4dbt.beginning_of_all_times_date', '0001-01-01') -%}
{%- set end_of_all_times_date = var('datavault4dbt.end_of_all_times_date', '8888-12-31') -%}
{%- set date_format = var('datavault4dbt.date_format', 'YYYY-mm-dd') -%}

{%- set unknown_value__STRING = var('datavault4dbt.unknown_value__STRING', '(unknown)') -%}
{%- set error_value__STRING = var('datavault4dbt.error_value__STRING', '(error)') -%}
{%- set hash = datavault4dbt.hash_method() -%}
{%- set hash_default_values = datavault4dbt.hash_default_values(hash_function=hash) -%}
{%- set hash_alg= hash_default_values['hash_alg'] -%}
{%- set unknown_value__HASHTYPE = hash_default_values['unknown_key'] -%}
{%- set error_value__HASHTYPE = hash_default_values['error_key'] -%}
{%- set datatype = datatype | string | upper | trim -%}

{%- if ghost_record_type == 'unknown' -%}
{%- if datatype == 'TIMESTAMP' %} {{ datavault4dbt.string_to_timestamp( timestamp_format , beginning_of_all_times) }} as {{ alias }}
{%- if 'TIMESTAMP' in datatype %}{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times_date) }} AS {{ alias }}
{%- elif datatype == 'TIMETZ' %} CAST('00:00:01 UTC' as TIMETZ) as {{ alias }}
{%- elif datatype == 'TIME' %} CAST('00:00:01' as TIME) as {{ alias }}
{%- elif datatype == 'DATE'-%} TO_DATE('{{ beginning_of_all_times_date }}', '{{ date_format }}' ) as "{{ alias }}"
{%- elif datatype == 'VARCHAR' %} '{{unknown_value__STRING}}' as {{ alias }}
{%- elif datatype == 'CHARACTER' %} '{{unknown_value__STRING}}' as {{ alias }}
{%- elif datatype == 'INT' %} CAST('0' as INT) as {{ alias }}
{%- elif datatype == 'INT2' %} CAST('0' as INT2) as {{ alias }}
{%- elif datatype == 'INT8' %} CAST('0' as INT8) as {{ alias }}
{%- elif datatype == 'NUMERIC' %} CAST('0' as NUMERIC) as {{ alias }}
{%- elif datatype == 'FLOAT4' %} CAST('0' as FLOAT4) as {{ alias }}
{%- elif datatype == 'FLOAT' %} CAST('0' as FLOAT) as {{ alias }}
{%- elif datatype == 'BOOLEAN' %} CAST('FALSE' as BOOLEAN) as {{ alias }}
{%- elif datatype == 'VARBINARY' %} 'NULL'::varbyte as {{ alias }}
{%- elif 'CHAR' in datatype or datatype == 'TEXT' %} '{{unknown_value__STRING}}' as {{ alias }}
{%- elif datatype in ['INTEGER', 'INT', 'INT2', 'INT4', 'INT8', 'SMALLINT', 'BIGINT', 'REAL', 'FLOAT4', 'DOUBLE PRECISION', 'DOUBLE', 'FLOAT', 'FLOAT8'] %} CAST(0 as {{ datatype }}) as {{ alias }}
{%- elif 'DECIMAL' in datatype or 'NUMERIC' in datatype %} CAST(0 as {{ datatype }}) as {{ alias }}
{%- elif datatype in ['BOOLEAN', 'BOOL'] %} CAST('TRUE' as BOOLEAN) as {{ alias }}
{%- elif datatype in ['VARBYTE', 'VARBINARY', 'BINARY VARYING'] %} CAST('{{ unknown_value__HASHTYPE }}' as {{ datatype }}) as "{{ alias }}"
{%- elif datatype == 'GEOMETRY' %} CAST(ST_POINT(0, 90) as {{ datatype }}) as {{ alias }}
{%- else %} CAST(NULL as {{ datatype }}) as {{ alias }}
{% endif %}
{%- elif ghost_record_type == 'error' -%}
{%- if datatype == 'TIMESTAMP' %} {{ datavault4dbt.string_to_timestamp( timestamp_format , end_of_all_times) }} as {{ alias }}
{%- if 'TIMESTAMP' in datatype %}{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} AS {{ alias }}
{%- elif datatype == 'TIMETZ' %} CAST('23:59:59 UTC' as TIMETZ) as {{ alias }}
{%- elif datatype == 'TIME' %} CAST('23:59:59' as TIME) as {{ alias }}
{%- elif datatype == 'DATE'-%} TO_DATE('{{ end_of_all_times_date }}', '{{ date_format }}' ) as "{{ alias }}"
{%- elif datatype == 'VARCHAR' %} '{{error_value__STRING}}' as {{ alias }}
{%- elif datatype == 'CHARACTER' %} '{{error_value__STRING}}' as {{ alias }}
{%- elif datatype == 'INT' %} CAST('-1' as INT) as {{ alias }}
{%- elif datatype == 'INT2' %} CAST('-1' as INT2) as {{ alias }}
{%- elif datatype == 'INT8' %} CAST('-1' as INT8) as {{ alias }}
{%- elif datatype == 'NUMERIC' %} CAST('-1' as NUMERIC) as {{ alias }}
{%- elif datatype == 'FLOAT4' %} CAST('-1' as FLOAT4) as {{ alias }}
{%- elif datatype == 'FLOAT' %} CAST('-1' as FLOAT) as {{ alias }}
{%- elif datatype == 'BOOLEAN' %} CAST('FALSE' as BOOLEAN) as {{ alias }}
{%- elif datatype == 'VARBINARY' %} 'NULL'::varbyte as {{ alias }}
{%- elif 'CHAR' in datatype or datatype == 'TEXT' %} '{{error_value__STRING}}' as {{ alias }}
{%- elif datatype in ['INTEGER', 'INT', 'INT2', 'INT4', 'INT8', 'SMALLINT', 'BIGINT', 'REAL', 'FLOAT4', 'DOUBLE PRECISION', 'DOUBLE', 'FLOAT', 'FLOAT8'] %} CAST(-1 as {{ datatype }}) as {{ alias }}
{%- elif 'DECIMAL' in datatype or 'NUMERIC' in datatype %} CAST(-1 as {{ datatype }}) as {{ alias }}
{%- elif datatype in ['BOOLEAN', 'BOOL'] %} CAST('FALSE' as BOOLEAN) as {{ alias }}
{%- elif datatype in ['VARBYTE', 'VARBINARY', 'BINARY VARYING'] %} CAST('{{ error_value__HASHTYPE }}' as {{ datatype }}) as "{{ alias }}"
{%- elif datatype == 'GEOMETRY' %} CAST(ST_POINT(0, 90) as {{ datatype }}) as {{ alias }}
{%- else %} CAST(NULL as {{ datatype }}) as {{ alias }}
{% endif %}
{%- else -%}
Expand Down
1 change: 0 additions & 1 deletion macros/supporting/hash_standardization.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ CONCAT('\"', REPLACE(REPLACE(REPLACE(TRIM(CAST([EXPRESSION] AS STRING)), '\\', '

{% endif %}


{%- endmacro -%}

{%- macro redshift__attribute_standardise(hash_type) -%}
Expand Down
2 changes: 1 addition & 1 deletion macros/supporting/string_to_timestamp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
{%- endmacro -%}

{%- macro redshift__string_to_timestamp(format, timestamp) -%}
TO_TIMESTAMP('{{ timestamp }}', '{{ format }}')
CAST(TO_TIMESTAMP('{{ timestamp }}', '{{ format }}') AS TIMESTAMP)
{%- endmacro -%}

2 changes: 1 addition & 1 deletion macros/tables/postgres/control_snap_v0.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro postgres__control_snap_v0(start_date, daily_snapshot_time, sdts_alias, end_date) -%}
{%- macro postgres__control_snap_v0(start_date, daily_snapshot_time, sdts_alias, end_date=none) -%}

{%- set timestamp_format = datavault4dbt.timestamp_format() -%}

Expand Down
1 change: 1 addition & 0 deletions macros/tables/postgres/ref_sat_v1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{%- set timestamp_format = datavault4dbt.timestamp_format() -%}

{%- set is_current_col_alias = var('datavault4dbt.is_current_col_alias', 'IS_CURRENT') -%}
{%- set ledts_alias = var('datavault4dbt.ledts_alias', 'ledts') -%}

{%- set source_relation = ref(ref_sat_v0) -%}

Expand Down
4 changes: 2 additions & 2 deletions macros/tables/redshift/control_snap_v0.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{%- macro redshift__control_snap_v0(start_date, daily_snapshot_time, sdts_alias, end_date) -%}

{%- set timestamp_format = datavault4dbt.timestamp_format() -%}

{%- set timestamp_value = start_date ~ ' ' ~ daily_snapshot_time -%}
{%- if not datavault4dbt.is_something(sdts_alias) -%}
{%- set sdts_alias = var('datavault4dbt.sdts_alias', 'sdts') -%}
{%- endif -%}

with recursive generate_dates({{ sdts_alias }}) as (
Select to_timestamp('{{ start_date }} {{ daily_snapshot_time }}', '{{ timestamp_format }}') as {{ sdts_alias }}
Select {{ datavault4dbt.string_to_timestamp(timestamp_format, timestamp_value) }} as {{ sdts_alias }}
union all
select {{ sdts_alias }} + 1
from generate_dates
Expand Down
2 changes: 1 addition & 1 deletion macros/tables/redshift/pit.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{%- set hash_alg = hash_default_values['hash_alg'] -%}
{%- set unknown_key = hash_default_values['unknown_key'] -%}
{%- set error_key = hash_default_values['error_key'] -%}
{% set string_default_dtype = datavault4dbt.string_default_dtype() %}
{%- set string_default_dtype = datavault4dbt.string_default_dtype() -%}

{%- if hash_dtype == 'BYTES' -%}
{%- set hashkey_string = 'TO_HEX({})'.format(datavault4dbt.prefix([hashkey],'te')) -%}
Expand Down
1 change: 1 addition & 0 deletions macros/tables/redshift/ref_sat_v1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{%- set timestamp_format = datavault4dbt.timestamp_format() -%}

{%- set is_current_col_alias = var('datavault4dbt.is_current_col_alias', 'IS_CURRENT') -%}
{%- set ledts_alias = var('datavault4dbt.ledts_alias', 'ledts') -%}

{%- set source_relation = ref(ref_sat_v0) -%}

Expand Down
4 changes: 2 additions & 2 deletions macros/tables/snowflake/hub.sql
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ WITH
SELECT
{{ hk_column }} AS {{ hashkey }},
{% for bk in source_model['bk_columns'] -%}
{{ bk }},
{{ bk }} AS {{ business_keys[loop.index - 1] }},
{% endfor -%}

{{ src_ldts }},
Expand Down Expand Up @@ -188,7 +188,7 @@ source_new_union AS (
{{ hashkey }},

{% for bk in source_model['bk_columns'] -%}
{{ bk }} AS {{ business_keys[loop.index - 1] }},
{{ business_keys[loop.index - 1] }},
{% endfor -%}

{{ src_ldts }},
Expand Down
2 changes: 1 addition & 1 deletion macros/tables/snowflake/rec_track_sat.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ WITH
{%- set source_in_target = true -%}

{%- if execute -%}
{%- set rsrc_static_result = run_query(rsrc_static_query_source) -%}
{%- set rsrc_static_result = run_query(rsrc_static_query_source_count) -%}

{%- set row_count = rsrc_static_result.columns[0].values()[0] -%}

Expand Down
20 changes: 10 additions & 10 deletions macros/tables/snowflake/ref_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ source_data AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
"{{ref_key}}",
{% endfor %}
{{ ns.src_hashdiff }} as {{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
{{- "\n\n " ~ datavault4dbt.print_list(datavault4dbt.escape_column_names(source_cols)) if source_cols else " *" }}
FROM {{ source_relation }}

{%- if is_incremental() and not disable_hwm %}
Expand All @@ -50,12 +50,12 @@ latest_entries_in_sat AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
"{{ref_key}}",
{% endfor %}
{{ ns.hdiff_alias }}
FROM
{{ this }}
QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1
QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} "{{ref_key}}" {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

Expand All @@ -67,17 +67,17 @@ deduplicated_numbered_source AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
"{{ref_key}}",
{% endfor %}
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
{{- "\n\n " ~ datavault4dbt.print_list(datavault4dbt.escape_column_names(source_cols)) if source_cols else " *" }}
{% if is_incremental() -%}
, ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as rn
, ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} "{{ref_key}}" {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as rn
{%- endif %}
FROM source_data
QUALIFY
CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} "{{ref_key}}" {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),
Expand All @@ -90,10 +90,10 @@ records_to_insert AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
"{{ref_key}}",
{% endfor %}
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
{{- "\n\n " ~ datavault4dbt.print_list(datavault4dbt.escape_column_names(source_cols)) if source_cols else " *" }}
FROM deduplicated_numbered_source
{%- if is_incremental() %}
WHERE NOT EXISTS (
Expand Down
10 changes: 5 additions & 5 deletions macros/tables/snowflake/ref_sat_v1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ end_dated_source AS (

SELECT
{% for ref_key in ref_keys %}
{{ref_key}},
"{{ref_key}}",
{% endfor %}
{{ hashdiff }},
{{ src_rsrc }},
{{ src_ldts }},
COALESCE(LEAD({{ src_ldts }} - INTERVAL '1 MICROSECOND') OVER (PARTITION BY {%- for ref_key in ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}),{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) as {{ ledts_alias }},
{{ datavault4dbt.print_list(source_columns_to_select) }}
COALESCE(LEAD({{ src_ldts }} - INTERVAL '1 MICROSECOND') OVER (PARTITION BY {%- for ref_key in ref_keys %} "{{ref_key}}" {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}),{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) as {{ ledts_alias }},
{{- "\n\n " ~ datavault4dbt.print_list(datavault4dbt.escape_column_names(source_columns_to_select)) if source_columns_to_select else " *" }}
FROM {{ source_relation }}

)

SELECT
{% for ref_key in ref_keys %}
{{ref_key}},
"{{ref_key}}",
{% endfor %}
{{ hashdiff }},
{{ src_rsrc }},
Expand All @@ -48,7 +48,7 @@ SELECT
ELSE FALSE
END AS {{ is_current_col_alias }},
{% endif -%}
{{ datavault4dbt.print_list(source_columns_to_select) }}
{{- "\n\n " ~ datavault4dbt.print_list(datavault4dbt.escape_column_names(source_columns_to_select)) if source_columns_to_select else " *" }}
FROM end_dated_source

{%- endmacro -%}
Loading

0 comments on commit f995393

Please sign in to comment.