Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): handle dots in snowflake table names #12105

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
from functools import cached_property
from typing import ClassVar, Literal, Optional, Tuple
from typing import ClassVar, List, Literal, Optional, Tuple

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
Expand Down Expand Up @@ -184,6 +184,46 @@ def _is_sys_table(table_name: str) -> bool:
return table_name.lower().startswith("sys$")


def _split_qualified_name(qualified_name: str) -> List[str]:
"""
Split a qualified name into its constituent parts.

>>> _split_qualified_name("db.my_schema.my_table")
['db', 'my_schema', 'my_table']
>>> _split_qualified_name('"db"."my_schema"."my_table"')
['db', 'my_schema', 'my_table']
>>> _split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
['TEST_DB', 'TEST_SCHEMA', 'TABLE.WITH.DOTS']
>>> _split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
['TEST_DB', 'SCHEMA.WITH.DOTS', 'MY_TABLE']
"""
Comment on lines +187 to +199
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems quite generic and not specific to Snowflake. Could it be moved to a some shared utils file?

Also, consider turning the docstring into a proper unit test.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because of doctest, the docstring is a unit test :)

if we find another system that formats table names the same way, we can move this util into a shared utils location


# Fast path - no quotes.
if '"' not in qualified_name:
return qualified_name.split(".")

# First pass - split on dots that are not inside quotes.
in_quote = False
parts: List[List[str]] = [[]]
for char in qualified_name:
if char == '"':
in_quote = not in_quote
elif char == "." and not in_quote:
parts.append([])
else:
parts[-1].append(char)

# Second pass - remove outer pairs of quotes.
result = []
for part in parts:
if len(part) > 2 and part[0] == '"' and part[-1] == '"':
part = part[1:-1]

result.append("".join(part))

return result


