diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py
index f482ddb4..2262dc4f 100644
--- a/dbt/adapters/clickhouse/impl.py
+++ b/dbt/adapters/clickhouse/impl.py
@@ -113,7 +113,7 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
         if not strategy or strategy == 'default':
             strategy = 'delete_insert' if conn.handle.use_lw_deletes else 'legacy'
         strategy = strategy.replace('+', '_')
-        if strategy not in ['legacy', 'append', 'delete_insert']:
+        if strategy not in ['legacy', 'append', 'delete_insert', 'insert_replace']:
             raise DbtRuntimeError(
                 f"The incremental strategy '{strategy}' is not valid for ClickHouse"
             )
diff --git a/dbt/include/clickhouse/macros/materializations/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental.sql
index 1eb35b0e..38ed9608 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental.sql
@@ -41,10 +41,14 @@
     {% endcall %}
     {% set need_swap = true %}
 
-  {% elif inserts_only or unique_key is none -%}
+  {% elif
+      inserts_only
+      or unique_key is none
+      and config.get('incremental_strategy', none) not in ['insert+replace', 'insert_replace'] -%}
     -- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing
     -- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
     -- specific configurable that is used to avoid creating an expensive intermediate table.
+    -- The insert+replace strategy works without a unique_key, so it is excluded from the unique_key check above.
     {% call statement('main') %}
         {{ clickhouse__insert_into(target_relation, sql) }}
     {% endcall %}
@@ -72,6 +76,12 @@
       {% call statement('main') %}
         {{ clickhouse__insert_into(target_relation, sql) }}
       {% endcall %}
+    {% elif incremental_strategy == 'insert_replace' %}
+      {%- set partition_by = config.get('partition_by') -%}
+      {% if inserts_only or unique_key is not none or incremental_predicates is not none %}
+        {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
+      {% endif %}
+      {% do clickhouse__incremental_insert_replace(existing_relation, intermediate_relation, partition_by) %}
     {% endif %}
   {% endif %}
 
@@ -234,3 +244,56 @@
     {% do adapter.drop_relation(new_data_relation) %}
     {{ drop_relation_if_exists(distributed_new_data_relation) }}
 {% endmacro %}
+
+{#-
+  Incremental "insert_replace" strategy: swap whole partitions of the target for freshly built ones.
+
+  Steps: (1) build the model's SQL into a temporary "new data" table; (2) create
+  `intermediate_relation` with the same structure as `existing_relation` and copy the new rows
+  into it; (3) for every partition present in `intermediate_relation`, run
+  ALTER TABLE ... REPLACE PARTITION on `existing_relation`, taking the partition from
+  `intermediate_relation`; (4) drop both helper tables.
+
+  Arguments:
+    existing_relation      target table whose partitions get replaced
+    intermediate_relation  staging table; created here and dropped at the end
+    partition_by           kept for caller compatibility; unused, because changed partitions are
+                           discovered by partition ID in system.parts
+-#}
+{% macro clickhouse__incremental_insert_replace(existing_relation, intermediate_relation, partition_by) %}
+    {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+       + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
+    {{ drop_relation_if_exists(new_data_relation) }}
+    {% call statement('create_new_data_temp') -%}
+        {{ get_create_table_as_sql(False, new_data_relation, sql) }}
+    {%- endcall %}
+    {% call statement('main') -%}
+        create table {{ intermediate_relation }} as {{ existing_relation }}
+    {%- endcall %}
+    {% call statement('insert_new_data') -%}
+        insert into {{ intermediate_relation }} select * from {{ new_data_relation }}
+    {%- endcall %}
+    {% if execute %}
+      {#- Partition IDs come from system.parts so they can be spliced into SQL verbatim, whatever the
+          partition key type. Assumes relation.schema is the ClickHouse database - TODO confirm. -#}
+      {% set select_changed_partitions %}
+          select distinct partition_id
+          from system.parts
+          where active
+            and database = '{{ intermediate_relation.schema }}'
+            and table = '{{ intermediate_relation.identifier }}'
+      {% endset %}
+      {% set changed_partitions = run_query(select_changed_partitions).rows %}
+    {% else %}
+      {% set changed_partitions = [] %}
+    {% endif %}
+    {% for changed_partition in changed_partitions %}
+      {% call statement('replace_partition_' ~ loop.index) %}
+          alter table {{ existing_relation }}
+          replace partition id '{{ changed_partition['partition_id'] }}'
+          from {{ intermediate_relation }}
+      {% endcall %}
+    {% endfor %}
+    {% do adapter.drop_relation(intermediate_relation) %}
+    {% do adapter.drop_relation(new_data_relation) %}
+{% endmacro %}