Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.1.0 #12

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- uses: extractions/setup-just@v1
- uses: extractions/setup-just@v2
- run: just install
- run: just build
if: matrix.os == 'ubuntu-20.04'
Expand Down
25 changes: 24 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,33 @@ docker-run-binary REPO='orbiter-community-translations' RULESET='orbiter_transla
#!/usr/bin/env bash
set -euxo pipefail
cat <<"EOF" | docker run --platform linux/amd64 -v `pwd`:/data -w /data -i ubuntu /bin/bash
chmod +x ./orbiter-linux-x86_64 && \
echo "setting up certificates for https" && \
apt update && apt install -y ca-certificates && update-ca-certificates --fresh && \
echo "sourcing .env" && \
set -a && source .env && set +a && \
chmod +x ./orbiter-linux-x86_64 && \
echo "[ORBITER LIST-RULESETS]" && \
./orbiter-linux-x86_64 list-rulesets && \
mkdir -p workflow && \
echo "[ORBITER INSTALL]" && \
LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 install --repo={{REPO}} && \
echo "[ORBITER TRANSLATE]" && \
LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 translate workflow/ output/ --ruleset {{RULESET}}
EOF

docker-run-python REPO='orbiter-community-translations' RULESET='orbiter_translations.oozie.xml_demo.translation_ruleset':
#!/usr/bin/env bash
set -euxo pipefail
cat <<"EOF" | docker run --platform linux/amd64 -v `pwd`:/data -w /data -i python /bin/bash
echo "sourcing .env" && \
set -a && source .env && set +a && \
echo "installing orbiter" && \
pip install '.'
echo "[ORBITER LIST-RULESETS]" && \
orbiter list-rulesets && \
mkdir -p workflow && \
echo "[ORBITER INSTALL]" && \
LOG_LEVEL=DEBUG orbiter install --repo={{REPO}} && \
echo "[ORBITER TRANSLATE]" && \
LOG_LEVEL=DEBUG orbiter translate workflow/ output/ --ruleset {{RULESET}}
EOF
8 changes: 1 addition & 7 deletions orbiter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
from __future__ import annotations

import os
import re
from enum import Enum
from typing import Any, Tuple

__version__ = "1.0.2"
__version__ = "1.1.0"

version = __version__

KG_ACCOUNT_ID = "3b189b4c-c047-4fdb-9b46-408aa2978330"

ORBITER_TASK_SUFFIX = os.getenv("ORBITER_TASK_SUFFIX", "_task")
"""By default, we add `_task` as a suffix to a task name to prevent name collision issues. This can be overridden."""


class FileType(Enum):
YAML = "YAML"
Expand Down
50 changes: 40 additions & 10 deletions orbiter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
from csv import DictReader
from rich.markdown import Markdown
import pkgutil

from orbiter import import_from_qualname, KG_ACCOUNT_ID
from urllib.request import urlretrieve

from orbiter import import_from_qualname
from orbiter.config import (
RUNNING_AS_BINARY,
KG_ACCOUNT_ID,
TRANSLATION_VERSION,
LOG_LEVEL,
)
from orbiter.rules.rulesets import TranslationRuleset

RUNNING_AS_BINARY = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")


# ### LOGGING ###
def formatter(r):
Expand All @@ -38,7 +43,6 @@ def formatter(r):


