Skip to content

Commit 74c7c7d

Browse files
authored
Fix custom "+tag:some_tag" selector issue related to tests tag inheritance (#1466)
The selector method `_should_include_node` makes test nodes inherit the tags of their parent nodes. While this behaviour is acceptable and desirable in some cases, it can cause problems when graph selectors are combined with tags. This PR improves the test coverage, narrows down the problem, and fixes the issue reported by Astronomer customers. More details below.

A user reported that they see the correct `DbtDag` when using Cosmos 1.8.1 with:
* `LoadMode.DBT_LS`
* `RenderConfig(selector="accounts_marts")`

Where the selector `accounts_marts` is defined as:
```
- name: accounts_marts
  description: Run Accounts models
  definition:
    intersection:
      - '+tag:accounts'
      - '+tag:datamart'
      - '+tag:stratus'
```

The expected behaviour includes:
- 164 Airflow tasks
- 152 Local run tasks
- 12 Snapshot tasks

However, when they attempt to run the same `DbtDag` using:
* `LoadMode.DBT_MANIFEST`
* `RenderConfig(select=["+tag:accounts,+tag:datamart,+tag:stratus"])`

their `DbtDag` seems to contain the wrong subset of nodes. They reported:
- 197 Airflow tasks
- 183 Local run tasks
- 14 Snapshot tasks

This pull request aims to reproduce and fix this issue.
1 parent de84174 commit 74c7c7d

File tree

4 files changed

+116
-10
lines changed

4 files changed

+116
-10
lines changed

CHANGELOG.rst

+8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
Changelog
22
=========
33

4+
1.9.0a1 (2025-01-20)
5+
--------------------
6+
7+
Bug Fixes
8+
9+
* Fix selection with a complex intersection of three tag-based graph selectors by @tatiana in #1466
10+
11+
412
1.8.2 (2025-01-15)
513
--------------------
614

cosmos/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
Contains dags, task groups, and operators.
77
"""
88

9-
__version__ = "1.8.2"
9+
__version__ = "1.9.0a1"
1010

1111

1212
from cosmos.airflow.dag import DbtDag

cosmos/dbt/selector.py

+18-9
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
166166
"""
167167
selected_nodes: set[str] = set()
168168
root_nodes: set[str] = set()
169-
170169
# Index nodes by name, we can improve performance by doing this once
171170
# for multiple GraphSelectors
172171
if PATH_SELECTOR in self.node_name:
@@ -367,25 +366,36 @@ def select_nodes_ids_by_intersection(self) -> set[str]:
367366
selected_nodes: set[str] = set()
368367
self.visited_nodes: set[str] = set()
369368

370-
for node_id, node in self.nodes.items():
371-
if self._should_include_node(node_id, node):
372-
selected_nodes.add(node_id)
373-
374369
if self.config.graph_selectors:
375-
nodes_by_graph_selector = self.select_by_graph_operator()
376-
selected_nodes = selected_nodes.intersection(nodes_by_graph_selector)
370+
graph_selected_nodes = self.select_by_graph_operator()
371+
for node_id in graph_selected_nodes:
372+
node = self.nodes[node_id]
373+
# Since the method below changes the tags of test nodes, it can lead to incorrect
374+
# results during the application of graph selectors. Therefore, it is being run within
375+
# nodes previously selected
376+
# This solves https://github.com/astronomer/astronomer-cosmos/pull/1466
377+
if self._should_include_node(node_id, node):
378+
selected_nodes.add(node_id)
379+
else:
380+
for node_id, node in self.nodes.items():
381+
if self._should_include_node(node_id, node):
382+
selected_nodes.add(node_id)
377383

378384
self.selected_nodes = selected_nodes
379385
return selected_nodes
380386

381387
def _should_include_node(self, node_id: str, node: DbtNode) -> bool:
382-
"""Checks if a single node should be included. Only runs once per node with caching."""
388+
"""
389+
Checks if a single node should be included. Only runs once per node with caching."""
383390
logger.debug("Inspecting if the node <%s> should be included.", node_id)
384391
if node_id in self.visited_nodes:
385392
return node_id in self.selected_nodes
386393

387394
self.visited_nodes.add(node_id)
388395

396+
# Disclaimer: this method currently copies the tags from parent nodes to children nodes
397+
# that are tests. This can lead to incorrect results in graph node selectors such as reported in
398+
# https://github.com/astronomer/astronomer-cosmos/pull/1466
389399
if node.resource_type == DbtResourceType.TEST and node.depends_on and len(node.depends_on) > 0:
390400
node.tags = getattr(self.nodes.get(node.depends_on[0]), "tags", [])
391401
logger.debug(
@@ -498,7 +508,6 @@ def select_nodes(
498508
exclude = exclude or []
499509
if not select and not exclude:
500510
return nodes
501-
502511
validate_filters(exclude, select)
503512
subset_ids = apply_select_filter(nodes, project_dir, select)
504513
if select:

tests/dbt/test_selector.py

+89
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected):
6464
config={"materialized": "view", "tags": ["has_child", "is_child"]},
6565
)
6666

67+
6768
child_node = DbtNode(
6869
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child",
6970
resource_type=DbtResourceType.MODEL,
@@ -183,6 +184,94 @@ def test_select_nodes_by_select_intersection_config_tag():
183184
assert selected == expected
184185

185186

187+
def test_select_nodes_by_select_intersection_config_graph_selector_includes_ancestors():
188+
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child,+sibling1"])
189+
expected = {
190+
grandparent_node.unique_id: grandparent_node,
191+
another_grandparent_node.unique_id: another_grandparent_node,
192+
parent_node.unique_id: parent_node,
193+
}
194+
assert selected == expected
195+
196+
197+
def test_select_nodes_by_select_intersection_config_graph_selector_none():
198+
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child,+orphaned"])
199+
expected = {}
200+
assert selected == expected
201+
202+
203+
def test_select_nodes_by_intersection_and_tag_ancestry():
204+
parent_sibling_node = DbtNode(
205+
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling",
206+
resource_type=DbtResourceType.MODEL,
207+
depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id],
208+
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent_sibling.sql",
209+
tags=["is_adopted"],
210+
config={"materialized": "view", "tags": ["is_adopted"]},
211+
)
212+
sample_nodes_with_parent_sibling = dict(sample_nodes)
213+
sample_nodes_with_parent_sibling[parent_sibling_node.unique_id] = parent_sibling_node
214+
selected = select_nodes(
215+
project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes_with_parent_sibling, select=["+tag:is_child,+tag:is_adopted"]
216+
)
217+
expected = {
218+
grandparent_node.unique_id: grandparent_node,
219+
another_grandparent_node.unique_id: another_grandparent_node,
220+
}
221+
assert selected == expected
222+
223+
224+
def test_select_nodes_by_tag_ancestry():
225+
parent_sibling_node = DbtNode(
226+
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling",
227+
resource_type=DbtResourceType.MODEL,
228+
depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id],
229+
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent_sibling.sql",
230+
tags=["is_adopted"],
231+
config={"materialized": "view", "tags": ["is_adopted"]},
232+
)
233+
sample_nodes_with_parent_sibling = dict(sample_nodes)
234+
sample_nodes_with_parent_sibling[parent_sibling_node.unique_id] = parent_sibling_node
235+
selected = select_nodes(
236+
project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes_with_parent_sibling, select=["+tag:is_adopted"]
237+
)
238+
expected = {
239+
grandparent_node.unique_id: grandparent_node,
240+
another_grandparent_node.unique_id: another_grandparent_node,
241+
parent_sibling_node.unique_id: parent_sibling_node,
242+
}
243+
assert selected == expected
244+
245+
246+
def test_select_nodes_with_test_by_intersection_and_tag_ancestry():
247+
parent_sibling_node = DbtNode(
248+
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling",
249+
resource_type=DbtResourceType.MODEL,
250+
depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id],
251+
file_path="",
252+
tags=["is_adopted"],
253+
config={"materialized": "view", "tags": ["is_adopted"]},
254+
)
255+
test_node = DbtNode(
256+
unique_id=f"{DbtResourceType.TEST.value}.{SAMPLE_PROJ_PATH.stem}.test",
257+
resource_type=DbtResourceType.TEST,
258+
depends_on=[parent_node.unique_id, parent_sibling_node.unique_id],
259+
file_path="",
260+
config={},
261+
)
262+
new_sample_nodes = dict(sample_nodes)
263+
new_sample_nodes[parent_sibling_node.unique_id] = parent_sibling_node
264+
new_sample_nodes[test_node.unique_id] = test_node
265+
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=new_sample_nodes, select=["+tag:has_child"])
266+
# Expected must not include `parent_sibling_node` nor `test_node`
267+
expected = {
268+
parent_node.unique_id: parent_node,
269+
grandparent_node.unique_id: grandparent_node,
270+
another_grandparent_node.unique_id: another_grandparent_node,
271+
}
272+
assert selected == expected
273+
274+
186275
def test_select_nodes_by_select_path():
187276
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models"])
188277
expected = {

0 commit comments

Comments
 (0)