From 2fee322bf9f734da1242d47d41bea577beeaf0d8 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Wed, 22 Jan 2025 08:57:42 -0800 Subject: [PATCH] feat(): Introducing support for DataHub Events Source (#142) * Wrapping it up * Adding event source * Adding more tests * Adding datahub cloud events source * Adding prerequisites * Adding docs * Addressing gabes comments * Update hello_world_datahub_cloud.yaml * Update datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py Co-authored-by: Gabe Lyons * Remove monkeypatched method --------- Co-authored-by: John Joyce Co-authored-by: John Joyce Co-authored-by: John Joyce Co-authored-by: Gabe Lyons Co-authored-by: John Joyce --- datahub-actions/setup.py | 1 + .../src/datahub_actions/cli/actions.py | 32 +- .../plugin/source/acryl/__init__.py | 0 .../plugin/source/acryl/constants.py | 2 + .../acryl/datahub_cloud_event_source.py | 206 +++++++++++++ .../acryl/datahub_cloud_events_ack_manager.py | 35 +++ .../acryl/datahub_cloud_events_consumer.py | 172 +++++++++++ ...hub_cloud_events_consumer_offsets_store.py | 105 +++++++ .../source/event_source_registry.py | 4 + .../unit/plugin/source/acryl/__init__.py | 0 .../acryl/test_datahub_cloud_event_source.py | 290 ++++++++++++++++++ .../test_datahub_cloud_events_ack_manager.py | 67 ++++ .../test_datahub_cloud_events_consumer.py | 280 +++++++++++++++++ ...hub_cloud_events_consumer_offsets_store.py | 138 +++++++++ docs/sources/datahub-cloud-event-source.md | 113 +++++++ examples/hello_world_datahub_cloud.yaml | 13 + 16 files changed, 1434 insertions(+), 24 deletions(-) create mode 100644 datahub-actions/src/datahub_actions/plugin/source/acryl/__init__.py create mode 100644 datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py create mode 100644 datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py create mode 100644 datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_ack_manager.py create mode 100644 datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py create mode 100644 datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer_offsets_store.py create mode 100644 datahub-actions/tests/unit/plugin/source/acryl/__init__.py create mode 100644 datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py create mode 100644 datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_ack_manager.py create mode 100644 datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py create mode 100644 datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer_offsets_store.py create mode 100644 docs/sources/datahub-cloud-event-source.md create mode 100644 examples/hello_world_datahub_cloud.yaml diff --git a/datahub-actions/setup.py b/datahub-actions/setup.py index e484c24a..c30a4e82 100644 --- a/datahub-actions/setup.py +++ b/datahub-actions/setup.py @@ -127,6 +127,7 @@ def get_long_description(): "jsonpickle", "build", "twine", + "tenacity", *list( dependency for plugin in [ diff --git a/datahub-actions/src/datahub_actions/cli/actions.py b/datahub-actions/src/datahub_actions/cli/actions.py index 97efd41f..d695ae23 100644 --- a/datahub-actions/src/datahub_actions/cli/actions.py +++ b/datahub-actions/src/datahub_actions/cli/actions.py @@ -17,16 +17,11 @@ import signal import sys import time -import unittest from typing import Any, List import click from click_default_group import DefaultGroup -from datahub.configuration.config_loader import ( - Environ, - _resolve_element, - load_config_file, -) +from datahub.configuration.config_loader import load_config_file import datahub_actions as datahub_actions_package from datahub_actions.pipeline.pipeline import Pipeline @@ -39,13 +34,6 @@ pipeline_manager = PipelineManager() -def best_effort_resolve_element(x: str, environ: Environ) -> str: - try: - return _resolve_element(x, environ=environ) - except Exception: - return x - - def pipeline_config_to_pipeline(pipeline_config: dict) -> Pipeline: logger.debug( f"Attempting to create Actions Pipeline using config {pipeline_config.get('name')}" @@ -97,17 +85,13 @@ def run(ctx: Any, config: List[str], debug: bool) -> None: if config is not None: for pipeline_config in config: pipeline_config_file = pathlib.Path(pipeline_config) - with unittest.mock.patch( - "datahub.configuration.config_loader._resolve_element" - ) as mock_resolve_element: - mock_resolve_element.side_effect = best_effort_resolve_element - pipeline_config_dict = load_config_file(pipeline_config_file) - enabled = pipeline_config_dict.get("enabled", True) - if enabled == "false" or enabled is False: - logger.warning( - f"Skipping pipeline {pipeline_config_dict.get('name')} as it is not enabled" - ) - continue + pipeline_config_dict = load_config_file(pipeline_config_file) + enabled = pipeline_config_dict.get("enabled", True) + if enabled == "false" or enabled is False: + logger.warning( + f"Skipping pipeline {pipeline_config_dict.get('name')} as it is not enabled" + ) + continue # now load the config with variable expansion pipeline_config_dict = load_config_file(pipeline_config_file) diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/__init__.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py new file mode 100644 index 00000000..5540dd52 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py @@ -0,0 +1,2 @@ +PLATFORM_EVENT_TOPIC_NAME = "PlatformEvent_v1" +ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent" diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py new file mode 100644 index 00000000..c0c732d0 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py @@ -0,0 +1,206 @@ +import json +import logging +import time +from dataclasses import dataclass +from typing import Iterable, List, Optional + +from datahub.configuration import ConfigModel +from datahub.emitter.serialization_helper import post_json_transform + +# DataHub imports. +from datahub.metadata.schema_classes import GenericPayloadClass + +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.event.event_registry import ( + ENTITY_CHANGE_EVENT_V1_TYPE, + EntityChangeEvent, +) + +# May or may not need these. +from datahub_actions.pipeline.pipeline_context import PipelineContext +from datahub_actions.plugin.source.acryl.constants import ( + ENTITY_CHANGE_EVENT_NAME, + PLATFORM_EVENT_TOPIC_NAME, +) +from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import ( + AckManager, +) +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer import ( + DataHubEventsConsumer, + ExternalEvent, +) +from datahub_actions.source.event_source import EventSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Converts a DataHub Events Message to an EntityChangeEvent. +def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent: + try: + return EntityChangeEvent.from_json(payload.get("value")) + except Exception as e: + raise ValueError("Failed to parse into EntityChangeEvent") from e + + +class DataHubEventsSourceConfig(ConfigModel): + topic: str = PLATFORM_EVENT_TOPIC_NAME + consumer_id: Optional[str] # Used to store offset for the consumer. + lookback_days: Optional[int] = None + reset_offsets: Optional[bool] = False + + # Time and Exit Conditions. + kill_after_idle_timeout: bool = False + idle_timeout_duration_seconds: int = 30 + event_processing_time_max_duration_seconds: int = 60 + + +# This is the custom DataHub-based Event Source. +@dataclass +class DataHubEventSource(EventSource): + running = False + source_config: DataHubEventsSourceConfig + ctx: PipelineContext + + @staticmethod + def _get_pipeline_urn(pipeline_name: str) -> str: + if pipeline_name.startswith("urn:li:dataHubAction:"): + return pipeline_name + else: + return f"urn:li:dataHubAction:{pipeline_name}" + + def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext): + self.ctx = ctx + self.source_config = config + self.consumer_id = DataHubEventSource._get_pipeline_urn(self.ctx.pipeline_name) + + # Ensure a Graph Instance was provided. + assert self.ctx.graph is not None + + self.datahub_events_consumer: DataHubEventsConsumer = DataHubEventsConsumer( + # TODO: This PipelineContext provides an Acryl Graph Instance + graph=self.ctx.graph.graph, + consumer_id=self.consumer_id, + lookback_days=self.source_config.lookback_days, + reset_offsets=self.source_config.reset_offsets, + ) + self.ack_manager = AckManager() + self.safe_to_ack_offset: Optional[str] = None + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource": + config = DataHubEventsSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def events(self) -> Iterable[EventEnvelope]: + logger.info("Starting DataHub Cloud events source...") + logger.info(f"Subscribing to the following topic: {self.source_config.topic}") + self.running = True + yield from self._poll_and_process_events() + + def _poll_and_process_events(self) -> Iterable[EventEnvelope]: + """Poll and process events in the main loop.""" + last_idle_response_timestamp = 0 + while self.running: + try: + sleeps_to_go = ( + self.source_config.event_processing_time_max_duration_seconds + ) + + while self.ack_manager.outstanding_acks(): + time.sleep(1) + sleeps_to_go -= 1 + logger.debug(f"Sleeps to go: {sleeps_to_go}") + + if sleeps_to_go == 0: + self.running = False + raise Exception( + f"Failed to process all events successfully after specified time {self.source_config.event_processing_time_max_duration_seconds}! If more time is required, please increase the timeout using this config. {self.ack_manager.acks.values()}", + ) + logger.debug( + f"Successfully processed events up to offset id {self.safe_to_ack_offset}" + ) + self.safe_to_ack_offset = self.datahub_events_consumer.offset_id + logger.debug(f"Safe to ack offset: {self.safe_to_ack_offset}") + + events_response = self.datahub_events_consumer.poll_events( + topic=self.source_config.topic, poll_timeout_seconds=2 + ) + + # Handle Idle Timeout + num_events = len(events_response.events) + + if num_events == 0: + if last_idle_response_timestamp == 0: + last_idle_response_timestamp = ( + self._get_current_timestamp_seconds() + ) + if self._should_idle_timeout( + num_events, last_idle_response_timestamp + ): + logger.info("Exiting main loop due to idle timeout") + return + else: + self.ack_manager.new_batch() + last_idle_response_timestamp = 0 # Reset the idle timeout + + event_envelopes: List[EventEnvelope] = [] + for msg in events_response.events: + for event_envelope in self.handle_pe(msg): + event_envelope.meta = self.ack_manager.get_meta(event_envelope) + event_envelopes.append(event_envelope) + + yield from event_envelopes + + except Exception as e: + logger.exception(f"DataHub Events consumer error: {e}") + self.running = False + + logger.info("DataHub Events consumer exiting main loop") + + @staticmethod + def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]: + value: dict = json.loads(msg.value) + payload: GenericPayloadClass = GenericPayloadClass.from_obj( + post_json_transform(value["payload"]) + ) + if ENTITY_CHANGE_EVENT_NAME == value["name"]: + event = build_entity_change_event(payload) + yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, {}) + + def close(self) -> None: + if self.datahub_events_consumer: + self.running = False + if self.safe_to_ack_offset: + self.datahub_events_consumer.commit_offsets( + offset_id=self.safe_to_ack_offset + ) + self.datahub_events_consumer.close() + + def ack(self, event: EventEnvelope, processed: bool = True) -> None: + self.ack_manager.ack(event.meta, processed=processed) + logger.debug(f"Actions acked event {event} as processed {processed}") + + def _should_idle_timeout( + self, num_events: int, last_idle_response_timestamp: int + ) -> bool: + """Handle idle timeout logic and decide if the loop should exit.""" + if num_events > 0: + return False # Continue processing + + current_timestamp_seconds = self._get_current_timestamp_seconds() + + if ( + self.source_config.kill_after_idle_timeout + and current_timestamp_seconds - last_idle_response_timestamp + > self.source_config.idle_timeout_duration_seconds + ): + logger.info( + f"Shutting down due to idle timeout of {self.source_config.idle_timeout_duration_seconds} seconds" + ) + self.running = False + return True # Signal that we should exit + return False # Continue processing + + def _get_current_timestamp_seconds(self) -> int: + return int(time.time()) diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_ack_manager.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_ack_manager.py new file mode 100644 index 00000000..5acb21a4 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_ack_manager.py @@ -0,0 +1,35 @@ +import logging +from typing import Any, Dict, Tuple + +logger = logging.getLogger(__name__) + + +class AckManager: + """ + An internal Ack manager which hands out fake message ids but ensures we + track all acks and processing + """ + + def __init__(self) -> None: + self.batch_id = 0 + self.msg_id = 0 + self.acks: Dict[Tuple[int, int], bool] = {} + + def new_batch(self) -> None: + self.batch_id += 1 + self.msg_id = 0 + + def get_meta(self, event: Any) -> Dict[str, Any]: + self.msg_id += 1 + self.acks[(self.batch_id, self.msg_id)] = event + return {"batch_id": self.batch_id, "msg_id": self.msg_id} + + def ack(self, meta: dict, processed: bool) -> None: + batch_id, msg_id = (meta["batch_id"], meta["msg_id"]) + if processed: + self.acks.pop((batch_id, msg_id)) + else: + logger.warning(f"Whoops - we didn't process {meta}") + + def outstanding_acks(self) -> int: + return len(self.acks) diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py new file mode 100644 index 00000000..3ae6cd4b --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py @@ -0,0 +1,172 @@ +import logging +import time +from typing import List, Optional + +import requests +from datahub.ingestion.graph.client import DataHubGraph, get_default_graph +from pydantic import BaseModel, Field +from requests.exceptions import ConnectionError, HTTPError +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer_offsets_store import ( + DataHubEventsConsumerPlatformResourceOffsetsStore, +) + +logger = logging.getLogger(__name__) + + +class ExternalEvent(BaseModel): + contentType: str = Field(..., description="The encoding type of the event") + value: str = Field(..., description="The raw serialized event itself") + + +class ExternalEventsResponse(BaseModel): + offsetId: str = Field(..., description="Offset id the stream for scrolling") + count: int = Field(..., description="Count of the events") + events: List[ExternalEvent] = Field(..., description="The raw events") + + +class DataHubEventsConsumer: + """ + A simple consumer of DataHub Events API. + Note that this class is not thread safe. + """ + + def __init__( + self, + graph: DataHubGraph, + consumer_id: Optional[str] = None, + offset_id: Optional[str] = None, + lookback_days: Optional[int] = None, + reset_offsets: Optional[bool] = False, + ): + # 1) Always set self.consumer_id, even if None, so tests can assert it safely. + self.consumer_id: Optional[str] = consumer_id + + # Base properties + self.graph: DataHubGraph = graph + self.offset_id: Optional[str] = offset_id + self.default_lookback_days: Optional[int] = lookback_days + self.offsets_store: Optional[ + DataHubEventsConsumerPlatformResourceOffsetsStore + ] = None + + # Build the base URL from the graph config + self.base_url = f"{graph.config.server}/openapi" + + # 2) Create the offsets store only if we have a consumer_id + if self.consumer_id is not None: + self.offsets_store = DataHubEventsConsumerPlatformResourceOffsetsStore( + graph=self.graph, + consumer_id=self.consumer_id, + ) + # 3) If you've chosen to reset consumer offsets, we simply do not load the previous. + # Otherwise, load the offsets. + if not reset_offsets: + loaded_offset = self.offsets_store.load_offset_id() + if loaded_offset is not None: + self.offset_id = loaded_offset + + logger.debug( + f"Starting DataHub Events Consumer with id {self.consumer_id} at offset id {self.offset_id}" + ) + else: + logger.debug("Starting DataHub Events Consumer with no consumer ID.") + + @retry( + retry=retry_if_exception_type((HTTPError, ConnectionError)), + wait=wait_exponential(multiplier=1, min=2, max=30), + stop=stop_after_attempt(3), + reraise=True, + ) + def poll_events( + self, + topic: str, + offset_id: Optional[str] = None, + limit: Optional[int] = None, + poll_timeout_seconds: Optional[int] = None, + ) -> ExternalEventsResponse: + """ + Fetch events for a specific topic. + """ + endpoint = f"{self.base_url}/v1/events/poll" + + # If the caller provided an offset_id, use it; otherwise fall back to self.offset_id. + resolved_offset_id = offset_id or self.offset_id + + params = { + "topic": topic, + "offsetId": resolved_offset_id, + "limit": limit, + "pollTimeoutSeconds": poll_timeout_seconds, + "lookbackWindowDays": self.default_lookback_days, + } + # Remove keys where the value is None + params = {k: v for k, v in params.items() if v is not None} + + # Pass along the session headers from the graph for authentication. + headers = dict(self.graph._session.headers) + + response = requests.get(endpoint, params=params, headers=headers) + response.raise_for_status() + + external_events_response = ExternalEventsResponse.parse_obj(response.json()) + + # Update our internal offset_id to the newly returned offset + self.offset_id = external_events_response.offsetId + return external_events_response + + def get_events(self, response: ExternalEventsResponse) -> List[ExternalEvent]: + """ + Extract events from the response data. + """ + return response.events + + def commit_offsets(self, offset_id: Optional[str] = None) -> None: + """ + Commit the current offset id or a passed-in offset_id. + """ + if self.offsets_store is not None: + store_offset_id = offset_id or self.offset_id + if store_offset_id is not None: + self.offsets_store.store_offset_id(store_offset_id) + + def close(self) -> None: + """ + Optional cleanup (currently no-op). + """ + pass + + +if __name__ == "__main__": + with get_default_graph() as graph: + client = DataHubEventsConsumer(graph=graph, consumer_id="events_consumer_cli") + if client.offset_id is not None: + print(f"Starting offset id: {client.offset_id}") + + while True: + + # Example: Poll events for the PlatformEvent_v1 topic + response = client.poll_events( + topic="PlatformEvent_v1", limit=10, poll_timeout_seconds=5 + ) + + print(f"Offset ID: {response.offsetId}") + print(f"Event count: {response.count}") + + events = client.get_events(response) + if len(events) == 0: + print("No events to process.") + else: + for event in events: + print(f"Content Type: {event.contentType}") + print(f"Value: {event.value}") + print("---") + client.commit_offsets() + + time.sleep(1) diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer_offsets_store.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer_offsets_store.py new file mode 100644 index 00000000..06ad2163 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer_offsets_store.py @@ -0,0 +1,105 @@ +import logging +import time +from typing import Optional + +from datahub.emitter.mce_builder import datahub_guid, make_data_platform_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.schema_classes import ( + PlatformResourceInfoClass, + SerializedValueClass, + SerializedValueContentTypeClass, + SerializedValueSchemaTypeClass, +) +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + + +class EventConsumerState(BaseModel): + VERSION = 1 # Increment this version when the schema of EventConsumerState changes + offset_id: Optional[str] = None + timestamp: Optional[int] = None + + def to_serialized_value(self) -> SerializedValueClass: + return SerializedValueClass( + blob=self.to_blob(), + contentType=SerializedValueContentTypeClass.JSON, + schemaType=SerializedValueSchemaTypeClass.JSON, + schemaRef=f"EventConsumerState@{self.VERSION}", + ) + + @classmethod + def from_serialized_value(cls, value: SerializedValueClass) -> "EventConsumerState": + try: + assert value.contentType == SerializedValueContentTypeClass.JSON + assert value.schemaType == SerializedValueSchemaTypeClass.JSON + if value.schemaRef is not None: + assert value.schemaRef.split("@")[0] == "EventConsumerState" + return cls.from_blob(value.blob) + except Exception as e: + logger.error(f"Failed to deserialize EventConsumerState: {e}") + return cls() + + def to_blob(self) -> bytes: + return self.json().encode() + + @staticmethod + def from_blob(blob: bytes) -> "EventConsumerState": + return EventConsumerState.parse_raw(blob.decode()) + + +class DataHubEventsConsumerPlatformResourceOffsetsStore: + """ + State store for the Event Source. + This loads the offset id for a given consumer id string. + """ + + def __init__( + self, + graph: DataHubGraph, + consumer_id: str, + state_resource_name: str = "state", + ): + self.graph = graph + self.consumer_id = consumer_id + self.state_resource_name = state_resource_name + resource_guid = datahub_guid( + { + "platform": make_data_platform_urn("datahub"), + "consumer_id": self.consumer_id, + "state_resource_name": self.state_resource_name, + } + ) + self.state_resource_urn = f"urn:li:platformResource:{resource_guid}" + + def store_offset_id(self, offset_id: str) -> str: + state = EventConsumerState(offset_id=offset_id, timestamp=int(time.time())) + resource_info: PlatformResourceInfoClass = PlatformResourceInfoClass( + resourceType="eventConsumerState", # Resource type is eventConsumerState + primaryKey=self.consumer_id, # Primary key is the consumer id + value=state.to_serialized_value(), # Value is the serialized state + ) + + # Emit state to DataHub using MetadataChangeProposalWrapper + mcp = MetadataChangeProposalWrapper( + entityUrn=self.state_resource_urn, + aspect=resource_info, + ) + + # Write to graph + self.graph.emit(mcp, async_flag=False) + logger.info(f"Stored offset id {offset_id} for consumer id {self.consumer_id}") + return offset_id + + def load_offset_id(self) -> Optional[str]: + # Read from graph + existing_state = self.graph.get_aspect( + self.state_resource_urn, PlatformResourceInfoClass + ) + if existing_state is not None and existing_state.value is not None: + # We have a state aspect + state = EventConsumerState.from_serialized_value(existing_state.value) + return state.offset_id + + return None diff --git a/datahub-actions/src/datahub_actions/source/event_source_registry.py b/datahub-actions/src/datahub_actions/source/event_source_registry.py index 2b505ae9..38c838a6 100644 --- a/datahub-actions/src/datahub_actions/source/event_source_registry.py +++ b/datahub-actions/src/datahub_actions/source/event_source_registry.py @@ -14,9 +14,13 @@ from datahub.ingestion.api.registry import PluginRegistry +from datahub_actions.plugin.source.acryl.datahub_cloud_event_source import ( + DataHubEventSource, +) from datahub_actions.plugin.source.kafka.kafka_event_source import KafkaEventSource from datahub_actions.source.event_source import EventSource event_source_registry = PluginRegistry[EventSource]() event_source_registry.register_from_entrypoint("datahub_actions.source.plugins") event_source_registry.register("kafka", KafkaEventSource) +event_source_registry.register("datahub-cloud", DataHubEventSource) diff --git a/datahub-actions/tests/unit/plugin/source/acryl/__init__.py b/datahub-actions/tests/unit/plugin/source/acryl/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py new file mode 100644 index 00000000..6a8d13ae --- /dev/null +++ b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py @@ -0,0 +1,290 @@ +# test_datahub_event_source.py + +from typing import List, cast +from unittest.mock import MagicMock, patch + +import pytest + +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.event.event_registry import ( + ENTITY_CHANGE_EVENT_V1_TYPE, + EntityChangeEvent, +) +from datahub_actions.pipeline.pipeline_context import PipelineContext + +# Import your source + config classes from the correct module path. +from datahub_actions.plugin.source.acryl.datahub_cloud_event_source import ( + DataHubEventSource, + DataHubEventsSourceConfig, +) +from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import ( + AckManager, +) +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer import ( + DataHubEventsConsumer, + ExternalEvent, + ExternalEventsResponse, +) + + +@pytest.fixture +def mock_pipeline_context() -> PipelineContext: + """ + Create a mock PipelineContext with the attributes needed for DataHubEventSource. + """ + mock_ctx = MagicMock(spec=PipelineContext) + # Add pipeline_name so we don't get AttributeError: + mock_ctx.pipeline_name = "test-pipeline" + + # We also assume mock_ctx.graph has a .graph attribute with a mock DataHubGraph. + mock_ctx.graph = MagicMock() + mock_ctx.graph.graph = MagicMock() # The underlying DataHubGraph + + return cast(PipelineContext, mock_ctx) + + +@pytest.fixture +def base_config_dict() -> dict: + """ + Base config dict that can be updated for specific tests. + We will parse this into DataHubEventsSourceConfig in each test. + """ + return { + "topic": "PlatformEvent_v1", + "lookback_days": None, + "reset_offsets": False, + "kill_after_idle_timeout": True, + "idle_timeout_duration_seconds": 5, + "event_processing_time_max_duration_seconds": 5, + } + + +def test_create_source( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Validate that DataHubEventSource.create() properly instantiates the source. + """ + source = DataHubEventSource.create(base_config_dict, mock_pipeline_context) + assert isinstance(source, DataHubEventSource) + # The consumer_id on the instance includes the action prefix from pipeline_name + assert source.consumer_id == "urn:li:dataHubAction:test-pipeline" + + +def test_get_pipeline_urn() -> None: + """ + Validate that _get_pipeline_urn() handles pipeline_name with or without the prefix. + """ + urn = DataHubEventSource._get_pipeline_urn("some-pipeline") + assert urn == "urn:li:dataHubAction:some-pipeline" + + urn2 = DataHubEventSource._get_pipeline_urn("urn:li:dataHubAction:already-a-urn") + assert urn2 == "urn:li:dataHubAction:already-a-urn" + + +def test_source_initialization( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Validate that DataHubEventSource constructor sets up DataHubEventsConsumer and AckManager. + """ + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + assert source.consumer_id == "urn:li:dataHubAction:test-pipeline" + assert isinstance(source.datahub_events_consumer, DataHubEventsConsumer) + assert isinstance(source.ack_manager, AckManager) + assert source.safe_to_ack_offset is None + + +def test_events_with_no_events( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + base_config_dict["idle_timeout_duration_seconds"] = 1 + base_config_dict["kill_after_idle_timeout"] = True + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + mock_consumer = MagicMock(spec=DataHubEventsConsumer) + mock_consumer.offset_id = "offset-100" # Set the mocked offset_id + source.datahub_events_consumer = mock_consumer + + # We'll simulate that poll_events returns a response with 0 events repeatedly. + empty_response = ExternalEventsResponse(offsetId="offset-100", count=0, events=[]) + mock_consumer.poll_events.return_value = empty_response + + with patch.object( + source, "_get_current_timestamp_seconds", side_effect=[100, 101, 102, 103] + ): + events_iter = source.events() + emitted_events = list(events_iter) # Convert generator to list + + assert len(emitted_events) == 0 + + assert mock_consumer.poll_events.call_count >= 1 + assert source.running is False + + +def test_events_with_some_events( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + If poll_events returns events, verify that the source yields them and resets idle timer. + """ + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + mock_consumer = MagicMock(spec=DataHubEventsConsumer) + mock_consumer.offset_id = "offset-100" + source.datahub_events_consumer = mock_consumer + mock_ack_manager = MagicMock(spec=AckManager) + mock_ack_manager.outstanding_acks.side_effect = [0] + + source.ack_manager = mock_ack_manager + + # Simulate the consumer returning a batch of events + event_value = '{"header":{"timestampMillis":1737170481713},"name":"entityChangeEvent","payload":{"value":"{\\"auditStamp\\":{\\"actor\\":\\"urn:li:corpuser:john.joyce@acryl.io\\",\\"time\\":1737170481713},\\"entityUrn\\":\\"urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_community.datahub_slack.message_file,PROD)\\",\\"entityType\\":\\"dataset\\",\\"modifier\\":\\"urn:li:tag:COLUMNFIELD\\",\\"category\\":\\"TAG\\",\\"operation\\":\\"ADD\\",\\"version\\":0,\\"parameters\\":{\\"tagUrn\\":\\"urn:li:tag:COLUMNFIELD\\"}}","contentType":"application/json"}}' + fake_event = ExternalEvent(contentType="application/json", value=event_value) + poll_response = ExternalEventsResponse( + offsetId="offset-101", count=1, events=[fake_event] + ) + mock_consumer.poll_events.return_value = poll_response + + # Add a side effect to exit the loop + def _side_effect(*args, **kwargs): + source.running = False + return poll_response + + mock_consumer.poll_events.side_effect = _side_effect + + # Patch _get_current_timestamp_seconds + with patch.object(source, "_get_current_timestamp_seconds", return_value=100): + emitted = list(source.events()) + + # Assertions + assert len(emitted) == 1 + assert emitted[0].event_type == ENTITY_CHANGE_EVENT_V1_TYPE + assert isinstance(emitted[0].event, EntityChangeEvent) + mock_ack_manager.get_meta.assert_called_once() + assert source.safe_to_ack_offset == "offset-100" # Previous offset. + assert mock_consumer.poll_events.call_count == 1 + + +def test_outstanding_acks_timeout( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + If ack_manager.outstanding_acks() never returns 0, we eventually raise an exception + due to event_processing_time_max_duration_seconds. + """ + base_config_dict["event_processing_time_max_duration_seconds"] = 2 + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + mock_ack_manager = MagicMock(spec=AckManager) + + mock_ack_manager.outstanding_acks.return_value = 1 # always 1 + mock_ack_manager.acks = {"values": []} + source.ack_manager = mock_ack_manager + + mock_consumer = MagicMock(spec=DataHubEventsConsumer) + mock_consumer.offset_id = "offset-100" + source.datahub_events_consumer = mock_consumer + + source.running = True + + # Ensure the call times out. + list(source.events()) + + assert source.running is False + + +def test_handle_pe() -> None: + """ + Verify that handle_pe yields an EntityChangeEvent if the 'name' is 'entityChangeEvent', + otherwise yields nothing. + """ + # Valid "entityChangeEvent" object + event_value = '{"header":{"timestampMillis":1737170481713},"name":"entityChangeEvent","payload":{"value":"{\\"auditStamp\\":{\\"actor\\":\\"urn:li:corpuser:john.joyce@acryl.io\\",\\"time\\":1737170481713},\\"entityUrn\\":\\"urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_community.datahub_slack.message_file,PROD)\\",\\"entityType\\":\\"dataset\\",\\"modifier\\":\\"urn:li:tag:COLUMNFIELD\\",\\"category\\":\\"TAG\\",\\"operation\\":\\"ADD\\",\\"version\\":0,\\"parameters\\":{\\"tagUrn\\":\\"urn:li:tag:COLUMNFIELD\\"}}","contentType":"application/json"}}' + msg = ExternalEvent(contentType="application/json", value=event_value) + + envelopes: List[EventEnvelope] = list(DataHubEventSource.handle_pe(msg)) + assert len(envelopes) == 1 + assert envelopes[0].event_type == ENTITY_CHANGE_EVENT_V1_TYPE + assert isinstance(envelopes[0].event, EntityChangeEvent) + + # Different event name => no yield + event_value_2 = '{"header":{"timestampMillis":1737170481713},"name":"anotherEvent","payload":{"value":"{\\"auditStamp\\":{\\"actor\\":\\"urn:li:corpuser:john.joyce@acryl.io\\",\\"time\\":1737170481713},\\"entityUrn\\":\\"urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_community.datahub_slack.message_file,PROD)\\",\\"entityType\\":\\"dataset\\",\\"modifier\\":\\"urn:li:tag:COLUMNFIELD\\",\\"category\\":\\"TAG\\",\\"operation\\":\\"ADD\\",\\"version\\":0,\\"parameters\\":{\\"tagUrn\\":\\"urn:li:tag:COLUMNFIELD\\"}}","contentType":"application/json"}}' + msg2 = ExternalEvent(contentType="application/json", value=event_value_2) + envelopes2 = list(DataHubEventSource.handle_pe(msg2)) + assert len(envelopes2) == 0 + + +def test_ack(mock_pipeline_context: PipelineContext, base_config_dict: dict) -> None: + """ + Verify that ack() calls ack_manager.ack with the event's metadata. + """ + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + mock_ack_manager = MagicMock(spec=AckManager) + source.ack_manager = mock_ack_manager + + envelope = EventEnvelope( + event_type=ENTITY_CHANGE_EVENT_V1_TYPE, + event=MagicMock(spec=EntityChangeEvent), + meta={"batch_id": 1, "msg_id": 2}, + ) + source.ack(envelope, processed=True) + mock_ack_manager.ack.assert_called_once_with(envelope.meta, processed=True) + + +def test_close(mock_pipeline_context: PipelineContext, base_config_dict: dict) -> None: + """ + Verify that close() stops the source, commits offsets, and calls consumer.close(). + """ + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + mock_consumer = MagicMock(spec=DataHubEventsConsumer) + source.datahub_events_consumer = mock_consumer + + source.safe_to_ack_offset = "some-offset-id" + + source.close() + assert source.running is False + mock_consumer.commit_offsets.assert_called_once_with(offset_id="some-offset-id") + mock_consumer.close.assert_called_once() + + +def test_should_idle_timeout( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Verify the idle timeout logic in _should_idle_timeout(). + """ + base_config_dict["idle_timeout_duration_seconds"] = 5 + config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + # If events > 0 => always False + assert ( + source._should_idle_timeout(num_events=2, last_idle_response_timestamp=100) + is False + ) + + # If time difference < idle_timeout_duration_seconds => False + with patch.object(source, "_get_current_timestamp_seconds", return_value=104): + # 4 seconds from 100 => not timed out + assert ( + source._should_idle_timeout(num_events=0, last_idle_response_timestamp=100) + is False + ) + + # If time difference > idle_timeout_duration_seconds => returns True + sets running=False + with patch.object(source, "_get_current_timestamp_seconds", return_value=106): + # 6 seconds from 100 => timed out + assert ( + source._should_idle_timeout(num_events=0, last_idle_response_timestamp=100) + is True + ) + assert source.running is False diff --git a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_ack_manager.py b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_ack_manager.py new file mode 100644 index 00000000..88c073ba --- /dev/null +++ b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_ack_manager.py @@ -0,0 +1,67 @@ +# test_ack_manager.py + +from typing import Any, Dict + +from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import ( + AckManager, +) + + +def test_new_batch() -> None: + ack_manager = AckManager() + assert ack_manager.batch_id == 0 + assert ack_manager.msg_id == 0 + assert ack_manager.outstanding_acks() == 0 + + ack_manager.new_batch() + assert ack_manager.batch_id == 1 + assert ack_manager.msg_id == 0 + assert ack_manager.outstanding_acks() == 0 + + +def test_get_meta() -> None: + ack_manager = AckManager() + ack_manager.new_batch() + + meta: Dict[str, Any] = ack_manager.get_meta("some event") + assert meta["batch_id"] == 1 + assert meta["msg_id"] == 1 + assert ack_manager.outstanding_acks() == 1 + + +def test_ack_processed() -> None: + ack_manager = AckManager() + ack_manager.new_batch() + + meta: Dict[str, Any] = ack_manager.get_meta("some event") + assert ack_manager.outstanding_acks() == 1 + + ack_manager.ack(meta, processed=True) + assert ack_manager.outstanding_acks() == 0 + + +def test_ack_not_processed() -> None: + ack_manager = AckManager() + ack_manager.new_batch() + + meta: Dict[str, Any] = ack_manager.get_meta("some event") + assert ack_manager.outstanding_acks() == 1 + + ack_manager.ack(meta, processed=False) + # The ack should remain since processed=False + assert ack_manager.outstanding_acks() == 1 + + +def test_multiple_acks() -> None: + ack_manager = AckManager() + ack_manager.new_batch() + + meta1: Dict[str, Any] = ack_manager.get_meta("event1") + meta2: Dict[str, Any] = ack_manager.get_meta("event2") + assert ack_manager.outstanding_acks() == 2 + + ack_manager.ack(meta1, processed=True) + assert ack_manager.outstanding_acks() == 1 + + ack_manager.ack(meta2, processed=True) + assert ack_manager.outstanding_acks() == 0 diff --git a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py new file mode 100644 index 00000000..3600acf2 --- /dev/null +++ b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py @@ -0,0 +1,280 @@ +# test_acryl_datahub_events_consumer.py + +from typing import List, Optional, cast +from unittest.mock import MagicMock, patch + +import pytest +from datahub.ingestion.graph.client import DataHubGraph +from requests.exceptions import ConnectionError, HTTPError +from requests.models import Response + +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer import ( + DataHubEventsConsumer, + ExternalEvent, + ExternalEventsResponse, +) +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer_offsets_store import ( + DataHubEventsConsumerPlatformResourceOffsetsStore, +) + + +@pytest.fixture +def mock_graph() -> DataHubGraph: + """ + Provide a mock DataHubGraph instance, including a mock config and _session. + This prevents 'AttributeError: Mock object has no attribute "config"'. + """ + mock_graph = MagicMock(spec=DataHubGraph) + + # Mock config object with a server attribute + mock_config = MagicMock() + mock_config.server = "http://fake-datahub" + mock_graph.config = mock_config + + # Mock _session with headers + mock_session = MagicMock() + mock_session.headers = {"Authorization": "Bearer test-token"} + mock_graph._session = mock_session + + return cast(DataHubGraph, mock_graph) + + +@pytest.fixture +def external_events_response() -> ExternalEventsResponse: + """ + Provide a sample ExternalEventsResponse for use in tests. + """ + return ExternalEventsResponse( + offsetId="next-offset-123", + count=2, + events=[ + ExternalEvent(contentType="application/json", value='{"key": "value"}'), + ExternalEvent(contentType="application/json", value='{"another": "event"}'), + ], + ) + + +def test_consumer_init_with_consumer_id_loads_offsets(mock_graph: DataHubGraph) -> None: + """ + Verify that providing a consumer_id triggers loading existing offsets + from the store, unless reset_offsets is True. + """ + mock_store = MagicMock(spec=DataHubEventsConsumerPlatformResourceOffsetsStore) + mock_store.load_offset_id.return_value = "loaded-offset" + + # Patch the store's constructor so it returns our mock_store + with patch.object( + target=DataHubEventsConsumerPlatformResourceOffsetsStore, + attribute="__init__", + return_value=None, + ), patch.object( + DataHubEventsConsumerPlatformResourceOffsetsStore, + "load_offset_id", + new=mock_store.load_offset_id, + ): + # Construct the consumer + consumer = DataHubEventsConsumer( + graph=mock_graph, consumer_id="test-consumer", reset_offsets=False + ) + # Because we've mocked __init__, manually set consumer.offsets_store + consumer.offsets_store = mock_store + + assert consumer.consumer_id == "test-consumer" + # load_offset_id was called + mock_store.load_offset_id.assert_called_once() + # consumer.offset_id should come from the store + assert consumer.offset_id == "loaded-offset" + + +def test_consumer_init_with_consumer_id_and_reset_offsets( + mock_graph: DataHubGraph, +) -> None: + """ + Verify that when reset_offsets=True, we do NOT load the offset + from the store. + """ + mock_store = MagicMock(spec=DataHubEventsConsumerPlatformResourceOffsetsStore) + + with patch.object( + target=DataHubEventsConsumerPlatformResourceOffsetsStore, + attribute="__init__", + return_value=None, + ): + consumer = DataHubEventsConsumer( + graph=mock_graph, consumer_id="test-consumer", reset_offsets=True + ) + consumer.offsets_store = mock_store + + # load_offset_id should NOT be called if reset_offsets=True + mock_store.load_offset_id.assert_not_called() + assert consumer.offset_id is None + + +def test_consumer_init_without_consumer_id(mock_graph: DataHubGraph) -> None: + """ + Verify that if no consumer_id is provided, no offsets store is created. + """ + consumer = DataHubEventsConsumer(graph=mock_graph, consumer_id=None) + assert consumer.offsets_store is None + assert consumer.consumer_id is None + + +@pytest.mark.parametrize("given_offset_id", [None, "custom-offset-789"]) +def test_poll_events_success( + mock_graph: DataHubGraph, + external_events_response: ExternalEventsResponse, + given_offset_id: Optional[str], +) -> None: + """ + Test that poll_events calls the correct endpoint, updates offset_id, + and returns an ExternalEventsResponse. + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset-456", + ) + + with patch("requests.get") as mock_get: + mock_response = MagicMock() + # Simulate JSON decoding + mock_response.json.return_value = { + "offsetId": external_events_response.offsetId, + "count": external_events_response.count, + "events": [ + {"contentType": evt.contentType, "value": evt.value} + for evt in external_events_response.events + ], + } + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + + polled_response = consumer.poll_events( + topic="TestTopic", + offset_id=given_offset_id, + limit=10, + poll_timeout_seconds=5, + ) + + # Verify the request params + expected_params = { + "topic": "TestTopic", + "offsetId": given_offset_id or "initial-offset-456", + "limit": 10, + "pollTimeoutSeconds": 5, + } + + # Check that requests.get was called once + mock_get.assert_called_once() + call_args, call_kwargs = mock_get.call_args + assert call_args[0] == f"{consumer.base_url}/v1/events/poll" + assert call_kwargs["params"] == expected_params + + # Verify returned response + assert polled_response.offsetId == external_events_response.offsetId + assert polled_response.count == external_events_response.count + assert len(polled_response.events) == len(external_events_response.events) + + # The consumer's offset_id should be updated + assert consumer.offset_id == external_events_response.offsetId + + +def test_poll_events_http_error(mock_graph: DataHubGraph) -> None: + """ + Test that poll_events retries and raises an HTTPError if requests.get fails. + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset", + ) + dummy_response = Response() # Create a dummy Response object + with patch( + "requests.get", side_effect=HTTPError(response=dummy_response) + ) as mock_get: + # The default Tenacity stop_after_attempt=3 + with pytest.raises(HTTPError): + consumer.poll_events(topic="TestTopic") + + # requests.get should be called multiple times due to retry + assert mock_get.call_count == 3 + + +def test_poll_events_connection_error(mock_graph: DataHubGraph) -> None: + """ + Test that poll_events retries and raises a ConnectionError if requests.get fails. + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset", + ) + + with patch( + "requests.get", side_effect=ConnectionError("Connection Error") + ) as mock_get: + with pytest.raises(ConnectionError): + consumer.poll_events(topic="TestTopic") + + # requests.get should be called multiple times due to retry + assert mock_get.call_count == 3 + + +def test_get_events( + mock_graph: DataHubGraph, external_events_response: ExternalEventsResponse +) -> None: + """ + Test that get_events simply returns the events list from the response. + """ + consumer = DataHubEventsConsumer(graph=mock_graph) + events: List[ExternalEvent] = consumer.get_events(external_events_response) + assert len(events) == 2 + assert all(isinstance(e, ExternalEvent) for e in events) + + +def test_commit_offsets_with_explicit_offset(mock_graph: DataHubGraph) -> None: + """ + Test commit_offsets when an explicit offset_id is passed. + """ + mock_store = MagicMock(spec=DataHubEventsConsumerPlatformResourceOffsetsStore) + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id=None, + ) + consumer.offsets_store = mock_store + + consumer.commit_offsets(offset_id="commit-test-offset") + mock_store.store_offset_id.assert_called_once_with("commit-test-offset") + + +def test_commit_offsets_with_consumer_offset(mock_graph: DataHubGraph) -> None: + """ + Test commit_offsets uses the consumer's current offset if none is provided. + """ + mock_store = MagicMock(spec=DataHubEventsConsumerPlatformResourceOffsetsStore) + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="current-offset", + ) + consumer.offsets_store = mock_store + + consumer.commit_offsets() + mock_store.store_offset_id.assert_called_once_with("current-offset") + + +def test_commit_offsets_no_store(mock_graph: DataHubGraph) -> None: + """ + If offsets_store is None, commit_offsets should do nothing and not error. + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id=None, + offset_id="some-offset", + ) + # offsets_store remains None + consumer.commit_offsets("ignored-offset") + # No error should occur, and no calls to the store. + # We just assert that the code doesn't raise an exception. + # (No additional assert needed here.) diff --git a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer_offsets_store.py b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer_offsets_store.py new file mode 100644 index 00000000..b9f149c8 --- /dev/null +++ b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer_offsets_store.py @@ -0,0 +1,138 @@ +# test_offsets_store.py + +from typing import Optional, cast +from unittest.mock import MagicMock + +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.schema_classes import ( + PlatformResourceInfoClass, + SerializedValueClass, + SerializedValueContentTypeClass, + SerializedValueSchemaTypeClass, +) + +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer_offsets_store import ( + DataHubEventsConsumerPlatformResourceOffsetsStore, + EventConsumerState, +) + + +def test_event_consumer_state_serialization() -> None: + """ + Tests that EventConsumerState can be serialized and deserialized properly. + """ + state = EventConsumerState(offset_id="test-offset", timestamp=1234567890) + serialized_value = state.to_serialized_value() + + assert serialized_value.contentType == SerializedValueContentTypeClass.JSON + assert serialized_value.schemaType == SerializedValueSchemaTypeClass.JSON + assert serialized_value.schemaRef is not None + assert "EventConsumerState@" in serialized_value.schemaRef + + # Round-trip: from_serialized_value + deserialized_state = EventConsumerState.from_serialized_value(serialized_value) + assert deserialized_state.offset_id == "test-offset" + assert deserialized_state.timestamp == 1234567890 + + +def test_event_consumer_state_missing_schema_handling() -> None: + """ + Tests that from_serialized_value gracefully handles missing or incorrect schemaRef. + """ + # Force an empty schemaRef + invalid_serialized_value = SerializedValueClass( + blob=b'{"offset_id":"test-offset","timestamp":987654321}', + contentType=SerializedValueContentTypeClass.JSON, + schemaType=SerializedValueSchemaTypeClass.JSON, + schemaRef=None, + ) + deserialized_state = EventConsumerState.from_serialized_value( + invalid_serialized_value + ) + assert deserialized_state.offset_id == "test-offset" + assert deserialized_state.timestamp == 987654321 + + +def test_store_offset_id() -> None: + """ + Tests that store_offset_id emits the correct MCP to DataHubGraph. + """ + mock_graph: DataHubGraph = MagicMock() + store = DataHubEventsConsumerPlatformResourceOffsetsStore( + graph=mock_graph, + consumer_id="test_consumer", + ) + + # Store an offset + offset_value: str = "my-test-offset" + returned_offset = store.store_offset_id(offset_value) + + # Check the returned value + assert returned_offset == offset_value + + # Cast the emit method to MagicMock to access call_count and call_args + emit_mock = cast(MagicMock, mock_graph.emit) + # Ensure we called `mock_graph.emit` once with an MCP + assert emit_mock.call_count == 1 + + # You can dig deeper into call args if you want to check the actual MCP content: + call_args = emit_mock.call_args + emitted_mcp = call_args[0][0] # The first argument to emit + assert emitted_mcp.entityUrn == store.state_resource_urn + # The aspect should be a PlatformResourceInfoClass + resource_info = cast(PlatformResourceInfoClass, emitted_mcp.aspect) + assert resource_info.primaryKey == "test_consumer" + assert resource_info.resourceType == "eventConsumerState" + + +def test_load_offset_id_no_existing_state() -> None: + """ + Tests that load_offset_id returns None if the aspect is not present. + """ + mock_graph: DataHubGraph = MagicMock() + # Cast get_aspect to MagicMock before setting return_value and asserting calls + get_aspect_mock = cast(MagicMock, mock_graph.get_aspect) + get_aspect_mock.return_value = None # Simulate no stored aspect + + store = DataHubEventsConsumerPlatformResourceOffsetsStore( + graph=mock_graph, + consumer_id="test_consumer", + ) + + offset: Optional[str] = store.load_offset_id() + assert offset is None + + # Ensure we called `get_aspect` once + get_aspect_mock.assert_called_once() + + +def test_load_offset_id_existing_state() -> None: + """ + Tests that load_offset_id returns the correct offset_id if the aspect is present. + """ + # Prepare a mocked, serialized EventConsumerState + state = EventConsumerState(offset_id="existing-offset", timestamp=1111111) + serialized_value = state.to_serialized_value() + + # Create a mock PlatformResourceInfoClass that includes our serialized state + mock_resource_info = PlatformResourceInfoClass( + resourceType="eventConsumerState", + primaryKey="test_consumer", + value=serialized_value, + ) + + mock_graph: DataHubGraph = MagicMock() + # Cast get_aspect to MagicMock before setting return_value and asserting calls + get_aspect_mock = cast(MagicMock, mock_graph.get_aspect) + get_aspect_mock.return_value = mock_resource_info + + store = DataHubEventsConsumerPlatformResourceOffsetsStore( + graph=mock_graph, + consumer_id="test_consumer", + ) + + offset: Optional[str] = store.load_offset_id() + assert offset == "existing-offset" + + # Ensure we called `get_aspect` once + get_aspect_mock.assert_called_once() diff --git a/docs/sources/datahub-cloud-event-source.md b/docs/sources/datahub-cloud-event-source.md new file mode 100644 index 00000000..fd2b5e67 --- /dev/null +++ b/docs/sources/datahub-cloud-event-source.md @@ -0,0 +1,113 @@ +# DataHub Cloud Event Source + +## Prerequisites + +### Compatibility + +The **DataHub Cloud Event Source** is only compatible with versions of DataHub Cloud above `v0.3.7`. + +### Privileges + +By default, users do not have access to the Events API of DataHub Cloud. In order to access the API, the user or service account +associated with the access token used to configure this events source _must_ have the `Get Platform Events` platform privilege, which +can be granted using an [Access Policy](https://datahubproject.io/docs/authorization/access-policies-guide/). + +## Overview + +The DataHub Cloud Event Source allows you to use DataHub Actions with an instance of DataHub Cloud hosted by [Acryl](https://acryl.io). + +Under the hood, the DataHub Cloud Event Source communicates with DataHub Cloud to extract change events in realtime. +The state of progress is automatically saved to DataHub Cloud after messages are processed, allowing you to seamlessly pause and restart the consumer, using the provided `name` to uniquely identify the consumer state. + +On initial startup of a new consumer id, the DataHub event source will automatically begin the _latest_ events by default. Afterwards, the message stream processed offsets will be continually saved. However, the source can also optionally be configured to "look back" in time +by a certain number of days on initial bootstrap using the `lookback_days` parameter. To reset all previously saved offsets for a consumer, +you can set `reset_offsets` to `True`. + +### Processing Guarantees + +This event source implements an "ack" function which is invoked if and only if an event is successfully processed +by the Actions framework, meaning that the event made it through the Transformers and into the Action without +any errors. Under the hood, the "ack" method synchronously commits DataHub Cloud Consumer Offsets on behalf of the Action. This means that by default, the framework provides *at-least once* processing semantics. That is, in the unusual case that a failure occurs when attempting to commit offsets back to Kafka, that event may be replayed on restart of the Action. + +If you've configured your Action pipeline `failure_mode` to be `CONTINUE` (the default), then events which +fail to be processed will simply be logged to a `failed_events.log` file for further investigation (dead letter queue). The DataHub Cloud Event Source will continue to make progress against the underlying topics and continue to commit offsets even in the case of failed messages. + +If you've configured your Action pipeline `failure_mode` to be `THROW`, then events which fail to be processed result in an Action Pipeline error. This in turn terminates the pipeline before committing offsets back to DataHub Cloud. Thus the message will not be marked as "processed" by the Action consumer. + +## Supported Events + +The DataHub Cloud Event Source produces + +- [Entity Change Event V1](../events/entity-change-event.md) + +Note that the DataHub Cloud Event Source does _not_ yet support the full [Metadata Change Log V1](../events/metadata-change-log-event.md) event stream. + +## Configure the Event Source + +Use the following config(s) to get started with the DataHub Cloud Event Source. + +### Quickstart + +To start listening for new events from now, you can use the following recipe: + +```yml +name: "unique-action-name" +datahub: + server: "https://.acryl.io" + token: "" +source: + type: "datahub-cloud" +action: + # action configs +``` + +Note that the `datahub` configuration block is **required** to connect to your DataHub Cloud instance. + +### Advanced Configurations + +To reset the offsets for the action pipeline and start consuming events from 7 days ago, you can use the following recipe: + +```yml +name: "unique-action-name" +datahub: + server: "https://.acryl.io" + token: "" +source: + type: "datahub-cloud" + config: + lookback_days: 7 # Look back 7 days for events + reset_offsets: true # Ignore stored offsets and start fresh + kill_after_idle_timeout: true # Enable shutdown after idle period + idle_timeout_duration_seconds: 60 # Idle timeout set to 60 seconds + event_processing_time_max_duration_seconds: 45 # Max processing time of 45 seconds per batch +action: + # action configs +``` + +Note that the `datahub` configuration block is **required** to connect to your DataHub Cloud instance. + +
+ View All Configuration Options + + | Field | Required | Default | Description | + | ------------------------------------- | :------: | :---------------------------: | ----------------------------------------------------------------------------------------- | + | `topic` | ❌ | `PlatformEvent_v1` | The name of the topic from which events will be consumed. Do not change this unless you know what you're doing! | + | `lookback_days` | ❌ | None | Optional number of days to look back when polling for events. | + | `reset_offsets` | ❌ | `False` | When set to `True`, the consumer will ignore any stored offsets and start fresh. | + | `kill_after_idle_timeout` | ❌ | `False` | If `True`, stops the consumer after being idle for the specified timeout duration. | + | `idle_timeout_duration_seconds` | ❌ | `30` | Duration in seconds after which, if no events are received, the consumer is considered idle. | + | `event_processing_time_max_duration_seconds` | ❌ | `30` | Maximum allowed time in seconds for processing events before timing out. | +
+ + +## FAQ + +1. Is there a way to always start processing from the end of the topics on Actions start? + +Yes, simply set `reset_offsets` to True for a single run of the action. Remember to disable this for subsequent runs if you don't want to miss any events! + +2. What happens if I have multiple actions with the same pipeline `name` running? Can I scale out horizontally? + +Today, there is undefined behavior deploying multiple actions with the same name using the DataHub Cloud Events Source. +All events must be processed by a single running action + diff --git a/examples/hello_world_datahub_cloud.yaml b/examples/hello_world_datahub_cloud.yaml new file mode 100644 index 00000000..fb35cb6b --- /dev/null +++ b/examples/hello_world_datahub_cloud.yaml @@ -0,0 +1,13 @@ +# hello_world.yaml +name: "hello_world_datahub_cloud" +# 1. DataHub Cloud Connection: Configure how to talk to DataHub Cloud +datahub: + server: "https://.acryl.io" + token: "" +# 2. Event Source: Where to source event from. +source: + type: "datahub-cloud" +# 3. Action: What action to take on events. +# To learn how to develop a custom Action, see https://datahubproject.io/docs/actions/guides/developing-an-action +action: + type: "hello_world"