Skip to content

Commit 9e1c480

Browse files
committed
Support for incremental materialization with ingestion time partition tables
1 parent 072050e commit 9e1c480

File tree

2 files changed

+103
-5
lines changed

2 files changed

+103
-5
lines changed

dbt/adapters/bigquery/impl.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ class PartitionConfig(dbtClassMixin):
5151
data_type: str = 'date'
5252
granularity: str = 'day'
5353
range: Optional[Dict[str, Any]] = None
54+
time_ingestion_partitioning: bool = False
55+
56+
def reject_partition_field_column(
        self,
        columns: List[Any]) -> List[Any]:
    """Return *columns* with the configured partition field filtered out.

    Used when creating ingestion-time partitioned tables, where the
    partition field is replaced by the _PARTITIONTIME pseudo-column and
    must not appear in the explicit table schema.

    Note: returns the column objects themselves (fixed annotation — the
    original claimed List[str]), compared case-insensitively because
    BigQuery column names are case-insensitive.
    """
    logger.debug("reject_partition_field_column: {}".format(columns))
    return [c for c in columns if c.name.upper() != self.field.upper()]
5461

5562
def render(self, alias: Optional[str] = None):
5663
column: str = self.field
@@ -507,7 +514,10 @@ def _partitions_match(
507514
if not is_partitioned and not conf_partition:
508515
return True
509516
elif conf_partition and table.time_partitioning is not None:
510-
table_field = table.time_partitioning.field.lower()
517+
logger.debug('table.time_partitioning ({})'.format(
518+
table.time_partitioning))
519+
partioning_field = table.time_partitioning.field or '_PARTITIONTIME'
520+
table_field = partioning_field.lower()
511521
table_granularity = table.partitioning_type.lower()
512522
return table_field == conf_partition.field.lower() \
513523
and table_granularity == conf_partition.granularity.lower()

dbt/include/bigquery/macros/materializations/incremental.sql

+92-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,84 @@
2727
{% do return(strategy) %}
2828
{% endmacro %}
2929

30+
{% macro get_columns_with_types_in_query(select_sql) %}
  {#-- Fetch the result schema (column names and types) of `select_sql`
       without scanning any data: the always-false predicate plus `limit 0`
       make BigQuery return an empty, schema-only result set. --#}
  {% call statement('get_columns_with_types_in_query', fetch_result=True, auto_begin=False) -%}
    select * from (
      {{ select_sql }}
    ) as __dbt_sbq
    where false
    limit 0
  {% endcall %}
  {#-- Load the result once and reuse it: the original called load_result a
       second time for the return value, which is redundant and can fail on
       dbt versions where a statement result may only be loaded once. --#}
  {%- set result = load_result('get_columns_with_types_in_query') -%}
  {{ return(result.table.columns | list) }}
{% endmacro %}
46+
47+
{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
  {#-- Create an ingestion-time partitioned table whose schema matches the
       output of `sql`, minus the configured partition field: that field is
       represented by the _PARTITIONTIME pseudo-column instead of a real
       column, so the table must be created empty with an explicit schema
       before data is inserted. --#}
  {%- set raw_partition_by = config.get('partition_by', none) -%}
  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
  {%- set sql_header = config.get('sql_header', none) -%}

  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

  {%- set columns = get_columns_with_types_in_query(sql) -%}
  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}

  {{ sql_header if sql_header is not none }}

  {#-- The tojson/fromjson round-trip deep-copies the raw config so the
       field override below cannot leak into the model's partition_by. --#}
  {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
  {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}

  {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}

  create or replace table {{ relation }} ({{table_dest_columns_csv}})
  {{ partition_by(ingestion_time_partition_config) }}
  {{ cluster_by(raw_cluster_by) }}
  {{ bigquery_table_options(config, model, temporary) }}

{%- endmacro -%}
71+
72+
{% macro get_quoted_with_types_csv(columns) %}
  {#-- Render `columns` as a comma-separated list of "`name` type" pairs,
       suitable for an explicit CREATE TABLE column list. Leftover per-column
       debug logging removed. --#}
  {% set quoted = [] %}
  {% for col in columns -%}
    {%- do quoted.append(adapter.quote(col.name) ~ " " ~ col.data_type) -%}
  {%- endfor %}
  {%- set dest_cols_csv = quoted | join(', ') -%}
  {{ return(dest_cols_csv) }}
{% endmacro %}
82+
83+
{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
  {#-- CSV of "name type" for every column except the configured partition
       field, which is served by _PARTITIONTIME on ingestion-time partitioned
       tables. Leftover debug logging removed. --#}
  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
  {% set columns_names = get_quoted_with_types_csv(columns_no_partition) %}
  {{ return(columns_names) }}
{%- endmacro -%}
90+
91+
{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
  {#-- Insert the results of `sql` into an ingestion-time partitioned table,
       deriving _PARTITIONTIME from the configured partition field. --#}
  {%- set partition_by = config.get('partition_by', none) -%}
  {% if partition_by.data_type == 'timestamp' %}
    {% set partition_time_exp = partition_by.field %}
  {% else %}
    {#-- _PARTITIONTIME is a TIMESTAMP; cast DATE/DATETIME partition fields. --#}
    {% set partition_time_exp = 'timestamp(' + partition_by.field + ')' %}
  {% endif %}
  {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
    {#-- Bug fix: the select must also project the source columns (`, *`);
         the original selected only the partition-time expression, so the
         projection had one column while the insert listed N+1. --#}
    select {{ partition_time_exp }} as _partitiontime, * from (
      {{ sql }}
    )
{%- endmacro -%}
30108

31109
{% macro bq_insert_overwrite(
32110
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -94,6 +172,15 @@
94172

95173
{% endmacro %}
96174

175+
{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
  {#-- Dispatch table creation: ingestion-time partitioned tables must be
       created empty first and then filled, because BigQuery cannot create
       them directly from a SELECT; everything else uses create_table_as. --#}
  {% if is_time_ingestion_partitioning %}
    {#-- Idiom fix: truthiness test instead of `== True` on a boolean flag. --#}
    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
  {% else %}
    {{ return(create_table_as(temporary, relation, sql)) }}
  {% endif %}
{% endmacro %}
97184

98185
{% macro bq_generate_incremental_build_sql(
99186
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -147,6 +234,7 @@
147234

148235
{%- set raw_partition_by = config.get('partition_by', none) -%}
149236
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
237+
{% do log("partition by" ~ partition_by) %}
150238
{%- set partitions = config.get('partitions', none) -%}
151239
{%- set cluster_by = config.get('cluster_by', none) -%}
152240

@@ -155,26 +243,26 @@
155243
{{ run_hooks(pre_hooks) }}
156244

157245
{% if existing_relation is none %}
158-
{% set build_sql = create_table_as(False, target_relation, sql) %}
246+
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
159247

160248
{% elif existing_relation.is_view %}
161249
{#-- There's no way to atomically replace a view with a table on BQ --#}
162250
{{ adapter.drop_relation(existing_relation) }}
163-
{% set build_sql = create_table_as(False, target_relation, sql) %}
251+
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
164252

165253
{% elif full_refresh_mode %}
166254
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
167255
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
168256
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
169257
{{ adapter.drop_relation(existing_relation) }}
170258
{% endif %}
171-
{% set build_sql = create_table_as(False, target_relation, sql) %}
259+
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
172260

173261
{% else %}
174262
{% set tmp_relation_exists = false %}
175263
{% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
176264
{% do run_query(
177-
declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
265+
declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
178266
) %}
179267
{% set tmp_relation_exists = true %}
180268
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}

0 commit comments

Comments
 (0)