Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): replace sqllineage/sqlparse with our SQL parser #12020

23 changes: 13 additions & 10 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next

- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.<br/>
Entity urn with `include_workspace_name_in_dataset_urn: false`
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
```
Entity urn with `include_workspace_name_in_dataset_urn: false`

Entity urn with `include_workspace_name_in_dataset_urn: true`
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
```

Entity urn with `include_workspace_name_in_dataset_urn: true`

```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```

The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.

- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
Expand All @@ -46,7 +48,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries *entities* (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries _entities_ (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
- #12020 - Removed the `sqlparse` and `sqllineage` dependencies from ingestion and eliminated the `sql_parser` configuration from the Redash source, as Redash now exclusively uses the SQLGlot-based parser for lineage extraction.

### Potential Downtime

Expand Down
23 changes: 5 additions & 18 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,6 @@
| classification_lib
)

sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
# There have previously been issues from not pinning sqlparse, so it's best to pin it.
# Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
"sqlparse==0.4.4",
}

aws_common = {
# AWS Python SDK
"boto3",
Expand Down Expand Up @@ -207,7 +199,6 @@
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector>=2.1.0",
*sqllineage_lib,
*path_spec_common,
}

Expand Down Expand Up @@ -455,9 +446,7 @@
# It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
"setuptools",
},
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
| sqllineage_lib
| sqlglot_lib,
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | mssql_common,
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
Expand All @@ -473,7 +462,7 @@
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redash": {"redash-toolbelt", "sql-metadata"} | sqlglot_lib,
"redshift": sql_common
| redshift_common
| usage_common
Expand All @@ -494,9 +483,7 @@
"slack": slack,
"superset": superset_common,
"preset": superset_common,
# FIXME: I don't think tableau uses sqllineage anymore so we should be able
# to remove that dependency.
"tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib,
"tableau": {"tableauserverclient>=0.24.0"} | sqlglot_lib,
"teradata": sql_common
| usage_common
| sqlglot_lib
Expand All @@ -518,9 +505,9 @@
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
"unity-catalog": databricks | sql_common,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common | sqllineage_lib,
"databricks": databricks | sql_common,
"fivetran": snowflake_common | bigquery_common | sqlglot_lib,
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},
Expand Down
23 changes: 0 additions & 23 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

import datahub.emitter.mce_builder as builder
Expand Down Expand Up @@ -820,28 +819,6 @@ def _get_definition(self, definition_name):
)
return None

@lru_cache(maxsize=None)
def _get_source_from_query(self, raw_query: str) -> set:
query = self._replace_definitions(raw_query)
parser = LineageRunner(query)
source_paths = set()
try:
for table in parser.source_tables:
sources = str(table).split(".")
source_schema, source_table = sources[-2], sources[-1]
if source_schema == "<default>":
source_schema = str(self.config.default_schema)

source_paths.add(f"{source_schema}.{source_table}")
except Exception as e:
self.report.report_failure(
title="Failed to Extract Lineage From Query",
message="Unable to retrieve lineage from Mode query.",
context=f"Query: {raw_query}, Error: {str(e)}",
)

return source_paths

def _get_datasource_urn(
self,
platform: str,
Expand Down
76 changes: 13 additions & 63 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import math
import sys
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set, Type
from typing import Dict, Iterable, List, Optional, Set

import dateutil.parser as dp
from packaging import version
Expand All @@ -22,7 +22,6 @@
platform_name,
support_status,
)
from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
Expand All @@ -39,9 +38,9 @@
ChartTypeClass,
DashboardInfoClass,
)
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.sql_parser_base import SQLParser
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -270,10 +269,6 @@ class RedashConfig(ConfigModel):
parse_table_names_from_sql: bool = Field(
default=False, description="See note below."
)
sql_parser: str = Field(
default="datahub.utilities.sql_parser.DefaultSQLParser",
description="custom SQL parser. See note below for details.",
)