logger.remove()
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
sys.tracebacklimit = 1000 if LOG_LEVEL == "DEBUG" else 0
logger_defaults = dict(colorize=True, format=formatter)
exceptions_off = {"backtrace": False, "diagnose": False}
Expand Down Expand Up @@ -160,11 +164,18 @@ def orbiter():
default=True,
show_default=True,
)
@click.option(
"--analyze/--no-analyze",
help="[optional] print statistics instead of rendering output",
default=False,
show_default=True,
)
def translate(
input_dir: Path,
output_dir: Path,
ruleset: str | None,
_format: bool,
analyze: bool,
):
"""Translate workflows in an `INPUT_DIR` to an `OUTPUT_DIR` Airflow Project.

Expand Down Expand Up @@ -195,9 +206,13 @@ def translate(
)

try:
translation_ruleset.translate_fn(
project = translation_ruleset.translate_fn(
translation_ruleset=translation_ruleset, input_dir=input_dir
).render(output_dir)
)
if analyze:
project.analyze()
else:
project.render(output_dir)
except RuntimeError as e:
logger.error(f"Error encountered during translation: {e}")
raise click.Abort()
Expand All @@ -207,7 +222,8 @@ def translate(

def _pip_install(repo: str, key: str):
"""If we are running via python/pip, we can utilize pip to install translations"""
_exec = f"pip3 install {repo}"
_exec = f"{sys.executable} -m pip install {repo}"
_exec += f"=={TRANSLATION_VERSION}" if TRANSLATION_VERSION != "latest" else ""
if repo == "astronomer-orbiter-translations":
if not key:
raise ValueError(
Expand Down Expand Up @@ -247,6 +263,20 @@ def _get_keygen_pyz(key):
f.write(r.content)


def _get_gh_pyz(
repo: str = "https://github.com/astronomer/orbiter-community-translations",
file: str = "orbiter_translations.pyz",
):
if TRANSLATION_VERSION != "latest":
url = f"{repo}/releases/download/{TRANSLATION_VERSION}/{file}"
else:
url = f"{repo}/releases/latest/download/{file}"
logger.info(f"Downloading {file} from {url}")
(downloaded_file, res) = urlretrieve(url, file) # nosec B310
logger.debug(f"Downloaded {file} to {downloaded_file}, response: {res}")
return downloaded_file


def _add_pyz():
local_pyz = [
str(_path.resolve()) for _path in Path(".").iterdir() if _path.suffix == ".pyz"
Expand All @@ -257,14 +287,14 @@ def _add_pyz():

def _bin_install(repo: str, key: str):
"""If we are running via a PyInstaller binary, we need to download a .pyz"""
if repo == "astronomer-orbiter-translations":
if "astronomer-orbiter-translations" in repo:
if not key:
raise ValueError(
"License key is required for 'astronomer-orbiter-translations'!"
)
_get_keygen_pyz(key)
else:
raise NotImplementedError()
_get_gh_pyz()
_add_pyz()
(_, _version) = import_from_qualname("orbiter_translations.version")
logger.info(f"Successfully installed {repo}, version: {_version}")
Expand Down
14 changes: 14 additions & 0 deletions orbiter/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os
import sys

TRANSLATION_VERSION = os.getenv("ORBITER_TRANSLATION_VERSION", "latest")
"""The version of the translation ruleset to download. This can be overridden."""

ORBITER_TASK_SUFFIX = os.getenv("ORBITER_TASK_SUFFIX", "_task")
"""By default, we add `_task` as a suffix to a task name to prevent name collision issues. This can be overridden."""

LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
"""You can set the log level to DEBUG to see more detailed logs."""

KG_ACCOUNT_ID = "3b189b4c-c047-4fdb-9b46-408aa2978330"
RUNNING_AS_BINARY = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")
107 changes: 106 additions & 1 deletion orbiter/objects/project.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from functools import reduce
from pathlib import Path
from typing import Dict, Iterable, Set
from typing import Dict, Iterable, Set, Literal

import yaml
from loguru import logger
Expand Down Expand Up @@ -609,6 +610,100 @@ def render(self, output_dir: Path) -> None:
else:
logger.debug("No entries for .env")

@validate_call
def analyze(self, output_fmt: Literal["json", "csv", "md"] = "md"):
"""Print an analysis of the project to the console.

