From 1d78b83a7aaa3c90c00533764a29c381160b06e9 Mon Sep 17 00:00:00 2001 From: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Date: Thu, 29 Aug 2024 16:05:24 -0400 Subject: [PATCH 1/5] add `just docker-run-python`, add `orbiter translate --analyze`, add pyz from GH, add config file for various env vars, update template. --- .github/workflows/deploy.yml | 2 +- justfile | 25 +++++- orbiter/__init__.py | 15 +--- orbiter/__main__.py | 50 ++++++++--- orbiter/config.py | 14 ++++ orbiter/objects/project.py | 106 +++++++++++++++++++++++- orbiter/objects/task.py | 3 +- orbiter/objects/task_group.py | 2 +- pyproject.toml | 2 +- tests/resources/translation_template.py | 36 +++++--- 10 files changed, 213 insertions(+), 42 deletions(-) create mode 100644 orbiter/config.py diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 0792038..b410a67 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -23,7 +23,7 @@ jobs: with: python-version: ${{ matrix.python-version }} cache: 'pip' - - uses: extractions/setup-just@v1 + - uses: extractions/setup-just@v2 - run: just install - run: just build if: matrix.os == 'ubuntu-20.04' diff --git a/justfile b/justfile index 7f9f917..b98ffd5 100644 --- a/justfile +++ b/justfile @@ -99,10 +99,33 @@ docker-run-binary REPO='orbiter-community-translations' RULESET='orbiter_transla #!/usr/bin/env bash set -euxo pipefail cat <<"EOF" | docker run --platform linux/amd64 -v `pwd`:/data -w /data -i ubuntu /bin/bash - chmod +x ./orbiter-linux-x86_64 && \ + echo "setting up certificates for https" && \ + apt update && apt install -y ca-certificates && update-ca-certificates --fresh && \ + echo "sourcing .env" && \ set -a && source .env && set +a && \ + chmod +x ./orbiter-linux-x86_64 && \ + echo "[ORBITER LIST-RULESETS]" && \ ./orbiter-linux-x86_64 list-rulesets && \ mkdir -p workflow && \ + echo "[ORBITER INSTALL]" && \ LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 install --repo={{REPO}} && \ + echo "[ORBITER TRANSLATE]" && \ LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 translate workflow/ output/ --ruleset {{RULESET}} EOF + +docker-run-python REPO='orbiter-community-translations' RULESET='orbiter_translations.oozie.xml_demo.translation_ruleset': + #!/usr/bin/env bash + set -euxo pipefail + cat <<"EOF" | docker run --platform linux/amd64 -v `pwd`:/data -w /data -i python /bin/bash + echo "sourcing .env" && \ + set -a && source .env && set +a && \ + echo "installing orbiter" && \ + pip install '.' + echo "[ORBITER LIST-RULESETS]" && \ + orbiter list-rulesets && \ + mkdir -p workflow && \ + echo "[ORBITER INSTALL]" && \ + LOG_LEVEL=DEBUG orbiter install --repo={{REPO}} && \ + echo "[ORBITER TRANSLATE]" && \ + LOG_LEVEL=DEBUG orbiter translate workflow/ output/ --ruleset {{RULESET}} + EOF diff --git a/orbiter/__init__.py b/orbiter/__init__.py index a7b6e1a..7f2b534 100644 --- a/orbiter/__init__.py +++ b/orbiter/__init__.py @@ -1,25 +1,12 @@ from __future__ import annotations -import os import re -from enum import Enum from typing import Any, Tuple -__version__ = "1.0.2" +__version__ = "1.1.0" version = __version__ -KG_ACCOUNT_ID = "3b189b4c-c047-4fdb-9b46-408aa2978330" - -ORBITER_TASK_SUFFIX = os.getenv("ORBITER_TASK_SUFFIX", "_task") -"""By default, we add `_task` as a suffix to a task name to prevent name collision issues. This can be overridden.""" - - -class FileType(Enum): - YAML = "YAML" - XML = "XML" - JSON = "JSON" - def clean_value(s: str): """Cleans a string to be a standard value, such as one that might be a python variable name diff --git a/orbiter/__main__.py b/orbiter/__main__.py index 9a58399..35c5760 100644 --- a/orbiter/__main__.py +++ b/orbiter/__main__.py @@ -17,12 +17,17 @@ from csv import DictReader from rich.markdown import Markdown import pkgutil - -from orbiter import import_from_qualname, KG_ACCOUNT_ID +from urllib.request import urlretrieve + +from orbiter import import_from_qualname +from orbiter.config import ( + RUNNING_AS_BINARY, + KG_ACCOUNT_ID, + TRANSLATION_VERSION, + LOG_LEVEL, +) from orbiter.rules.rulesets import TranslationRuleset -RUNNING_AS_BINARY = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS") - # ### LOGGING ### def formatter(r): @@ -38,7 +43,6 @@ def formatter(r): logger.remove() -LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") sys.tracebacklimit = 1000 if LOG_LEVEL == "DEBUG" else 0 logger_defaults = dict(colorize=True, format=formatter) exceptions_off = {"backtrace": False, "diagnose": False} @@ -160,11 +164,18 @@ def orbiter(): default=True, show_default=True, ) +@click.option( + "--analyze/--no-analyze", + help="[optional] print statistics instead of rendering output", + default=False, + show_default=True, +) def translate( input_dir: Path, output_dir: Path, ruleset: str | None, _format: bool, + analyze: bool, ): """Translate workflows in an `INPUT_DIR` to an `OUTPUT_DIR` Airflow Project. @@ -195,9 +206,13 @@ def translate( ) try: - translation_ruleset.translate_fn( + project = translation_ruleset.translate_fn( translation_ruleset=translation_ruleset, input_dir=input_dir - ).render(output_dir) + ) + if analyze: + project.analyze() + else: + project.render(output_dir) except RuntimeError as e: logger.error(f"Error encountered during translation: {e}") raise click.Abort() @@ -207,7 +222,8 @@ def translate( def _pip_install(repo: str, key: str): """If we are running via python/pip, we can utilize pip to install translations""" - _exec = f"pip3 install {repo}" + _exec = f"{sys.executable} -m pip install {repo}" + _exec += f"=={TRANSLATION_VERSION}" if TRANSLATION_VERSION != "latest" else "" if repo == "astronomer-orbiter-translations": if not key: raise ValueError( @@ -247,6 +263,20 @@ def _get_keygen_pyz(key): f.write(r.content) +def _get_gh_pyz( + repo: str = "https://github.com/astronomer/orbiter-community-translations", + file: str = "orbiter_translations.pyz", +): + if TRANSLATION_VERSION != "latest": + url = f"{repo}/releases/download/{TRANSLATION_VERSION}/{file}" + else: + url = f"{repo}/releases/latest/download/{file}" + logger.info(f"Downloading {file} from {url}") + (downloaded_file, res) = urlretrieve(url, file) # nosec B310 + logger.debug(f"Downloaded {file} to {downloaded_file}, response: {res}") + return downloaded_file + + def _add_pyz(): local_pyz = [ str(_path.resolve()) for _path in Path(".").iterdir() if _path.suffix == ".pyz" @@ -257,14 +287,14 @@ def _add_pyz(): def _bin_install(repo: str, key: str): """If we are running via a PyInstaller binary, we need to download a .pyz""" - if repo == "astronomer-orbiter-translations": + if "astronomer-orbiter-translations" in repo: if not key: raise ValueError( "License key is required for 'astronomer-orbiter-translations'!" ) _get_keygen_pyz(key) else: - raise NotImplementedError() + _get_gh_pyz() _add_pyz() (_, _version) = import_from_qualname("orbiter_translations.version") logger.info(f"Successfully installed {repo}, version: {_version}") diff --git a/orbiter/config.py b/orbiter/config.py new file mode 100644 index 0000000..e018ff3 --- /dev/null +++ b/orbiter/config.py @@ -0,0 +1,14 @@ +import os +import sys + +TRANSLATION_VERSION = os.getenv("ORBITER_TRANSLATION_VERSION", "latest") +"""The version of the translation ruleset to download. This can be overridden.""" + +ORBITER_TASK_SUFFIX = os.getenv("ORBITER_TASK_SUFFIX", "_task") +"""By default, we add `_task` as a suffix to a task name to prevent name collision issues. This can be overridden.""" + +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") +"""You can set the log level to DEBUG to see more detailed logs.""" + +KG_ACCOUNT_ID = "3b189b4c-c047-4fdb-9b46-408aa2978330" +RUNNING_AS_BINARY = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS") diff --git a/orbiter/objects/project.py b/orbiter/objects/project.py index 7328b0f..0a77d36 100644 --- a/orbiter/objects/project.py +++ b/orbiter/objects/project.py @@ -1,7 +1,8 @@ from __future__ import annotations +from functools import reduce from pathlib import Path -from typing import Dict, Iterable, Set +from typing import Dict, Iterable, Set, Literal import yaml from loguru import logger @@ -609,6 +610,99 @@ def render(self, output_dir: Path) -> None: else: logger.debug("No entries for .env") + @validate_call + def analyze(self, output_fmt: Literal["json", "csv", "md"] = "md"): + """Print an analysis of the project to the console. + + ```pycon + >>> from orbiter.objects.operators.empty import OrbiterEmptyOperator + >>> OrbiterProject().add_dags([ + ... OrbiterDAG(file_path="", dag_id="foo", orbiter_kwargs={"file_path": "foo.py"}, + ... tasks={"bar": OrbiterEmptyOperator(task_id="bar")} + ... ), + ... OrbiterDAG(file_path="", dag_id="baz", orbiter_kwargs={"file_path": "baz.py"}, + ... tasks={"bop": OrbiterEmptyOperator(task_id="bop")} + ... ) + ... ]).analyze() + ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ + ┃ Analysis ┃ + ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + + + DAGs OrbiterEmptyOperator + ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + foo.py 1 1 + baz.py 1 1 + Totals 2 2 + + + ``` + """ + import sys + + dag_analysis = [ + { + "file": dag.orbiter_kwargs.get("file_path", dag.file_path), + "dag_id": dag.dag_id, + "task_types": [type(task).__name__ for task in dag.tasks.values()], + } + for dag in self.dags.values() + ] + + file_analysis = {} + for analysis in dag_analysis: + analysis_output = file_analysis.get(analysis["file"], {}) + analysis_output["DAGs"] = analysis_output.get("DAGs", 0) + 1 + tasks_of_type = reduce( + lambda acc, task_type: acc | {task_type: acc.get(task_type, 0) + 1}, + analysis["task_types"], + dict(), + ) + analysis_output |= tasks_of_type + file_analysis[analysis["file"]] = analysis_output + + file_analysis = [{"": k} | v for k, v in file_analysis.items()] + totals = {"": "Totals"} + for file in file_analysis: + for k, v in file.items(): + if k != "": + totals[k] = totals.get(k, 0) + v + file_analysis.append(totals) + + if output_fmt == "json": + import json + + json.dump(file_analysis, sys.stdout) + elif output_fmt == "csv": + import csv + import sys + + writer = csv.DictWriter(sys.stdout, fieldnames=file_analysis[0].keys()) + writer.writeheader() + writer.writerows(file_analysis) + elif output_fmt == "md": + from rich.console import Console + from rich.markdown import Markdown + from tabulate import tabulate + + console = Console() + + # DAGs EmptyOp + # file_a 1 1 + table = tabulate( + tabular_data=file_analysis, + headers="keys", + tablefmt="pipe", + # https://github.com/Textualize/rich/issues/3027 + missingval="⠀", # (special 'braille space' character) + ) + console.print( + Markdown( + f"# Analysis\n{table}", + style="magenta", + ) + ) + # https://github.com/pydantic/pydantic/issues/8790 OrbiterProject.add_connections = validate_call()(OrbiterProject.add_connections) @@ -618,3 +712,13 @@ def render(self, output_dir: Path) -> None: OrbiterProject.add_pools = validate_call()(OrbiterProject.add_pools) OrbiterProject.add_requirements = validate_call()(OrbiterProject.add_requirements) OrbiterProject.add_variables = validate_call()(OrbiterProject.add_variables) + + +if __name__ == "__main__": + import doctest + + doctest.testmod( + optionflags=doctest.ELLIPSIS + | doctest.NORMALIZE_WHITESPACE + | doctest.IGNORE_EXCEPTION_DETAIL + ) diff --git a/orbiter/objects/task.py b/orbiter/objects/task.py index f0206e7..f716366 100644 --- a/orbiter/objects/task.py +++ b/orbiter/objects/task.py @@ -7,13 +7,14 @@ from loguru import logger from pydantic import AfterValidator, BaseModel, validate_call -from orbiter import clean_value, ORBITER_TASK_SUFFIX +from orbiter import clean_value from orbiter.ast_helper import ( OrbiterASTBase, py_function, py_bitshift, ) from orbiter.ast_helper import py_assigned_object +from orbiter.config import ORBITER_TASK_SUFFIX from orbiter.objects import ImportList from orbiter.objects import OrbiterBase from orbiter.objects.pool import OrbiterPool diff --git a/orbiter/objects/task_group.py b/orbiter/objects/task_group.py index aa75820..08a35d0 100644 --- a/orbiter/objects/task_group.py +++ b/orbiter/objects/task_group.py @@ -6,7 +6,7 @@ from pydantic import field_validator -from orbiter import ORBITER_TASK_SUFFIX +from orbiter.config import ORBITER_TASK_SUFFIX from orbiter.ast_helper import ( OrbiterASTBase, py_with, diff --git a/pyproject.toml b/pyproject.toml index d4a85ad..ee95ab8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,7 @@ dependencies = [ "pendulum", "tzdata", # for timezone data, if system doesn't have it? - # for 'help' command + # for 'list-rulesets' command, and '--analyze' "tabulate", # logging diff --git a/tests/resources/translation_template.py b/tests/resources/translation_template.py index 65769b6..d8e7498 100644 --- a/tests/resources/translation_template.py +++ b/tests/resources/translation_template.py @@ -28,9 +28,12 @@ @dag_filter_rule def basic_dag_filter(val: dict) -> list | None: """Filter input down to a list of dictionaries that can be processed by the `@dag_rules`""" - for k, v in val.items(): - pass - return [] + if ...: + for k, v in val.items(): + pass + return [] + else: + return None @dag_rule @@ -45,9 +48,12 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None: @task_filter_rule def basic_task_filter(val: dict) -> list | None: """Filter input down to a list of dictionaries that can be processed by the `@task_rules`""" - for k, v in val.items(): - pass - return [] + if ...: + for k, v in val.items(): + pass + return [] + else: + return None @task_rule(priority=2) @@ -62,17 +68,23 @@ def basic_task_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None: @task_dependency_rule def basic_task_dependency_rule(val: OrbiterDAG) -> list | None: """Translate input into a list of task dependencies""" - for task_dependency in val.orbiter_kwargs["task_dependencies"]: - pass - return [] + if ...: + for task in val.tasks.values(): + original_task_kwargs = task.orbiter_kwargs["val"] + for task_dependency in original_task_kwargs.get("task_dependencies", []): + pass + return [] + else: + return None @post_processing_rule def basic_post_processing_rule(val: OrbiterProject) -> None: """Modify the project in-place, after all other rules have applied""" - for dag_id, dag in val.dags.items(): - for task_id, task in dag.tasks.items(): - pass + if ...: + for dag_id, dag in val.dags.items(): + for task_id, task in dag.tasks.items(): + pass translation_ruleset = TranslationRuleset( From 8e06bc1dcd45cbb65714f275b4c2a7a5c97a0a19 Mon Sep 17 00:00:00 2001 From: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Date: Thu, 29 Aug 2024 16:09:11 -0400 Subject: [PATCH 2/5] test fix re: formatting --- orbiter/objects/project.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/orbiter/objects/project.py b/orbiter/objects/project.py index 0a77d36..5052c5a 100644 --- a/orbiter/objects/project.py +++ b/orbiter/objects/project.py @@ -624,9 +624,10 @@ def analyze(self, output_fmt: Literal["json", "csv", "md"] = "md"): ... tasks={"bop": OrbiterEmptyOperator(task_id="bop")} ... ) ... ]).analyze() - ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ - ┃ Analysis ┃ - ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + ... # doctest: +ELLIPSIS + ┏━... + ...Analysis... + ┗━... DAGs OrbiterEmptyOperator From f53e67a982244c354292aa39a1cce7775a0f801c Mon Sep 17 00:00:00 2001 From: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Date: Thu, 29 Aug 2024 16:11:24 -0400 Subject: [PATCH 3/5] add back integration_test --- tests/integration_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/integration_test.py b/tests/integration_test.py index bd0634f..593d6fb 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -1,11 +1,7 @@ -import pytest - from orbiter.__main__ import run from tests.conftest import manual_tests -# noinspection PyUnreachableCode -@pytest.mark.skip("Relies on orbiter-community-translations, not yet published") @manual_tests def test_integration(): run("just docker-build-binary", shell=True, capture_output=True, text=True) From 45f427057363f274b45cee869ca6cdb82fd0f8b3 Mon Sep 17 00:00:00 2001 From: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Date: Thu, 29 Aug 2024 16:34:47 -0400 Subject: [PATCH 4/5] add back FileType (temporarily) --- orbiter/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/orbiter/__init__.py b/orbiter/__init__.py index 7f2b534..e23af30 100644 --- a/orbiter/__init__.py +++ b/orbiter/__init__.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +from enum import Enum from typing import Any, Tuple __version__ = "1.1.0" @@ -8,6 +9,12 @@ version = __version__ +class FileType(Enum): + YAML = "YAML" + XML = "XML" + JSON = "JSON" + + def clean_value(s: str): """Cleans a string to be a standard value, such as one that might be a python variable name From 188e7cc175efc4ddc6baa3f21d12ab51b3cb9f59 Mon Sep 17 00:00:00 2001 From: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Date: Thu, 29 Aug 2024 17:43:34 -0400 Subject: [PATCH 5/5] add requests to fetch .pyz --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index ee95ab8..47c4872 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,9 @@ dependencies = [ # CLI prompt "questionary", + # fetch translation_rulesets in binary mode + "requests", + # For validation of objects "pydantic >= 2.6",