env: str = Field(
default=DEFAULT_ENV,
Expand Down Expand Up @@ -354,7 +349,6 @@ def __init__(self, ctx: PipelineContext, config: RedashConfig):
self.api_page_limit = self.config.api_page_limit or math.inf

self.parse_table_names_from_sql = self.config.parse_table_names_from_sql
self.sql_parser_path = self.config.sql_parser

logger.info(
f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
Expand All @@ -380,31 +374,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = RedashConfig.parse_obj(config_dict)
return cls(ctx, config)

@classmethod
def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
assert "." in sql_parser_path, "sql_parser-path must contain a ."
parser_cls = import_path(sql_parser_path)

if not issubclass(parser_cls, SQLParser):
raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
return parser_cls

@classmethod
def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

try:
sql_table_names: List[str] = parser_cls(sql).get_tables()
except Exception as e:
logger.warning(f"Sql parser failed on {sql} with {e}")
return []

# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]
sql_table_names = [t.replace("`", "") for t in sql_table_names]

return sql_table_names

def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict:
url = f"/api/data_sources/{data_source_id}"
resp = self.client._get(url).json()
Expand Down Expand Up @@ -441,14 +410,6 @@ def _get_database_name_based_on_datasource(

return database_name

def _construct_datalineage_urn(
self, platform: str, database_name: str, sql_table_name: str
) -> str:
full_dataset_name = get_full_qualified_name(
platform, database_name, sql_table_name
)
return builder.make_dataset_urn(platform, full_dataset_name, self.config.env)

def _get_datasource_urns(
self, data_source: Dict, sql_query_data: Dict = {}
) -> Optional[List[str]]:
Expand All @@ -464,34 +425,23 @@ def _get_datasource_urns(
# Getting table lineage from SQL parsing
if self.parse_table_names_from_sql and data_source_syntax == "sql":
dataset_urns = list()
try:
sql_table_names = self._get_sql_table_names(
query, self.sql_parser_path
)
except Exception as e:
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=query,
platform=platform,
env=self.config.env,
platform_instance=None,
default_db=database_name,
)
# make sure dataset_urns is not empty list
dataset_urns = sql_parser_in_tables.in_tables
if sql_parser_in_tables.debug_info.table_error:
self.report.queries_problem_parsing.add(str(query_id))
self.error(
logger,
"sql-parsing",
f"exception {e} in parsing query-{query_id}-datasource-{data_source_id}",
f"exception {sql_parser_in_tables.debug_info.table_error} in parsing query-{query_id}-datasource-{data_source_id}",
)
sql_table_names = []
for sql_table_name in sql_table_names:
try:
dataset_urns.append(
self._construct_datalineage_urn(
platform, database_name, sql_table_name
)
)
except Exception:
self.report.queries_problem_parsing.add(str(query_id))
self.warn(
logger,
"data-urn-invalid",
f"Problem making URN for {sql_table_name} parsed from query {query_id}",
)

# make sure dataset_urns is not empty list
return dataset_urns if len(dataset_urns) > 0 else None

else:
Expand Down
35 changes: 25 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import pyspark
from databricks.sdk.service.sql import QueryStatementType
from sqllineage.runner import LineageRunner

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
Expand All @@ -22,7 +21,9 @@
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.ingestion.source.usage.usage_common import UsageAggregator
from datahub.metadata.schema_classes import OperationClass
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger = logging.getLogger(__name__)

Expand All @@ -48,6 +49,7 @@ class UnityCatalogUsageExtractor:
proxy: UnityCatalogApiProxy
table_urn_builder: Callable[[TableReference], str]
user_urn_builder: Callable[[str], str]
platform: str = "databricks"

def __post_init__(self):
self.usage_aggregator = UsageAggregator[TableReference](self.config)
Expand Down Expand Up @@ -173,7 +175,7 @@ def _parse_query(
self, query: Query, table_map: TableMap
) -> Optional[QueryTableInfo]:
with self.report.usage_perf_report.sql_parsing_timer:
table_info = self._parse_query_via_lineage_runner(query.query_text)
table_info = self._parse_query_via_sqlglot(query.query_text)
if table_info is None and query.statement_type == QueryStatementType.SELECT:
with self.report.usage_perf_report.spark_sql_parsing_timer:
table_info = self._parse_query_via_spark_sql_plan(query.query_text)
Expand All @@ -191,32 +193,45 @@ def _parse_query(
),
)

def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]:
def _parse_query_via_sqlglot(self, query: str) -> Optional[StringTableInfo]:
try:
runner = LineageRunner(query)
sql_parser_in_tables = create_lineage_sql_parsed_result(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably be setting the default db / schema when calling create_lineage_sql_parsed_result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @hsheth2 , We don’t need it here since we only require table names. In the previous implementation as well, we didn’t include the database/schema.

query=query,
default_db=None,
platform=self.platform,
env=self.config.env,
platform_instance=None,
)

return GenericTableInfo(
source_tables=[
self._parse_sqllineage_table(table)
for table in runner.source_tables
self._parse_sqlglot_table(table)
for table in sql_parser_in_tables.in_tables
],
target_tables=[
self._parse_sqllineage_table(table)
for table in runner.target_tables
self._parse_sqlglot_table(table)
for table in sql_parser_in_tables.out_tables
],
)
except Exception as e:
logger.info(f"Could not parse query via lineage runner, {query}: {e!r}")
logger.info(f"Could not parse query via sqlglot, {query}: {e!r}")
return None

@staticmethod
def _parse_sqllineage_table(sqllineage_table: object) -> str:
def _parse_sqllineage_table(sqllineage_table: str) -> str:
full_table_name = str(sqllineage_table)
default_schema = "<default>."
if full_table_name.startswith(default_schema):
return full_table_name[len(default_schema) :]
else:
return full_table_name

@staticmethod
def _parse_sqlglot_table(table_urn: str) -> str:
return UnityCatalogUsageExtractor._parse_sqllineage_table(
DatasetUrn.from_string(table_urn).name
)

def _parse_query_via_spark_sql_plan(self, query: str) -> Optional[StringTableInfo]:
"""Parse query source tables via Spark SQL plan. This is a fallback option."""
# Would be more effective if we upgrade pyspark
Expand Down
Loading
Loading