# Qualified Object names from snowflake audit logs have quotes for snowflake quoted identifiers,
# For example "test-database"."test-schema".test_table
# whereas we generate urns without quotes even for quoted identifiers for backward compatibility
Expand All @@ -192,7 +232,7 @@ def _is_sys_table(table_name: str) -> bool:
def _cleanup_qualified_name(
qualified_name: str, structured_reporter: SourceReport
) -> str:
name_parts = qualified_name.split(".")
name_parts = _split_qualified_name(qualified_name)
if len(name_parts) != 3:
if not _is_sys_table(qualified_name):
structured_reporter.info(
Expand All @@ -203,9 +243,9 @@ def _cleanup_qualified_name(
)
return qualified_name.replace('"', "")
return _combine_identifier_parts(
db_name=name_parts[0].strip('"'),
schema_name=name_parts[1].strip('"'),
table_name=name_parts[2].strip('"'),
db_name=name_parts[0],
schema_name=name_parts[1],
table_name=name_parts[2],
)


Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/testing/doctest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import doctest
from types import ModuleType


def assert_doctest(module: ModuleType) -> None:
    """Run every doctest in *module* and fail loudly.

    ``raise_on_error=True`` makes ``doctest.testmod`` raise on the first
    failing or erroring example, so reaching the check below means all
    attempted examples passed. A module with zero doctests is treated as
    a setup mistake rather than a silent pass.

    Raises:
        ValueError: if the module contains no doctest examples at all.
    """
    summary = doctest.testmod(
        module,
        raise_on_error=True,
        verbose=True,
    )
    if not summary.attempted:
        raise ValueError(f"No doctests found in {module.__name__}")
14 changes: 4 additions & 10 deletions metadata-ingestion/tests/integration/git/test_git_clone.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import doctest
import os

import pytest
from pydantic import SecretStr

import datahub.ingestion.source.git.git_import
from datahub.configuration.common import ConfigurationWarning
from datahub.configuration.git import GitInfo, GitReference
from datahub.ingestion.source.git.git_import import GitClone
from datahub.testing.doctest import assert_doctest

LOOKML_TEST_SSH_KEY = os.environ.get("DATAHUB_LOOKML_GIT_TEST_SSH_KEY")

Expand Down Expand Up @@ -82,15 +83,8 @@ def test_github_branch():
assert config.branch_for_clone == "main"


def test_sanitize_repo_url():
import datahub.ingestion.source.git.git_import

assert (
doctest.testmod(
datahub.ingestion.source.git.git_import, raise_on_error=True
).attempted
== 3
)
def test_sanitize_repo_url() -> None:
    """Execute the doctests embedded in the git_import module.

    assert_doctest raises if any example fails or if no doctests exist,
    so this test needs no explicit assertion of its own.
    """
    assert_doctest(datahub.ingestion.source.git.git_import)


def test_git_clone_public(tmp_path):
Expand Down
17 changes: 4 additions & 13 deletions metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from botocore.stub import Stubber
from freezegun import freeze_time

import datahub.ingestion.source.aws.sagemaker_processors.models
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.aws.sagemaker import (
Expand All @@ -13,6 +14,7 @@
job_type_to_info,
job_types,
)
from datahub.testing.doctest import assert_doctest
from tests.test_helpers import mce_helpers
from tests.unit.sagemaker.test_sagemaker_source_stubs import (
describe_endpoint_response_1,
Expand Down Expand Up @@ -243,16 +245,5 @@ def test_sagemaker_ingest(tmp_path, pytestconfig):
)


def test_doc_test_run():
import doctest

import datahub.ingestion.source.aws.sagemaker_processors.models

assert (
doctest.testmod(
datahub.ingestion.source.aws.sagemaker_processors.models,
raise_on_error=True,
verbose=True,
).attempted
== 1
)
def test_doc_test_run() -> None:
    """Execute the doctests embedded in the sagemaker models module.

    assert_doctest raises if any example fails or if no doctests exist,
    so this test needs no explicit assertion of its own.
    """
    assert_doctest(datahub.ingestion.source.aws.sagemaker_processors.models)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
from pydantic import ValidationError

import datahub.ingestion.source.snowflake.snowflake_utils
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.ingestion.api.source import SourceCapability
Expand All @@ -26,6 +27,7 @@
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowsightUrlBuilder
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
from datahub.testing.doctest import assert_doctest
from tests.test_helpers import test_connection_helpers

default_oauth_dict: Dict[str, Any] = {
Expand Down Expand Up @@ -658,3 +660,7 @@ def test_create_snowsight_base_url_ap_northeast_1():
).snowsight_base_url

assert result == "https://app.snowflake.com/ap-northeast-1.aws/account_locator/"


def test_snowflake_utils() -> None:
    """Execute the doctests embedded in the snowflake_utils module.

    assert_doctest raises if any example fails or if no doctests exist,
    so this test needs no explicit assertion of its own.
    """
    assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils)
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import doctest
from datetime import timedelta
from typing import Dict, List, Union
from unittest import mock
Expand All @@ -22,6 +21,7 @@
OwnershipSourceTypeClass,
OwnershipTypeClass,
)
from datahub.testing.doctest import assert_doctest


def create_owners_list_from_urn_list(
Expand Down Expand Up @@ -442,7 +442,7 @@ def test_dbt_cloud_config_with_defined_metadata_endpoint():


def test_infer_metadata_endpoint() -> None:
    """Execute the doctests embedded in the dbt_cloud module.

    assert_doctest raises if any example fails or if no doctests exist,
    so this test needs no explicit assertion of its own. (The previous
    inline ``doctest.testmod(...)`` call is replaced by the shared helper.)
    """
    assert_doctest(dbt_cloud)


def test_dbt_time_parsing() -> None:
Expand Down
14 changes: 4 additions & 10 deletions metadata-ingestion/tests/unit/utilities/test_utilities.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import doctest
import re
from typing import List

import datahub.utilities.logging_manager
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.testing.doctest import assert_doctest
from datahub.utilities.delayed_iter import delayed_iter
from datahub.utilities.is_pytest import is_pytest_running
from datahub.utilities.urns.dataset_urn import DatasetUrn
Expand Down Expand Up @@ -328,15 +329,8 @@ def test_sqllineage_sql_parser_tables_with_special_names():
assert sorted(SqlLineageSQLParser(sql_query).get_columns()) == expected_columns


def test_logging_name_extraction():
import datahub.utilities.logging_manager

assert (
doctest.testmod(
datahub.utilities.logging_manager, raise_on_error=True
).attempted
> 0
)
def test_logging_name_extraction() -> None:
    """Execute the doctests embedded in the logging_manager module.

    assert_doctest raises if any example fails or if no doctests exist,
    so this test needs no explicit assertion of its own.
    """
    assert_doctest(datahub.utilities.logging_manager)


def test_is_pytest_running() -> None:
Expand Down
Loading