```pycon
>>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
>>> OrbiterProject().add_dags([
... OrbiterDAG(file_path="", dag_id="foo", orbiter_kwargs={"file_path": "foo.py"},
... tasks={"bar": OrbiterEmptyOperator(task_id="bar")}
... ),
... OrbiterDAG(file_path="", dag_id="baz", orbiter_kwargs={"file_path": "baz.py"},
... tasks={"bop": OrbiterEmptyOperator(task_id="bop")}
... )
... ]).analyze()
... # doctest: +ELLIPSIS
┏━...
...Analysis...
┗━...
<BLANKLINE>
<BLANKLINE>
DAGs OrbiterEmptyOperator
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
foo.py 1 1
baz.py 1 1
Totals 2 2
<BLANKLINE>

```
"""
import sys

dag_analysis = [
{
"file": dag.orbiter_kwargs.get("file_path", dag.file_path),
"dag_id": dag.dag_id,
"task_types": [type(task).__name__ for task in dag.tasks.values()],
}
for dag in self.dags.values()
]

file_analysis = {}
for analysis in dag_analysis:
analysis_output = file_analysis.get(analysis["file"], {})
analysis_output["DAGs"] = analysis_output.get("DAGs", 0) + 1
tasks_of_type = reduce(
lambda acc, task_type: acc | {task_type: acc.get(task_type, 0) + 1},
analysis["task_types"],
dict(),
)
analysis_output |= tasks_of_type
file_analysis[analysis["file"]] = analysis_output

file_analysis = [{"": k} | v for k, v in file_analysis.items()]
totals = {"": "Totals"}
for file in file_analysis:
for k, v in file.items():
if k != "":
totals[k] = totals.get(k, 0) + v
file_analysis.append(totals)

if output_fmt == "json":
import json

json.dump(file_analysis, sys.stdout)
elif output_fmt == "csv":
import csv
import sys

writer = csv.DictWriter(sys.stdout, fieldnames=file_analysis[0].keys())
writer.writeheader()
writer.writerows(file_analysis)
elif output_fmt == "md":
from rich.console import Console
from rich.markdown import Markdown
from tabulate import tabulate

console = Console()

# DAGs EmptyOp
# file_a 1 1
table = tabulate(
tabular_data=file_analysis,
headers="keys",
tablefmt="pipe",
# https://github.com/Textualize/rich/issues/3027
missingval="⠀", # (special 'braille space' character)
)
console.print(
Markdown(
f"# Analysis\n{table}",
style="magenta",
)
)


# https://github.com/pydantic/pydantic/issues/8790
OrbiterProject.add_connections = validate_call()(OrbiterProject.add_connections)
Expand All @@ -618,3 +713,13 @@ def render(self, output_dir: Path) -> None:
OrbiterProject.add_pools = validate_call()(OrbiterProject.add_pools)
OrbiterProject.add_requirements = validate_call()(OrbiterProject.add_requirements)
OrbiterProject.add_variables = validate_call()(OrbiterProject.add_variables)


if __name__ == "__main__":
import doctest

doctest.testmod(
optionflags=doctest.ELLIPSIS
| doctest.NORMALIZE_WHITESPACE
| doctest.IGNORE_EXCEPTION_DETAIL
)
3 changes: 2 additions & 1 deletion orbiter/objects/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from loguru import logger
from pydantic import AfterValidator, BaseModel, validate_call

from orbiter import clean_value, ORBITER_TASK_SUFFIX
from orbiter import clean_value
from orbiter.ast_helper import (
OrbiterASTBase,
py_function,
py_bitshift,
)
from orbiter.ast_helper import py_assigned_object
from orbiter.config import ORBITER_TASK_SUFFIX
from orbiter.objects import ImportList
from orbiter.objects import OrbiterBase
from orbiter.objects.pool import OrbiterPool
Expand Down
2 changes: 1 addition & 1 deletion orbiter/objects/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from pydantic import field_validator

from orbiter import ORBITER_TASK_SUFFIX
from orbiter.config import ORBITER_TASK_SUFFIX
from orbiter.ast_helper import (
OrbiterASTBase,
py_with,
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ dependencies = [
# CLI prompt
"questionary",

# fetch translation_rulesets in binary mode
"requests",

# For validation of objects
"pydantic >= 2.6",

Expand All @@ -61,7 +64,7 @@ dependencies = [
"pendulum",
"tzdata", # for timezone data, if system doesn't have it?

# for 'help' command
# for 'list-rulesets' command, and '--analyze'
"tabulate",

# logging
Expand Down
4 changes: 0 additions & 4 deletions tests/integration_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import pytest

from orbiter.__main__ import run
from tests.conftest import manual_tests


# noinspection PyUnreachableCode
@pytest.mark.skip("Relies on orbiter-community-translations, not yet published")
@manual_tests
def test_integration():
run("just docker-build-binary", shell=True, capture_output=True, text=True)
Expand Down
Loading
Loading