Skip to content

Commit

Permalink
fix(ingestion/oracle): Improved foreign key handling (#11867)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
acrylJonny and hsheth2 authored Mar 6, 2025
1 parent a700448 commit fcabe88
Show file tree
Hide file tree
Showing 3 changed files with 1,602 additions and 67 deletions.
156 changes: 93 additions & 63 deletions metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,20 @@ def __init__(self, inspector_instance: Inspector):
self.exclude_tablespaces: Tuple[str, str] = ("SYSTEM", "SYSAUX")

def get_db_name(self) -> str:
db_name = None
try:
# Try to retrieve current DB name by executing query
db_name = self._inspector_instance.bind.execute(
sql.text("select sys_context('USERENV','DB_NAME') from dual")
).scalar()
return str(db_name)
except sqlalchemy.exc.DatabaseError as e:
logger.error("Error fetching DB name: " + str(e))
self.report.failure(
title="Error fetching database name using sys_context.",
message="database_fetch_error",
context=db_name,
exc=e,
)
return ""

def get_schema_names(self) -> List[str]:
Expand Down Expand Up @@ -326,8 +332,8 @@ def get_columns(
try:
coltype = ischema_names[coltype]()
except KeyError:
logger.warning(
f"Did not recognize type {coltype} of column {colname}"
logger.info(
f"Unrecognized column datatype {coltype} of column {colname}"
)
coltype = sqltypes.NULLTYPE

Expand Down Expand Up @@ -379,8 +385,8 @@ def get_table_comment(self, table_name: str, schema: Optional[str] = None) -> Di
COMMENT_SQL = """
SELECT comments
FROM dba_tab_comments
WHERE table_name = CAST(:table_name AS VARCHAR(128))
AND owner = CAST(:schema_name AS VARCHAR(128))
WHERE table_name = :table_name
AND owner = :schema_name
"""

c = self._inspector_instance.bind.execute(
Expand All @@ -397,79 +403,93 @@ def _get_constraint_data(

text = (
"SELECT"
"\nac.constraint_name," # 0
"\nac.constraint_type," # 1
"\nloc.column_name AS local_column," # 2
"\nrem.table_name AS remote_table," # 3
"\nrem.column_name AS remote_column," # 4
"\nrem.owner AS remote_owner," # 5
"\nloc.position as loc_pos," # 6
"\nrem.position as rem_pos," # 7
"\nac.search_condition," # 8
"\nac.delete_rule" # 9
"\nFROM dba_constraints%(dblink)s ac,"
"\ndba_cons_columns%(dblink)s loc,"
"\ndba_cons_columns%(dblink)s rem"
"\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))"
"\nAND ac.constraint_type IN ('R','P', 'U', 'C')"
"\nac.constraint_name,"
"\nac.constraint_type,"
"\nacc.column_name AS local_column,"
"\nNULL AS remote_table,"
"\nNULL AS remote_column,"
"\nNULL AS remote_owner,"
"\nacc.position AS loc_pos,"
"\nNULL AS rem_pos,"
"\nac.search_condition,"
"\nac.delete_rule"
"\nFROM dba_constraints ac"
"\nJOIN dba_cons_columns acc"
"\nON ac.owner = acc.owner"
"\nAND ac.constraint_name = acc.constraint_name"
"\nAND ac.table_name = acc.table_name"
"\nWHERE ac.table_name = :table_name"
"\nAND ac.constraint_type IN ('P', 'U', 'C')"
)

if schema is not None:
params["owner"] = schema
text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))"
text += "\nAND ac.owner = :owner"

# Splitting into queries with UNION ALL for execution efficiency
text += (
"\nAND ac.owner = loc.owner"
"\nAND ac.constraint_name = loc.constraint_name"
"\nAND ac.r_owner = rem.owner(+)"
"\nAND ac.r_constraint_name = rem.constraint_name(+)"
"\nAND (rem.position IS NULL or loc.position=rem.position)"
"\nORDER BY ac.constraint_name, loc.position"
"\nUNION ALL"
"\nSELECT"
"\nac.constraint_name,"
"\nac.constraint_type,"
"\nacc.column_name AS local_column,"
"\nac.r_table_name AS remote_table,"
"\nrcc.column_name AS remote_column,"
"\nac.r_owner AS remote_owner,"
"\nacc.position AS loc_pos,"
"\nrcc.position AS rem_pos,"
"\nac.search_condition,"
"\nac.delete_rule"
"\nFROM dba_constraints ac"
"\nJOIN dba_cons_columns acc"
"\nON ac.owner = acc.owner"
"\nAND ac.constraint_name = acc.constraint_name"
"\nAND ac.table_name = acc.table_name"
"\nLEFT JOIN dba_cons_columns rcc"
"\nON ac.r_owner = rcc.owner"
"\nAND ac.r_constraint_name = rcc.constraint_name"
"\nAND acc.position = rcc.position"
"\nWHERE ac.table_name = :table_name"
"\nAND ac.constraint_type = 'R'"
)

text = text % {"dblink": dblink}
if schema is not None:
text += "\nAND ac.owner = :owner"

text += "\nORDER BY constraint_name, loc_pos"

rp = self._inspector_instance.bind.execute(sql.text(text), params)
constraint_data = rp.fetchall()
return constraint_data
return rp.fetchall()

def get_pk_constraint(
self, table_name: str, schema: Optional[str] = None, dblink: str = ""
) -> Dict:
denormalized_table_name = self._inspector_instance.dialect.denormalize_name(
table_name
)
assert denormalized_table_name

schema = self._inspector_instance.dialect.denormalize_name(
schema or self.default_schema_name
)

if schema is None:
schema = self._inspector_instance.dialect.default_schema_name

pkeys = []
constraint_name = None
constraint_data = self._get_constraint_data(
denormalized_table_name, schema, dblink
)

for row in constraint_data:
(
cons_name,
cons_type,
local_column,
remote_table,
remote_column,
remote_owner,
) = row[0:2] + tuple(
[self._inspector_instance.dialect.normalize_name(x) for x in row[2:6]]
try:
for row in self._get_constraint_data(table_name, schema, dblink):
if row[1] == "P": # constraint_type is 'P' for primary key
if constraint_name is None:
constraint_name = (
self._inspector_instance.dialect.normalize_name(row[0])
)
col_name = self._inspector_instance.dialect.normalize_name(
row[2]
) # local_column
pkeys.append(col_name)
except Exception as e:
self.report.warning(
title="Failed to Process Primary Keys",
message=(
f"Unable to process primary key constraints for {schema}.{table_name}. "
"Ensure SELECT access on DBA_CONSTRAINTS and DBA_CONS_COLUMNS.",
),
context=f"{schema}.{table_name}",
exc=e,
)
if cons_type == "P":
if constraint_name is None:
constraint_name = self._inspector_instance.dialect.normalize_name(
cons_name
)
pkeys.append(local_column)
# Return empty constraint if we can't process it
return {"constrained_columns": [], "name": None}

return {"constrained_columns": pkeys, "name": constraint_name}

Expand Down Expand Up @@ -527,6 +547,16 @@ def fkey_rec():
f"dba_cons_columns{dblink} - does the user have "
"proper rights to the table?"
)
self.report.warning(
title="Missing Table Permissions",
message=(
f"Unable to query table_name from dba_cons_columns{dblink}. "
"This usually indicates insufficient permissions on the target table. "
f"Foreign key relationships will not be detected for {schema}.{table_name}. "
"Please ensure the user has SELECT privileges on dba_cons_columns."
),
context=f"{schema}.{table_name}",
)

rec = fkeys[cons_name]
rec["name"] = cons_name
Expand Down Expand Up @@ -573,8 +603,8 @@ def get_view_definition(
text = "SELECT text FROM dba_views WHERE view_name=:view_name"

if schema is not None:
text += " AND owner = :schema"
params["schema"] = schema
params["owner"] = schema
text += "\nAND owner = :owner"

rp = self._inspector_instance.bind.execute(sql.text(text), params).scalar()

Expand Down
Loading

0 comments on commit fcabe88

Please sign in to comment.