diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 92e6201f99906..15390bec5c9ff 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1267,6 +1267,24 @@ class GroupCommand(NamedTuple): func=lazy_load_command('airflow.cli.commands.provider_command.connection_field_behaviours'), args=(ARG_OUTPUT, ARG_VERBOSE), ), + ActionCommand( + name='logging', + help='Get information about task logging handlers provided', + func=lazy_load_command('airflow.cli.commands.provider_command.logging_list'), + args=(ARG_OUTPUT, ARG_VERBOSE), + ), + ActionCommand( + name='secrets', + help='Get information about secrets backends provided', + func=lazy_load_command('airflow.cli.commands.provider_command.secrets_backends_list'), + args=(ARG_OUTPUT, ARG_VERBOSE), + ), + ActionCommand( + name='auth', + help='Get information about API auth backends provided', + func=lazy_load_command('airflow.cli.commands.provider_command.auth_backend_list'), + args=(ARG_OUTPUT, ARG_VERBOSE), + ), ) USERS_COMMANDS = ( diff --git a/airflow/cli/commands/provider_command.py b/airflow/cli/commands/provider_command.py index 65f3a489e6ec0..75a9063f908d1 100644 --- a/airflow/cli/commands/provider_command.py +++ b/airflow/cli/commands/provider_command.py @@ -113,3 +113,39 @@ def extra_links_list(args): "extra_link_class_name": x, }, ) + + +@suppress_logs_and_warning +def logging_list(args): + """Lists all log task handlers at the command line""" + AirflowConsole().print_as( + data=list(ProvidersManager().logging_class_names), + output=args.output, + mapper=lambda x: { + "logging_class_name": x, + }, + ) + + +@suppress_logs_and_warning +def secrets_backends_list(args): + """Lists all secrets backends at the command line""" + AirflowConsole().print_as( + data=list(ProvidersManager().secrets_backend_class_names), + output=args.output, + mapper=lambda x: { + "secrets_backend_class_name": x, + }, + ) + + +@suppress_logs_and_warning +def auth_backend_list(args): + """Lists all API auth backend modules at the command line""" + AirflowConsole().print_as( + data=list(ProvidersManager().auth_backend_module_names), + output=args.output, + mapper=lambda x: { + "api_auth_backand_module": x, + }, + ) diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json index 840c902201af9..293d2b6685c8d 100644 --- a/airflow/provider.yaml.schema.json +++ b/airflow/provider.yaml.schema.json @@ -210,6 +210,27 @@ "additional-extras": { "type": "object", "description": "Additional extras that the provider should have" + }, + "secrets-backends": { + "type": "array", + "description": "Secrets Backend class names", + "items": { + "type": "string" + } + }, + "logging": { + "type": "array", + "description": "Logging Task Handlers class names", + "items": { + "type": "string" + } + }, + "auth-backends": { + "type": "array", + "description": "API Auth Backend module names", + "items": { + "type": "string" + } } }, "additionalProperties": false, diff --git a/airflow/provider_info.schema.json b/airflow/provider_info.schema.json index 656adbd0db16d..9c24b2df91794 100644 --- a/airflow/provider_info.schema.json +++ b/airflow/provider_info.schema.json @@ -27,6 +27,27 @@ "items": { "type": "string" } + }, + "secrets-backends": { + "type": "array", + "description": "Secrets Backend class names", + "items": { + "type": "string" + } + }, + "logging": { + "type": "array", + "description": "Logging Task Handlers class names", + "items": { + "type": "string" + } + }, + "auth-backends": { + "type": "array", + "description": "API Auth Backend module names", + "items": { + "type": "string" + } } }, "required": [ diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index dc0a8441beab0..4c4436da7b4eb 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -399,3 +399,11 @@ hook-class-names: - airflow.providers.amazon.aws.hooks.s3.S3Hook - airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook - airflow.providers.amazon.aws.hooks.emr.EmrHook + +secrets-backends: + - airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend + - airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend + +logging: + - airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler + - airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index 781e19697b110..23ea3d9b261e2 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -46,3 +46,6 @@ hooks: hook-class-names: - airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook + +logging: + - airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 92b67e8157574..f117df3f939ab 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -757,3 +757,13 @@ extra-links: additional-extras: apache.beam: apache-beam[gcp] leveldb: plyvel + +secrets-backends: + - airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend + +auth-backends: + - airflow.providers.google.common.auth_backend.google_openid + +logging: + - airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler + - airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler diff --git a/airflow/providers/hashicorp/provider.yaml b/airflow/providers/hashicorp/provider.yaml index ce6625659c85f..eee1e9d995395 100644 --- a/airflow/providers/hashicorp/provider.yaml +++ b/airflow/providers/hashicorp/provider.yaml @@ -43,3 +43,6 @@ hooks: hook-class-names: - airflow.providers.hashicorp.hooks.vault.VaultHook + +secrets-backends: + - airflow.providers.hashicorp.secrets.vault.VaultBackend diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index c12bb713a63f6..3dfe3253f3cf2 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -165,3 +165,9 @@ hook-class-names: - airflow.providers.microsoft.azure.hooks.wasb.WasbHook - airflow.providers.microsoft.azure.hooks.azure_data_factory.AzureDataFactoryHook - airflow.providers.microsoft.azure.hooks.azure_container_registry.AzureContainerRegistryHook + +secrets-backends: + - airflow.providers.microsoft.azure.secrets.azure_key_vault.AzureKeyVaultBackend + +logging: + - airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index 17bcf15ce583c..6bea28e9c5cba 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -22,8 +22,9 @@ import logging import os from collections import OrderedDict +from functools import wraps from time import perf_counter -from typing import Any, Dict, NamedTuple, Set +from typing import Any, Callable, Dict, List, NamedTuple, Set, TypeVar, cast import jsonschema from wtforms import BooleanField, Field, IntegerField, PasswordField, StringField @@ -31,6 +32,7 @@ from airflow.utils import yaml from airflow.utils.entry_points import entry_points_with_dist from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.module_loading import import_string try: import importlib.resources as importlib_resources @@ -61,6 +63,35 @@ def _create_customized_form_field_behaviours_schema_validator(): return validator +def _sanity_check(provider_package: str, class_name: str) -> bool: + """ + Performs sanity check on provider classes. + For apache-airflow providers - it checks if it starts with appropriate package. For all providers + it tries to import the provider - checking that there are no exceptions during importing. + """ + if provider_package.startswith("apache-airflow"): + provider_path = provider_package[len("apache-") :].replace("-", ".") + if not class_name.startswith(provider_path): + log.warning( + "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'", + class_name, + provider_package, + provider_path, + ) + return False + try: + import_string(class_name) + except Exception as e: + log.warning( + "Exception when importing '%s' from '%s' package: %s", + class_name, + provider_package, + e, + ) + return False + return True + + class ProviderInfo(NamedTuple): """Provider information""" @@ -85,6 +116,40 @@ class ConnectionFormWidgetInfo(NamedTuple): field: Field +T = TypeVar("T", bound=Callable) + +logger = logging.getLogger(__name__) + + +# We want to have better control over initialization of parameters and be able to debug and test it +# So we add our own decorator +def provider_info_cache(cache_name: str) -> Callable[[T], T]: + """ + Decorator factory that create decorator that caches initialization of provider's parameters + :param cache_name: Name of the cache + """ + + def provider_info_cache_decorator(func: T): + @wraps(func) + def wrapped_function(*args, **kwargs): + providers_manager_instance = args[0] + if cache_name in providers_manager_instance._initialized_cache: + return + start_time = perf_counter() + logger.debug("Initializing Providers Manager[%s]", cache_name) + func(*args, **kwargs) + providers_manager_instance._initialized_cache[cache_name] = True + logger.debug( + "Initialization of Providers Manager[%s] took %.2f seconds", + cache_name, + perf_counter() - start_time, + ) + + return cast(T, wrapped_function) + + return provider_info_cache_decorator + + class ProvidersManager(LoggingMixin): """ Manages all provider packages. This is a Singleton class. The first time it is @@ -102,6 +167,7 @@ def __new__(cls): def __init__(self): """Initializes the manager.""" + self._initialized_cache: Dict[str, bool] = {} # Keeps dict of providers keyed by module name self._provider_dict: Dict[str, ProviderInfo] = {} # Keeps dict of hooks keyed by connection type @@ -111,25 +177,17 @@ def __init__(self): # Customizations for javascript fields are kept here self._field_behaviours: Dict[str, Dict] = {} self._extra_link_class_name_set: Set[str] = set() + self._logging_class_name_set: Set[str] = set() + self._secrets_backend_class_name_set: Set[str] = set() + self._api_auth_backend_module_names: Set[str] = set() self._provider_schema_validator = _create_provider_info_schema_validator() self._customized_form_fields_schema_validator = ( _create_customized_form_field_behaviours_schema_validator() ) - self._providers_list_initialized = False - self._providers_hooks_initialized = False - self._providers_extra_links_initialized = False + @provider_info_cache("list") def initialize_providers_list(self): """Lazy initialization of providers list.""" - # We cannot use @cache here because it does not work during pytest, apparently each test - # runs it it's own namespace and ProvidersManager is a different object in each namespace - # even if it is singleton but @cache on the initialize_providers_* still works in the - # way that it is called only once for one of the objects (at least this is how it looks like - # from running tests) - if self._providers_list_initialized: - return - start_time = perf_counter() - self.log.debug("Initializing Providers Manager list") # Local source folders are loaded first. They should take precedence over the package ones for # Development purpose. In production provider.yaml files are not present in the 'airflow" directory # So there is no risk we are going to override package provider accidentally. This can only happen @@ -137,39 +195,39 @@ def initialize_providers_list(self): self._discover_all_airflow_builtin_providers_from_local_sources() self._discover_all_providers_from_packages() self._provider_dict = OrderedDict(sorted(self._provider_dict.items())) - self.log.debug( - "Initialization of Providers Manager list took %.2f seconds", perf_counter() - start_time - ) - self._providers_list_initialized = True + @provider_info_cache("hooks") def initialize_providers_hooks(self): """Lazy initialization of providers hooks.""" - if self._providers_hooks_initialized: - return self.initialize_providers_list() - start_time = perf_counter() - self.log.debug("Initializing Providers Hooks") self._discover_hooks() self._hooks_dict = OrderedDict(sorted(self._hooks_dict.items())) self._connection_form_widgets = OrderedDict(sorted(self._connection_form_widgets.items())) self._field_behaviours = OrderedDict(sorted(self._field_behaviours.items())) - self.log.debug( - "Initialization of Providers Manager hooks took %.2f seconds", perf_counter() - start_time - ) - self._providers_hooks_initialized = True + @provider_info_cache("extra_links") def initialize_providers_extra_links(self): """Lazy initialization of providers extra links.""" - if self._providers_extra_links_initialized: - return self.initialize_providers_list() - start_time = perf_counter() - self.log.debug("Initializing Providers Extra Links") self._discover_extra_links() - self.log.debug( - "Initialization of Providers Manager extra links took %.2f seconds", perf_counter() - start_time - ) - self._providers_extra_links_initialized = True + + @provider_info_cache("logging") + def initialize_providers_logging(self): + """Lazy initialization of providers logging information.""" + self.initialize_providers_list() + self._discover_logging() + + @provider_info_cache("secrets_backends") + def initialize_providers_secrets_backends(self): + """Lazy initialization of providers secrets_backends information.""" + self.initialize_providers_list() + self._discover_secrets_backends() + + @provider_info_cache("auth_backends") + def initialize_providers_auth_backends(self): + """Lazy initialization of providers API auth_backends information.""" + self.initialize_providers_list() + self._discover_auth_backends() def _discover_all_providers_from_packages(self) -> None: """ @@ -285,16 +343,8 @@ def _add_hook(self, hook_class_name: str, provider_package: str) -> None: :param hook_class_name: name of the Hook class :param provider_package: provider package adding the hook """ - if provider_package.startswith("apache-airflow"): - provider_path = provider_package[len("apache-") :].replace("-", ".") - if not hook_class_name.startswith(provider_path): - log.warning( - "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'", - hook_class_name, - provider_package, - provider_path, - ) - return + if not _sanity_check(provider_package, hook_class_name): + return if hook_class_name in self._hooks_dict: log.warning( "The hook_class '%s' has been already registered.", @@ -406,27 +456,33 @@ def _discover_extra_links(self) -> None: """Retrieves all extra links defined in the providers""" for provider_package, (_, provider) in self._provider_dict.items(): if provider.get("extra-links"): - for extra_link in provider["extra-links"]: - self._add_extra_link(extra_link, provider_package) + for extra_link_class_name in provider["extra-links"]: + if _sanity_check(provider_package, extra_link_class_name): + self._extra_link_class_name_set.add(extra_link_class_name) - def _add_extra_link(self, extra_link_class_name, provider_package) -> None: - """ - Adds extra link class name to the list of classes - :param extra_link_class_name: name of the class to add - :param provider_package: provider package adding the link - :return: - """ - if provider_package.startswith("apache-airflow"): - provider_path = provider_package[len("apache-") :].replace("-", ".") - if not extra_link_class_name.startswith(provider_path): - log.warning( - "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'", - extra_link_class_name, - provider_package, - provider_path, - ) - return - self._extra_link_class_name_set.add(extra_link_class_name) + def _discover_logging(self) -> None: + """Retrieves all logging defined in the providers""" + for provider_package, (_, provider) in self._provider_dict.items(): + if provider.get("logging"): + for logging_class_name in provider["logging"]: + if _sanity_check(provider_package, logging_class_name): + self._logging_class_name_set.add(logging_class_name) + + def _discover_secrets_backends(self) -> None: + """Retrieves all secrets backends defined in the providers""" + for provider_package, (_, provider) in self._provider_dict.items(): + if provider.get("secrets-backends"): + for secrets_backends_class_name in provider["secrets-backends"]: + if _sanity_check(provider_package, secrets_backends_class_name): + self._secrets_backend_class_name_set.add(secrets_backends_class_name) + + def _discover_auth_backends(self) -> None: + """Retrieves all API auth backends defined in the providers""" + for provider_package, (_, provider) in self._provider_dict.items(): + if provider.get("auth-backends"): + for auth_backend_module_name in provider["auth-backends"]: + if _sanity_check(provider_package, auth_backend_module_name + ".init_app"): + self._api_auth_backend_module_names.add(auth_backend_module_name) @property def providers(self) -> Dict[str, ProviderInfo]: @@ -441,7 +497,7 @@ def hooks(self) -> Dict[str, HookInfo]: return self._hooks_dict @property - def extra_links_class_names(self) -> Set[str]: + def extra_links_class_names(self) -> List[str]: """Returns set of extra link class names.""" self.initialize_providers_extra_links() return sorted(self._extra_link_class_name_set) @@ -457,3 +513,21 @@ def field_behaviours(self) -> Dict[str, Dict]: """Returns dictionary with field behaviours for connection types.""" self.initialize_providers_hooks() return self._field_behaviours + + @property + def logging_class_names(self) -> List[str]: + """Returns set of log task handlers class names.""" + self.initialize_providers_logging() + return sorted(self._logging_class_name_set) + + @property + def secrets_backend_class_names(self) -> List[str]: + """Returns set of secret backend class names.""" + self.initialize_providers_secrets_backends() + return sorted(self._secrets_backend_class_name_set) + + @property + def auth_backend_module_names(self) -> List[str]: + """Returns set of API auth backend class names.""" + self.initialize_providers_auth_backends() + return sorted(self._api_auth_backend_module_names) diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh index e2637f52a5aaa..1a75fcb706169 100755 --- a/scripts/in_container/run_install_and_test_provider_packages.sh +++ b/scripts/in_container/run_install_and_test_provider_packages.sh @@ -171,6 +171,51 @@ function discover_all_field_behaviours() { group_end } +function discover_all_logging_handlers() { + group_start "Listing available logging handlers via 'airflow providers logging'" + COLUMNS=180 airflow providers logging + + local actual_number_of_logging + actual_number_of_logging=$(airflow providers logging --output table | grep -c ^airflow.providers | xargs) + if (( actual_number_of_logging < 6 )); then + echo + echo "${COLOR_RED}ERROR: Number of logging handlers registered is wrong: ${actual_number_of_logging} ${COLOR_RESET}" + echo + exit 1 + fi + group_end +} + +function discover_all_secrets_backends() { + group_start "Listing available secrets backends via 'airflow providers secrets'" + COLUMNS=180 airflow providers secrets + + local actual_number_of_secrets + actual_number_of_secrets=$(airflow providers logging --output table | grep -c ^airflow.providers | xargs) + if (( actual_number_of_secrets < 5 )); then + echo + echo "${COLOR_RED}ERROR: Number of secrets backends registered is wrong: ${actual_number_of_secrets} ${COLOR_RESET}" + echo + exit 1 + fi + group_end +} + +function discover_all_auth_backends() { + group_start "Listing available API auth backends via 'airflow providers auth'" + COLUMNS=180 airflow providers auth + + local actual_number_of_auth + actual_number_of_auth=$(airflow providers logging --output table | grep -c ^airflow.providers | xargs) + if (( actual_number_of_auth < 1 )); then + echo + echo "${COLOR_RED}ERROR: Number of auth backends registered is wrong: ${actual_number_of_auth} ${COLOR_RESET}" + echo + exit 1 + fi + group_end +} + setup_provider_packages verify_parameters install_airflow_as_specified @@ -182,3 +227,6 @@ discover_all_hooks discover_all_connection_form_widgets discover_all_field_behaviours discover_all_extra_links +discover_all_logging_handlers +discover_all_secrets_backends +discover_all_auth_backends diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py index 138a468c8bbae..60aaec1b051a2 100644 --- a/tests/core/test_providers_manager.py +++ b/tests/core/test_providers_manager.py @@ -54,3 +54,18 @@ def test_extra_links(self): provider_manager = ProvidersManager() extra_link_class_names = list(provider_manager.extra_links_class_names) assert len(extra_link_class_names) > 5 + + def test_logging(self): + provider_manager = ProvidersManager() + logging_class_names = list(provider_manager.logging_class_names) + assert len(logging_class_names) > 5 + + def test_secrets_backends(self): + provider_manager = ProvidersManager() + secrets_backends_class_names = list(provider_manager.secrets_backend_class_names) + assert len(secrets_backends_class_names) > 4 + + def test_auth_backends(self): + provider_manager = ProvidersManager() + auth_backend_module_names = list(provider_manager.auth_backend_module_names) + assert len(auth_backend_module_names) > 0