Fix async execution mode to read SQL files for dbt packages #1588

Open · wants to merge 9 commits into main
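The changes below thread each node's dbt `package_name` from graph parsing into the operator's `extra_context`. Previously, the async execution mode located compiled SQL by splitting the node's `file_path` on the project directory, which fails for models that live in an installed dbt package. A minimal sketch of the failure mode, using hypothetical directory names (`jaffle_shop` and the paths are illustrative, not from this PR):

```python
from pathlib import Path

# Hypothetical layout: the package dir is a sibling of the project dir,
# matching how parse_dbt_ls_output resolves package nodes after this PR.
project_dir = "/usr/local/airflow/dbt/jaffle_shop"
package_model_path = "/usr/local/airflow/dbt/some_package/models/some_model.sql"

# Pre-fix logic: split on the project dir to get a target/run-relative path.
relative = package_model_path.split(str(project_dir))[-1].lstrip("/")

# The project dir never appears in a package model's path, so the split is a
# no-op and "relative" is still the absolute path minus its leading slash:
print(relative)  # usr/local/airflow/dbt/some_package/models/some_model.sql
```

Joining that onto `target/run/<project_dir_name>/` produces a path that does not exist, which is the failure this PR addresses.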
2 changes: 1 addition & 1 deletion cosmos/__init__.py

@@ -6,7 +6,7 @@
 Contains dags, task groups, and operators.
 """

-__version__ = "1.9.1a1"
+__version__ = "1.9.1a2"


 from cosmos.airflow.dag import DbtDag
1 change: 1 addition & 0 deletions cosmos/airflow/graph.py

@@ -267,6 +267,7 @@ def create_task_metadata(
     extra_context: dict[str, Any] = {
         "dbt_node_config": node.context_dict,
         "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier,
+        "package_name": node.package_name,
     }

     if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
9 changes: 8 additions & 1 deletion cosmos/dbt/graph.py

@@ -64,6 +64,7 @@ class DbtNode:
     resource_type: DbtResourceType
     depends_on: list[str]
     file_path: Path
+    package_name: str | None = None
     tags: list[str] = field(default_factory=lambda: [])
     config: dict[str, Any] = field(default_factory=lambda: {})
     has_freshness: bool = False
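Because the new field defaults to `None`, existing `DbtNode(...)` call sites keep working unchanged. A quick sketch, with import paths assumed to match the existing test suite:

```python
from pathlib import Path

from cosmos.constants import DbtResourceType  # assumed import location
from cosmos.dbt.graph import DbtNode

# Pre-existing construction style: no package, the field defaults to None.
plain = DbtNode(
    unique_id="fake-unique-id",
    resource_type=DbtResourceType.MODEL,
    depends_on=[],
    file_path=Path("fake-project/fake-file-path.sql"),
)
assert plain.package_name is None

# Package-aware construction, mirroring the fixtures in tests/dbt/test_graph.py.
pkg = DbtNode(
    unique_id="model.some_package.some_model",
    resource_type=DbtResourceType.MODEL,
    depends_on=[],
    file_path=Path("some_package/models/some_model.sql"),
    package_name="some_package",
)
assert pkg.package_name == "some_package"
```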
@@ -279,12 +280,17 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, DbtNode]:
         except json.decoder.JSONDecodeError:
             logger.debug("Skipped dbt ls line: %s", line)
         else:
+            base_path = (
+                project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path  # type: ignore
+            )
+
             try:
                 node = DbtNode(
                     unique_id=node_dict["unique_id"],
+                    package_name=node_dict.get("package_name"),
                     resource_type=DbtResourceType(node_dict["resource_type"]),
                     depends_on=node_dict.get("depends_on", {}).get("nodes", []),
-                    file_path=project_path / node_dict["original_file_path"],
+                    file_path=base_path / node_dict["original_file_path"],
                     tags=node_dict.get("tags", []),
                     config=node_dict.get("config", {}),
                     has_freshness=(
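The base-path rule added above can be checked in isolation: package nodes resolve against the parent of the project directory, under the package's own name, while plain project nodes keep resolving against the project directory itself. A sketch reusing the fixture values from the tests further down (the `plain_node` values are hypothetical):

```python
from pathlib import Path

def resolve_file_path(project_path: Path, node_dict: dict) -> Path:
    # Same conditional as the diff above, extracted as a standalone helper.
    base_path = (
        project_path.parent / node_dict["package_name"]
        if node_dict.get("package_name")
        else project_path
    )
    return base_path / node_dict["original_file_path"]

# Package node: resolved as a sibling of the project directory.
pkg_node = {"package_name": "some_package", "original_file_path": "models/some_model.sql"}
assert resolve_file_path(Path("fake-project"), pkg_node) == Path("some_package/models/some_model.sql")

# Node without a package name: behavior is unchanged.
plain_node = {"original_file_path": "models/plain.sql"}
assert resolve_file_path(Path("fake-project"), plain_node) == Path("fake-project/models/plain.sql")
```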
@@ -821,6 +827,7 @@ def load_from_dbt_manifest(self) -> None:
         for unique_id, node_dict in resources.items():
             node = DbtNode(
                 unique_id=unique_id,
+                package_name=node_dict.get("package_name"),
                 resource_type=DbtResourceType(node_dict["resource_type"]),
                 depends_on=node_dict.get("depends_on", {}).get("nodes", []),
                 file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]),
5 changes: 3 additions & 2 deletions cosmos/operators/local.py

@@ -406,8 +406,9 @@ def _cache_package_lockfile(self, tmp_project_dir: Path) -> None:
         _copy_cached_package_lockfile_to_project(latest_package_lockfile, tmp_project_dir)

     def _read_run_sql_from_target_dir(self, tmp_project_dir: str, sql_context: dict[str, Any]) -> str:
-        sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(str(self.project_dir))[-1].lstrip("/")
-        run_sql_path = Path(tmp_project_dir) / "target/run" / Path(self.project_dir).name / sql_relative_path
+        package_name = sql_context.get("package_name") or Path(self.project_dir).name
+        sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(package_name)[-1].lstrip("/")
+        run_sql_path = Path(tmp_project_dir) / "target/run" / Path(package_name).name / sql_relative_path
         with run_sql_path.open("r") as sql_file:
             sql_content: str = sql_file.read()
         return sql_content
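With the `package_name` fallback in place, the same lookup works for package-level and project-level nodes alike. A standalone sketch of the path it produces (directory names are hypothetical):

```python
from pathlib import Path

def run_sql_path(tmp_project_dir: str, project_dir: str, sql_context: dict) -> Path:
    # Mirrors the fixed operator method: fall back to the project directory
    # name when the node does not belong to a package.
    package_name = sql_context.get("package_name") or Path(project_dir).name
    file_path = sql_context["dbt_node_config"]["file_path"]
    sql_relative_path = file_path.split(package_name)[-1].lstrip("/")
    return Path(tmp_project_dir) / "target/run" / Path(package_name).name / sql_relative_path

# Package node: compiled SQL is looked up under the package's own name.
ctx = {
    "package_name": "some_package",
    "dbt_node_config": {"file_path": "/dbt/some_package/models/some_model.sql"},
}
print(run_sql_path("/tmp/project", "/dbt/jaffle_shop", ctx))
# /tmp/project/target/run/some_package/models/some_model.sql
```

The split now keys on the package name as a path component, so compiled SQL is read from `target/run/<package_name>/...` instead of always from `target/run/<project_dir_name>/...`.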
2 changes: 2 additions & 0 deletions tests/airflow/test_graph.py

@@ -435,6 +435,7 @@ def test_create_task_metadata_unsupported(caplog):
                     "resource_name": "my_model",
                     "name": "my_model",
                 },
+                "package_name": None,
             },
         ),
         (
@@ -476,6 +477,7 @@ def test_create_task_metadata_unsupported(caplog):
                     "resource_name": "my_snapshot",
                     "name": "my_snapshot",
                 },
+                "package_name": None,
             },
         ),
     ],
9 changes: 6 additions & 3 deletions tests/dbt/test_graph.py

@@ -1349,7 +1349,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog):
         "model.some_package.some_model": DbtNode(
             unique_id="model.some_package.some_model",
             resource_type=DbtResourceType.MODEL,
-            file_path=Path("fake-project/models/some_model.sql"),
+            file_path=Path("some_package/models/some_model.sql"),
             tags=[],
             config={
                 "access": "protected",
@@ -1383,6 +1383,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog):
                 "unique_key": None,
             },
             depends_on=["source.some_source"],
+            package_name="some_package",
         ),
     }
     nodes = parse_dbt_ls_output(Path("fake-project"), dbt_ls_output)
@@ -1392,7 +1393,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog):


 def test_parse_dbt_ls_output():
-    fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}'
+    fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "package_name": "fake-project", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}'

     expected_nodes = {
         "fake-unique-id": DbtNode(
@@ -1402,6 +1403,7 @@ def test_parse_dbt_ls_output():
             tags=[],
             config={},
             depends_on=[],
+            package_name="fake-project",
         ),
     }
     nodes = parse_dbt_ls_output(Path("fake-project"), fake_ls_stdout)
@@ -1410,7 +1412,7 @@ def test_parse_dbt_ls_output():


 def test_parse_dbt_ls_output_with_json_without_tags_or_config():
-    some_ls_stdout = '{"resource_type": "model", "name": "some-name", "original_file_path": "some-file-path.sql", "unique_id": "some-unique-id", "config": {}}'
+    some_ls_stdout = '{"resource_type": "model", "name": "some-name", "package_name": "some-project", "original_file_path": "some-file-path.sql", "unique_id": "some-unique-id", "config": {}}'

     expected_nodes = {
         "some-unique-id": DbtNode(
@@ -1420,6 +1422,7 @@ def test_parse_dbt_ls_output_with_json_without_tags_or_config():
             tags=[],
             config={},
             depends_on=[],
+            package_name="some-project",
         ),
     }
     nodes = parse_dbt_ls_output(Path("some-project"), some_ls_stdout)
18 changes: 17 additions & 1 deletion tests/operators/test_local.py

@@ -5,7 +5,7 @@
 import sys
 import tempfile
 from pathlib import Path
-from unittest.mock import MagicMock, call, patch
+from unittest.mock import MagicMock, call, mock_open, patch

 import pytest
 from airflow import DAG
@@ -1474,3 +1474,19 @@ def test_async_execution_teardown_delete_files(mock_unlink, mock_construct_dest_
     )
     operator._handle_async_execution(project_dir, {}, {"profile_type": "bigquery", "teardown_task": True})
     mock_unlink.assert_called()
+
+
+def test_read_run_sql_from_target_dir():
+    tmp_project_dir = "/tmp/project"
+    sql_context = {"dbt_node_config": {"file_path": "/path/to/file.sql"}, "package_name": "package_name"}
+
+    operator = DbtRunLocalOperator(
+        task_id="test",
+        project_dir="/tmp",
+        profile_config=profile_config,
+    )
+
+    expected_sql_content = "SELECT * FROM my_table;"
+    with patch("pathlib.Path.open", new_callable=mock_open, read_data=expected_sql_content):
+        result = operator._read_run_sql_from_target_dir(tmp_project_dir, sql_context)
+        assert result == expected_sql_content
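Because this test patches `pathlib.Path.open` with `mock_open`, no real `target/run/` tree is needed: the computed path never touches disk, and the assertion only verifies that the operator returns whatever the mocked file contains. To exercise it in isolation, something like `pytest tests/operators/test_local.py -k test_read_run_sql_from_target_dir` should work.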