
Commit b8b0d3a

Support for incremental materialization with ingestion time partition tables

1 parent 3d69869 · commit b8b0d3a

5 files changed: +231 -32 lines changed

dbt/adapters/bigquery/connections.py (+19 -5)

@@ -1,11 +1,11 @@
 import json
 import re
 from contextlib import contextmanager
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from functools import lru_cache
 import agate
 from requests.exceptions import ConnectionError
-from typing import Optional, Any, Dict, Tuple
+from typing import Optional, Any, Dict, Tuple, List

 import google.auth
 import google.auth.exceptions
@@ -86,6 +86,7 @@ class BigQueryConnectionMethod(StrEnum):
 @dataclass
 class BigQueryAdapterResponse(AdapterResponse):
     bytes_processed: Optional[int] = None
+    fields: List[Any] = field(default_factory=list)


 @dataclass
@@ -434,6 +435,7 @@ def execute(
         code = None
         num_rows = None
         bytes_processed = None
+        fields = list()

         if query_job.statement_type == "CREATE_VIEW":
             code = "CREATE VIEW"
@@ -448,6 +450,7 @@
             bytes_processed = query_job.total_bytes_processed
             processed_bytes = self.format_bytes(bytes_processed)
             message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
+            fields = query_table.schema

         elif query_job.statement_type == "SCRIPT":
             code = "SCRIPT"
@@ -473,9 +476,14 @@
             bytes_processed = query_job.total_bytes_processed
             processed_bytes = self.format_bytes(bytes_processed)
             message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
+            fields = query_table.schema

         response = BigQueryAdapterResponse(  # type: ignore[call-arg]
-            _message=message, rows_affected=num_rows, code=code, bytes_processed=bytes_processed
+            _message=message,
+            rows_affected=num_rows,
+            code=code,
+            bytes_processed=bytes_processed,
+            fields=fields,
         )

         return response, table
@@ -529,7 +537,8 @@ def copy_and_results():

         self._retry_and_handle(
             msg='copy table "{}" to "{}"'.format(
-                ", ".join(source_ref.path for source_ref in source_ref_array), destination_ref.path
+                ", ".join(source_ref.path for source_ref in source_ref_array),
+                destination_ref.path,
             ),
             conn=conn,
             fn=copy_and_results,
@@ -571,7 +580,12 @@ def fn():
         self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)

     def _query_and_results(
-        self, client, sql, job_params, job_creation_timeout=None, job_execution_timeout=None
+        self,
+        client,
+        sql,
+        job_params,
+        job_creation_timeout=None,
+        job_execution_timeout=None,
    ):
         """Query the client and wait for results."""
         # Cannot reuse job_config if destination is set and ddl is used
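The new `fields` attribute leans on a BigQuery behavior worth spelling out: a completed SELECT job exposes its result schema even when no rows come back. A minimal sketch of the kind of statement whose schema would surface on `response.fields` (column names invented for illustration):

-- Returns zero rows, but the job still reports the result schema
-- (id: INTEGER, name: STRING), which execute() now forwards on the response.
select * from (
  select 1 as id, 'abc' as name
) as __dbt_sbq
where false
limit 0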

dbt/adapters/bigquery/impl.py (+40 -10)

@@ -7,7 +7,13 @@
 import dbt.clients.agate_helper

 from dbt import ui  # type: ignore
-from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
+from dbt.adapters.base import (
+    BaseAdapter,
+    available,
+    RelationType,
+    SchemaSearchMap,
+    AdapterConfig,
+)
 from dbt.adapters.bigquery.relation import BigQueryRelation
 from dbt.adapters.bigquery import BigQueryColumn
 from dbt.adapters.bigquery import BigQueryConnectionManager
@@ -47,11 +53,15 @@ class PartitionConfig(dbtClassMixin):
     data_type: str = "date"
     granularity: str = "day"
     range: Optional[Dict[str, Any]] = None
+    time_ingestion_partitioning: bool = False
+
+    def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
+        return [c for c in columns if not c.name.upper() == self.field.upper()]

     def render(self, alias: Optional[str] = None):
-        column: str = self.field
+        column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
         if alias:
-            column = f"{alias}.{self.field}"
+            column = f"{alias}.{column}"

         if self.data_type.lower() == "int64" or (
             self.data_type.lower() == "date" and self.granularity.lower() == "day"
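With `time_ingestion_partitioning: true`, `render()` substitutes BigQuery's `_PARTITIONTIME` pseudo-column for the configured field, so every partition predicate the adapter generates targets the ingestion-time partition instead of a model column. A sketch of the difference for a `timestamp`/`day` config, with hypothetical table and column names (`dbt_partitions_for_replacement` is the array declared by `bq_insert_overwrite` below):

-- with time_ingestion_partitioning: false, the predicate targets the model column
select * from `my_project.my_dataset.events` as t
where timestamp_trunc(t.created_at, day) in unnest(dbt_partitions_for_replacement);

-- with time_ingestion_partitioning: true, it targets the pseudo-column
select * from `my_project.my_dataset.events` as t
where timestamp_trunc(t._PARTITIONTIME, day) in unnest(dbt_partitions_for_replacement);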
@@ -89,7 +99,11 @@ def render(self):

 def _stub_relation(*args, **kwargs):
     return BigQueryRelation.create(
-        database="", schema="", identifier="", quote_policy={}, type=BigQueryRelation.Table
+        database="",
+        schema="",
+        identifier="",
+        quote_policy={},
+        type=BigQueryRelation.Table,
     )


@@ -209,7 +223,9 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
     def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
         try:
             table = self.connections.get_bq_table(
-                database=relation.database, schema=relation.schema, identifier=relation.identifier
+                database=relation.database,
+                schema=relation.schema,
+                identifier=relation.identifier,
             )
             return self._get_dbt_columns_from_bq_table(table)

@@ -358,7 +374,10 @@ def _materialize_as_view(self, model: Dict[str, Any]) -> str:

         logger.debug("Model SQL ({}):\n{}".format(model_alias, model_sql))
         self.connections.create_view(
-            database=model_database, schema=model_schema, table_name=model_alias, sql=model_sql
+            database=model_database,
+            schema=model_schema,
+            table_name=model_alias,
+            sql=model_sql,
         )
         return "CREATE VIEW"

@@ -379,7 +398,10 @@ def _materialize_as_table(

         logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))
         self.connections.create_table(
-            database=model_database, schema=model_schema, table_name=table_name, sql=model_sql
+            database=model_database,
+            schema=model_schema,
+            table_name=table_name,
+            sql=model_sql,
         )

         return "CREATE TABLE"
@@ -462,7 +484,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            table_field = table.time_partitioning.field.lower()
+            partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
+            table_field = partitioning_field.lower()
             table_granularity = table.partitioning_type.lower()
             return (
                 table_field == conf_partition.field.lower()
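The fallback matters because an ingestion-time partitioned table has no partitioning column of its own: BigQuery reports `time_partitioning.field` as `None` for such tables, so the old `table.time_partitioning.field.lower()` would raise on `None`. A table of this shape (names assumed) is the case being handled:

-- Partitions are keyed on load time rather than a declared column, so the
-- table metadata carries no partitioning field name.
create or replace table `my_project.my_dataset.events` (
  id int64,
  field1 string
)
partition by _PARTITIONTIME;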
@@ -508,7 +531,9 @@ def is_replaceable(

         try:
             table = self.connections.get_bq_table(
-                database=relation.database, schema=relation.schema, identifier=relation.identifier
+                database=relation.database,
+                schema=relation.schema,
+                identifier=relation.identifier,
             )
         except google.cloud.exceptions.NotFound:
             return True
@@ -630,7 +655,12 @@ def load_dataframe(self, database, schema, table_name, agate_table, column_overr

     @available.parse_none
     def upload_file(
-        self, local_file_path: str, database: str, table_schema: str, table_name: str, **kwargs
+        self,
+        local_file_path: str,
+        database: str,
+        table_schema: str,
+        table_name: str,
+        **kwargs,
     ) -> None:
         conn = self.connections.get_thread_connection()
         client = conn.handle

dbt/include/bigquery/macros/materializations/incremental.sql (+103 -6)

@@ -27,6 +27,94 @@
   {% do return(strategy) %}
 {% endmacro %}

+{% macro get_columns_with_types_in_query(select_sql) %}
+  {% call statement('get_columns_with_types_in_query', fetch_result=True, auto_begin=False) -%}
+    select * from (
+      {{ select_sql }}
+    ) as __dbt_sbq
+    where false
+    limit 0
+  {% endcall %}
+  {%- set result = load_result('get_columns_with_types_in_query') -%}
+  {{ return(result.response.fields) }}
+{% endmacro %}
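Compiled against a concrete model (names invented), the macro issues a zero-row probe; BigQuery still returns the full result schema, which flows back through the `fields` added to `BigQueryAdapterResponse` above:

select * from (
  select id, created_at, field1 from `my_project.my_dataset.upstream`
) as __dbt_sbq
where false
limit 0
-- 0 rows; schema: id INTEGER, created_at TIMESTAMP, field1 STRING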
+
+{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
+  {%- set raw_partition_by = config.get('partition_by', none) -%}
+  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
+  {%- set sql_header = config.get('sql_header', none) -%}
+
+  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
+
+  {%- set columns = get_columns_with_types_in_query(sql) -%}
+  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}
+
+  {{ sql_header if sql_header is not none }}
+
+  {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
+  {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}
+
+  {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}
+
+  create or replace table {{ relation }} ({{ table_dest_columns_csv }})
+  {{ partition_by(ingestion_time_partition_config) }}
+  {{ cluster_by(raw_cluster_by) }}
+  {{ bigquery_table_options(config, model, temporary) }}
+
+{%- endmacro -%}
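Assuming a model whose query yields columns `id`, `created_at`, `field1` and `partition_by = {"field": "created_at", "data_type": "timestamp", "time_ingestion_partitioning": true}`, this macro would render DDL along these lines. Note that the configured partition field is dropped from the column list by `columns_without_partition_fields_csv` (defined below), since the partition key lives in `_PARTITIONTIME` instead:

create or replace table `my_project.my_dataset.my_model` (`id` INTEGER, `field1` STRING)
partition by timestamp_trunc(_PARTITIONTIME, day)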
+
+{% macro get_quoted_with_types_csv(columns) %}
+  {% set quoted = [] %}
+  {% for col in columns -%}
+    {%- do quoted.append(adapter.quote(col.name) ~ " " ~ col.field_type) -%}
+  {%- endfor %}
+  {%- set dest_cols_csv = quoted | join(', ') -%}
+  {{ return(dest_cols_csv) }}
+
+{% endmacro %}
+
+{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
+  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
+  {% set columns_names = get_quoted_with_types_csv(columns_no_partition) %}
+  {{ return(columns_names) }}
+
+{%- endmacro -%}
+
+{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
+  {%- set partition_by = config.get('partition_by', none) -%}
+  {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
+  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+
+  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
+  {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql) }}

+{%- endmacro -%}
+
+{% macro build_partition_time_exp(partition_by) %}
+  {% if partition_by.data_type == 'timestamp' %}
+    {{ return(partition_by.field) }}
+  {% else %}
+    {{ return('timestamp(' + partition_by.field + ')') }}
+  {% endif %}
+{% endmacro %}
+
+{% macro wrap_with_time_ingestion_partitioning(partition_time_exp, sql) %}
+
+  select {{ partition_time_exp }} as _partitiontime, * EXCEPT({{ partition_time_exp }}) from (
+    {{ sql }}
+  );
+
+{% endmacro %}
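Together, `build_partition_time_exp` and `wrap_with_time_ingestion_partitioning` rewrite the model query so each row carries its partition timestamp explicitly. For the same hypothetical model (a `timestamp` field needs no `timestamp()` cast, so the expression is the bare column), `bq_insert_into_ingestion_time_partitioned_table` would compile to roughly:

insert into `my_project.my_dataset.my_model` (_partitiontime, `id`, `field1`)
select created_at as _partitiontime, * except(created_at) from (
  select id, created_at, field1 from `my_project.my_dataset.upstream`
);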
+
+{% macro source_sql_with_partition(partition_by, source_sql) %}
+
+  {%- if partition_by.time_ingestion_partitioning %}
+    {{ return(wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), source_sql)) }}
+  {% else %}
+    {{ return(source_sql) }}
+  {%- endif -%}
+
+{% endmacro %}

 {% macro bq_insert_overwrite(
     tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -60,15 +148,15 @@
       )
     {%- endset -%}

-    -- generated script to merge partitions into {{ target_relation }}
     declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;

     {# have we already created the temp table to check for schema changes? #}
     {% if not tmp_relation_exists %}
       {{ declare_dbt_max_partition(this, partition_by, sql) }}

       -- 1. create a temp table
-      {{ create_table_as(True, tmp_relation, sql) }}
+      {% set create_table_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
+      {{ create_table_sql }}
     {% else %}
       -- 1. temp table already exists, we used it to check for schema changes
     {% endif %}
@@ -94,6 +182,15 @@

 {% endmacro %}

+{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
+  {% if is_time_ingestion_partitioning %}
+    {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
+    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
+    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
+  {% else %}
+    {{ return(create_table_as(temporary, relation, sql)) }}
+  {% endif %}
+{% endmacro %}
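This split is the crux of the feature: the empty shell is created eagerly via `run_query`, while the `insert` is returned as the build SQL for the materialization to execute, because (per the comment above) an ingestion-time partitioned table cannot be created directly from the transformed data. Under the running example, the two statements issued are, in order:

-- 1. executed immediately by run_query()
create or replace table `my_project.my_dataset.my_model` (`id` INTEGER, `field1` STRING)
partition by timestamp_trunc(_PARTITIONTIME, day);

-- 2. returned to the caller and run as the build step
insert into `my_project.my_dataset.my_model` (_partitiontime, `id`, `field1`)
select created_at as _partitiontime, * except(created_at) from (
  select id, created_at, field1 from `my_project.my_dataset.upstream`
);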

 {% macro bq_generate_incremental_build_sql(
     strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -155,26 +252,26 @@
   {{ run_hooks(pre_hooks) }}

   {% if existing_relation is none %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% elif existing_relation.is_view %}
       {#-- There's no way to atomically replace a view with a table on BQ --#}
       {{ adapter.drop_relation(existing_relation) }}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% elif full_refresh_mode %}
       {#-- If the partition/cluster config has changed, then we must drop and recreate --#}
       {% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
          {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
          {{ adapter.drop_relation(existing_relation) }}
       {% endif %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% else %}
      {% set tmp_relation_exists = false %}
      {% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
        {% do run_query(
-         declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
+         declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
        ) %}
        {% set tmp_relation_exists = true %}
        {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
New model file (+34 -0):

@@ -0,0 +1,34 @@
+{{
+    config(
+        materialized='incremental',
+        unique_key='id',
+        on_schema_change='append_new_columns',
+        partition_by={
+            "field": "date",
+            "data_type": "timestamp",
+            "time_ingestion_partitioning": true
+        },
+    )
+}}
+
+{% set string_type = 'string' %}
+
+WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+{% if is_incremental() %}
+
+SELECT id,
+       cast(field1 as {{ string_type }}) as field1,
+       cast(field2 as {{ string_type }}) as field2,
+       cast(field3 as {{ string_type }}) as field3,
+       cast(field4 as {{ string_type }}) as field4
+FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
+
+{% else %}
+
+SELECT id,
+       cast(field1 as {{ string_type }}) as field1,
+       cast(field2 as {{ string_type }}) as field2
+FROM source_data where id <= 3
+
+{% endif %}
