Skip to content

Commit

Permalink
Run async test without setup task
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Mar 11, 2025
1 parent dd8b6c7 commit ea0cb6f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main, test_without_setup_task]
pull_request_target: # Also run on pull requests originated from forks
branches: [main]

Expand Down
1 change: 1 addition & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ def _cache_package_lockfile(self, tmp_project_dir: Path) -> None:
_copy_cached_package_lockfile_to_project(latest_package_lockfile, tmp_project_dir)

def _read_run_sql_from_target_dir(self, tmp_project_dir: str, sql_context: dict[str, Any]) -> str:
logger.info("Testing Log.....")
sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(str(self.project_dir))[-1].lstrip("/")
run_sql_path = Path(tmp_project_dir) / "target/run" / Path(self.project_dir).name / sql_relative_path
with run_sql_path.open("r") as sql_file:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,37 @@ def test_example_dag(session, dag_id: str):
)
else:
test_utils.run_dag(dag)


async_dag_ids = ["simple_dag_async"]


@pytest.mark.integration
def test_async_example_dag_without_setup_task(session, monkeypatch):
monkeypatch.setattr("cosmos.setting.enable_setup_async_task", False)
monkeypatch.setattr("cosmos.setting.enable_setup_async_task", False)
dag_bag = get_dag_bag()
for dag_id in async_dag_ids:
dag = dag_bag.get_dag(dag_id)
if AIRFLOW_VERSION >= Version("2.5"):
if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")):
dag.test()
else:
# This is a work around until we fix the issue in Airflow:
# https://github.com/apache/airflow/issues/42495
"""
FAILED tests/test_example_dags.py::test_example_dag[example_model_version] - sqlalchemy.exc.PendingRollbackError:
This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback().
Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a)
FAILED tests/test_example_dags.py::test_example_dag[basic_cosmos_dag]
FAILED tests/test_example_dags.py::test_example_dag[cosmos_profile_mapping]
FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile]
"""
try:
dag.test()
except sqlalchemy.exc.PendingRollbackError:
warnings.warn(
"Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets"
)
else:
test_utils.run_dag(dag)

0 comments on commit ea0cb6f

Please sign in to comment.