
Commit ed2b051

Kayrnt authored
Support for ingestion time partition table on BigQuery as incremental materialization (#136)
* Support for incremental materialization with ingestion time partition tables
* Refactor incremental materialization for readability
* Add changelog entry

Co-authored-by: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com>
Co-authored-by: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com>
1 parent 27e7002 commit ed2b051

14 files changed: +412 −121 lines
New changelog entry

@@ -0,0 +1,7 @@
+kind: Features
+body: Support for ingestion time partition table on BigQuery as incremental materialization
+time: 2022-08-07T16:42:27.232818+02:00
+custom:
+  Author: Kayrnt
+  Issue: "75"
+  PR: "136"

dbt/adapters/bigquery/impl.py

+34 −3
@@ -66,11 +66,15 @@ class PartitionConfig(dbtClassMixin):
     data_type: str = "date"
     granularity: str = "day"
     range: Optional[Dict[str, Any]] = None
+    time_ingestion_partitioning: bool = False
+
+    def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
+        return [c for c in columns if not c.name.upper() == self.field.upper()]
 
     def render(self, alias: Optional[str] = None):
-        column: str = self.field
+        column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
         if alias:
-            column = f"{alias}.{self.field}"
+            column = f"{alias}.{column}"
 
         if self.data_type.lower() == "int64" or (
             self.data_type.lower() == "date" and self.granularity.lower() == "day"
@@ -79,6 +83,13 @@ def render(self, alias: Optional[str] = None):
         else:
             return f"{self.data_type}_trunc({column}, {self.granularity})"
 
+    def render_wrapped(self, alias: Optional[str] = None):
+        """Wrap the partitioning column when time is involved to ensure it is properly cast to the matching time type."""
+        if self.data_type in ("date", "timestamp", "datetime"):
+            return f"{self.data_type}({self.render(alias)})"
+        else:
+            return self.render(alias)
+
     @classmethod
     def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:  # type: ignore [return]
         if raw_partition_by is None:
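
To make the render change concrete, here is a minimal standalone sketch (not the dbt class itself) mirroring the logic above: with the flag set, rendering targets BigQuery's _PARTITIONTIME pseudo-column, and render_wrapped adds an outer cast for time-typed columns.

# Standalone sketch of PartitionConfig.render / render_wrapped above.
def render(field, data_type="date", granularity="day",
           time_ingestion_partitioning=False, alias=None):
    column = field if not time_ingestion_partitioning else "_PARTITIONTIME"
    if alias:
        column = f"{alias}.{column}"
    if data_type.lower() == "int64" or (
        data_type.lower() == "date" and granularity.lower() == "day"
    ):
        return column
    return f"{data_type}_trunc({column}, {granularity})"

def render_wrapped(field, data_type="date", **kwargs):
    # wrap time-typed columns so comparisons use a consistent type
    rendered = render(field, data_type, **kwargs)
    if data_type in ("date", "timestamp", "datetime"):
        return f"{data_type}({rendered})"
    return rendered

print(render("created_at", "timestamp", time_ingestion_partitioning=True))
# -> timestamp_trunc(_PARTITIONTIME, day)
print(render_wrapped("created_at", "timestamp", time_ingestion_partitioning=True))
# -> timestamp(timestamp_trunc(_PARTITIONTIME, day))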
@@ -236,6 +247,12 @@ def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
             logger.debug("get_columns_in_relation error: {}".format(e))
             return []
 
+    @available.parse(lambda *a, **k: [])
+    def add_time_ingestion_partition_column(self, columns) -> List[BigQueryColumn]:
+        "Add the time ingestion partition column to the columns list"
+        columns.append(self.Column("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE"))
+        return columns
+
     def expand_column_types(self, goal: BigQueryRelation, current: BigQueryRelation) -> None:  # type: ignore[override]
         # This is a no-op on BigQuery
         pass
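
The effect of add_time_ingestion_partition_column on the MERGE's destination column list, sketched with stand-in (name, type) tuples rather than BigQueryColumn objects:

# The destination column list gains the _PARTITIONTIME pseudo-column
# (TIMESTAMP, NULLABLE in the adapter) so the ingestion-time partition
# can be written explicitly.
dest_columns = [("id", "INT64"), ("created_at", "TIMESTAMP")]
dest_columns.append(("_PARTITIONTIME", "TIMESTAMP"))
print([name for name, _ in dest_columns])  # ['id', 'created_at', '_PARTITIONTIME']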
@@ -434,6 +451,19 @@ def copy_table(self, source, destination, materialization):
 
         return "COPY TABLE with materialization: {}".format(materialization)
 
+    @available.parse(lambda *a, **k: False)
+    def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
+        try:
+            conn = self.connections.get_thread_connection()
+            client = conn.handle
+            query_job, iterator = self.connections.raw_execute(select_sql)
+            query_table = client.get_table(query_job.destination)
+            return self._get_dbt_columns_from_bq_table(query_table)
+
+        except (ValueError, google.cloud.exceptions.NotFound) as e:
+            logger.debug("get_columns_in_select_sql error: {}".format(e))
+            return []
+
     @classmethod
     def poll_until_job_completes(cls, job, timeout):
         retry_count = timeout
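
get_columns_in_select_sql resolves a query's output schema by running it and reading the schema of the job's anonymous destination table. A hedged standalone sketch of the same trick with the google-cloud-bigquery client (ambient project and credentials are assumed):

from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
query_job = client.query("select 1 as id, current_timestamp() as created_at")
query_job.result()  # wait for the query to finish
table = client.get_table(query_job.destination)  # anonymous results table
for field in table.schema:
    print(field.name, field.field_type)  # id INTEGER / created_at TIMESTAMP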
@@ -495,7 +525,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) -> bool:
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            table_field = table.time_partitioning.field.lower()
+            partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
+            table_field = partitioning_field.lower()
             table_granularity = table.partitioning_type.lower()
             return (
                 table_field == conf_partition.field.lower()
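
The _partitions_match change accounts for ingestion-time partitioned tables, which report no partitioning field; a one-line sketch of the fallback:

# BigQuery returns time_partitioning.field == None for ingestion-time tables,
# so the comparison falls back to the _PARTITIONTIME pseudo-column.
time_partitioning_field = None
partitioning_field = time_partitioning_field or "_PARTITIONTIME"
assert partitioning_field.lower() == "_partitiontime"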

dbt/include/bigquery/macros/materializations/incremental.sql

+26 −105
@@ -1,18 +1,3 @@
-{% macro declare_dbt_max_partition(relation, partition_by, compiled_code, language='sql') %}
-
-  {#-- TODO: revisit partitioning with python models --#}
-  {%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
-
-    declare _dbt_max_partition {{ partition_by.data_type }} default (
-      select max({{ partition_by.field }}) from {{ this }}
-      where {{ partition_by.field }} is not null
-    );
-
-  {%- endif -%}
-
-{% endmacro %}
-
 {% macro dbt_bigquery_validate_get_incremental_strategy(config) %}
   {#-- Find and validate the incremental strategy #}
   {%- set strategy = config.get("incremental_strategy") or 'merge' -%}
@@ -28,107 +13,40 @@
   {% do return(strategy) %}
 {% endmacro %}
 
+{% macro source_sql_with_partition(partition_by, source_sql) %}
 
-{% macro bq_insert_overwrite(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
-) %}
-
-  {% if partitions is not none and partitions != [] %} {# static #}
-
-      {% set predicate -%}
-          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
-              {{ partitions | join (', ') }}
-          )
-      {%- endset %}
-
-      {%- set source_sql -%}
-        (
-          {{ sql }}
-        )
-      {%- endset -%}
-
-      {#-- Because we're putting the model SQL _directly_ into the MERGE statement,
-         we need to prepend the MERGE statement with the user-configured sql_header,
-         which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header).
-         In the "dynamic" case, we save the model SQL result as a temp table first, wherein the
-         sql_header is included by the create_table_as macro.
-      #}
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
-
-  {% else %} {# dynamic #}
-
-      {% set predicate -%}
-          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
-      {%- endset %}
-
-      {%- set source_sql -%}
-      (
-        select * from {{ tmp_relation }}
-      )
-      {%- endset -%}
-
-      -- generated script to merge partitions into {{ target_relation }}
-      declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
-
-      {# have we already created the temp table to check for schema changes? #}
-      {% if not tmp_relation_exists %}
-        {{ declare_dbt_max_partition(this, partition_by, sql) }}
-
-        -- 1. create a temp table
-        {{ create_table_as(True, tmp_relation, compiled_code) }}
-      {% else %}
-        -- 1. temp table already exists, we used it to check for schema changes
-      {% endif %}
-
-      -- 2. define partitions to update
-      set (dbt_partitions_for_replacement) = (
-          select as struct
-              array_agg(distinct {{ partition_by.render() }})
-          from {{ tmp_relation }}
-      );
-
-      -- 3. run the merge statement
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
-
-      -- 4. clean up the temp table
-      drop table if exists {{ tmp_relation }}
+  {%- if partition_by.time_ingestion_partitioning %}
+    {{ return(wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), source_sql, False)) }}
+  {% else %}
+    {{ return(source_sql) }}
+  {%- endif -%}
 
+{% endmacro %}
+
+{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %}
+  {% if is_time_ingestion_partitioning %}
+    {#-- Create the table before inserting data, as ingestion time partitioned tables can't be created with the transformed data --#}
+    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, compiled_code)) %}
+    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, compiled_code)) }}
+  {% else %}
+    {{ return(create_table_as(temporary, relation, compiled_code)) }}
   {% endif %}
 {% endmacro %}
 
-
 {% macro bq_generate_incremental_build_sql(
     strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
 ) %}
   {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
   {% if strategy == 'insert_overwrite' %}
 
-    {% set missing_partition_msg -%}
-      The 'insert_overwrite' strategy requires the `partition_by` config.
-    {%- endset %}
-    {% if partition_by is none %}
-      {% do exceptions.raise_compiler_error(missing_partition_msg) %}
-    {% endif %}
-
-    {% set build_sql = bq_insert_overwrite(
+    {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
         tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
     ) %}
 
   {% else %} {# strategy == 'merge' #}
-    {%- set source_sql -%}
-      {%- if tmp_relation_exists -%}
-        (
-          select * from {{ tmp_relation }}
-        )
-      {%- else -%} {#-- wrap sql in parens to make it a subquery --#}
-        (
-          {{ sql }}
-        )
-      {%- endif -%}
-    {%- endset -%}
-
-    {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
+
+    {% set build_sql = bq_generate_incremental_merge_build_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
+    ) %}
 
   {% endif %}
 
@@ -163,14 +81,14 @@
 
   {% if existing_relation is none %}
       {%- call statement('main', language=language) -%}
-        {{ create_table_as(False, target_relation, compiled_code, language) }}
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
       {%- endcall -%}
 
   {% elif existing_relation.is_view %}
      {#-- There's no way to atomically replace a view with a table on BQ --#}
      {{ adapter.drop_relation(existing_relation) }}
      {%- call statement('main', language=language) -%}
-        {{ create_table_as(False, target_relation, compiled_code, language) }}
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
      {%- endcall -%}
 
  {% elif full_refresh_mode %}
@@ -180,7 +98,7 @@
      {{ adapter.drop_relation(existing_relation) }}
      {% endif %}
      {%- call statement('main', language=language) -%}
-        {{ create_table_as(False, target_relation, compiled_code, language) }}
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
      {%- endcall -%}
 
  {% else %}
@@ -198,7 +116,7 @@
      {#-- Python always needs to create a temp table --#}
      {%- call statement('create_tmp_relation', language=language) -%}
        {{ declare_dbt_max_partition(this, partition_by, compiled_code, language) +
-           create_table_as(True, tmp_relation, compiled_code, language)
+           bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code, language)
        }}
      {%- endcall -%}
      {% set tmp_relation_exists = true %}
@@ -209,6 +127,9 @@
      {% if not dest_columns %}
        {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
      {% endif %}
+      {% if partition_by.time_ingestion_partitioning %}
+        {% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
+      {% endif %}
      {% set build_sql = bq_generate_incremental_build_sql(
          strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
      ) %}
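
Neither wrap_with_time_ingestion_partitioning nor create_ingestion_time_partitioned_table_as appears in this view (the commit touches 14 files, only some of which are shown), so the following Python sketch is an assumption about intent only: the model SQL is wrapped so the partition expression is exposed as _partitiontime, letting the insert route rows into the correct ingestion-time partition.

# Hypothetical helper (assumption, not the actual macro): approximate the
# wrapped source SQL, exposing the partition expression as _partitiontime.
def wrap_with_partition_time(partition_time_exp, sql):
    return (
        f"select {partition_time_exp['value']} as _partitiontime, *\n"
        f"from (\n{sql}\n)"
    )

exp = {"value": "timestamp(created_at)", "field": "created_at"}
print(wrap_with_partition_time(exp, "select 1 as id, timestamp '2022-08-07' as created_at"))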
New file: partitioning helper macros

@@ -0,0 +1,22 @@
+{% macro build_partition_time_exp(partition_by) %}
+  {% if partition_by.data_type == 'timestamp' %}
+    {% set partition_value = partition_by.field %}
+  {% else %}
+    {% set partition_value = 'timestamp(' + partition_by.field + ')' %}
+  {% endif %}
+  {{ return({'value': partition_value, 'field': partition_by.field}) }}
+{% endmacro %}
+
+{% macro declare_dbt_max_partition(relation, partition_by, compiled_code, language='sql') %}
+
+  {#-- TODO: revisit partitioning with python models --#}
+  {%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
+
+    declare _dbt_max_partition {{ partition_by.data_type }} default (
+      select max({{ partition_by.field }}) from {{ this }}
+      where {{ partition_by.field }} is not null
+    );
+
+  {%- endif -%}
+
+{% endmacro %}
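
build_partition_time_exp normalizes the partition expression to a timestamp; a standalone Python equivalent of the macro above:

# timestamp fields pass through; other time types are cast with timestamp()
# so the partition expression has a uniform type.
def build_partition_time_exp(field, data_type):
    value = field if data_type == "timestamp" else f"timestamp({field})"
    return {"value": value, "field": field}

assert build_partition_time_exp("created_at", "timestamp")["value"] == "created_at"
assert build_partition_time_exp("created_day", "date")["value"] == "timestamp(created_day)"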
New file: insert_overwrite incremental strategy macros

@@ -0,0 +1,93 @@
+{% macro bq_generate_incremental_insert_overwrite_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+) %}
+  {% if partition_by is none %}
+    {% set missing_partition_msg -%}
+      The 'insert_overwrite' strategy requires the `partition_by` config.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+  {% endif %}
+
+  {% set build_sql = bq_insert_overwrite(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+  ) %}
+
+  {{ return(build_sql) }}
+
+{% endmacro %}
+
+{% macro bq_insert_overwrite(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+) %}
+
+  {% if partitions is not none and partitions != [] %} {# static #}
+
+      {% set predicate -%}
+          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
+              {{ partitions | join (', ') }}
+          )
+      {%- endset %}
+
+      {%- set source_sql -%}
+        (
+          {%- if partition_by.time_ingestion_partitioning -%}
+          {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
+          {%- else -%}
+          {{ sql }}
+          {%- endif -%}
+        )
+      {%- endset -%}
+
+      {#-- Because we're putting the model SQL _directly_ into the MERGE statement,
+         we need to prepend the MERGE statement with the user-configured sql_header,
+         which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header).
+         In the "dynamic" case, we save the model SQL result as a temp table first, wherein the
+         sql_header is included by the create_table_as macro.
+      #}
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
+
+  {% else %} {# dynamic #}
+
+      {% set predicate -%}
+          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
+      {%- endset %}
+
+      {%- set source_sql -%}
+      (
+        select
+        {% if partition_by.time_ingestion_partitioning -%}
+        _PARTITIONTIME,
+        {%- endif -%}
+        * from {{ tmp_relation }}
+      )
+      {%- endset -%}
+
+      -- generated script to merge partitions into {{ target_relation }}
+      declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
+
+      {# have we already created the temp table to check for schema changes? #}
+      {% if not tmp_relation_exists %}
+        {{ declare_dbt_max_partition(this, partition_by, sql) }}
+
+        -- 1. create a temp table
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code) }}
+      {% else %}
+        -- 1. temp table already exists, we used it to check for schema changes
+      {% endif %}
+
+      -- 2. define partitions to update
+      set (dbt_partitions_for_replacement) = (
+          select as struct
+              array_agg(distinct {{ partition_by.render_wrapped() }})
+          from {{ tmp_relation }}
+      );
+
+      -- 3. run the merge statement
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
+
+      -- 4. clean up the temp table
+      drop table if exists {{ tmp_relation }}
+
+  {% endif %}
+
+{% endmacro %}
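
Putting the static branch together: with render_wrapped, the overwrite predicate compares a uniformly cast partition column against the configured partition list. A small Python sketch of the resulting predicate string (values are illustrative):

# The rendered column comes from partition_by.render_wrapped(alias='DBT_INTERNAL_DEST').
def static_predicate(rendered_partition_col, partitions):
    return f"{rendered_partition_col} in ({', '.join(partitions)})"

print(static_predicate(
    "timestamp(timestamp_trunc(DBT_INTERNAL_DEST._PARTITIONTIME, day))",
    ["timestamp '2022-08-06'", "timestamp '2022-08-07'"],
))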
