fix(ingest): support pydantic v2 in file-based lineage #12723

Merged — 1 commit merged into master on Mar 2, 2025

Conversation

hsheth2 (Collaborator) commented on Feb 25, 2025

Fixes #12623

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change, potential downtime, deprecation, or big change, an entry has been made in Updating DataHub.

github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Feb 25, 2025
datahub-cyborg bot added the needs-review label (Label for PRs that need review from a maintainer) on Feb 25, 2025

codecov bot commented Feb 25, 2025

❌ 5 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --------------- | ------ | ------ | ------- |
| 2237            | 5      | 2232   | 55      |
View the top 3 failed tests by shortest run time
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_bigquery_sink_ingest
Stack Traces | 0.001s run time
docker_compose_runner = <function docker_compose_runner.<locals>.run at 0x7f1ee7f7b8b0>
pytestconfig = <_pytest.config.Config object at 0x7f1f38a8b080>
test_resources_dir = PosixPath('.../tests/integration/kafka-connect')

    @pytest.fixture(scope="module")
    def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir):
        test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"
    
        # Share Compose configurations between files and projects
        # https://docs.docker.com/compose/extends/
        docker_compose_file = [
            str(test_resources_dir_kafka / "docker-compose.yml"),
            str(test_resources_dir / "docker-compose.override.yml"),
        ]
        with docker_compose_runner(
            docker_compose_file, "kafka-connect", cleanup=False
        ) as docker_services:
            wait_for_port(
                docker_services,
                "test_mysql",
                3306,
                timeout=120,
                checker=lambda: is_mysql_up("test_mysql", 3306),
            )
    
        with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
            # We sometimes run into issues where the broker fails to come up on the first try because
            # of all the other processes that are running. By running docker compose twice, we can
            # avoid some test flakes. How does this work? The "key" is the same between both
            # calls to the docker_compose_runner and the first one sets cleanup=False.
    
>           wait_for_port(docker_services, "test_broker", 29092, timeout=120)

.../integration/kafka-connect/test_kafka_connect.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../datahub/testing/docker_utils.py:36: in wait_for_port
    docker_services.wait_until_responsive(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Services(_docker_compose=DockerComposeExecutor(_compose_command='docker compose', _compose_files=['.../runner/work/d...tegration/kafka-connect/docker-compose.override.yml'], _compose_project_name='pytest4817-kafka-connect'), _services={})
check = <function wait_for_port.<locals>.<lambda> at 0x7f1ee7f7b540>
timeout = 120, pause = 0.5, clock = <built-in function perf_counter>

    def wait_until_responsive(
        self,
        check: Any,
        timeout: float,
        pause: float,
        clock: Any = timeit.default_timer,
    ) -> None:
        """Wait until a service is responsive."""
    
        ref = clock()
        now = ref
        while (now - ref) < timeout:
            if check():
                return
            time.sleep(pause)
            now = clock()
    
>       raise Exception("Timeout reached while waiting on service!")
E       Exception: Timeout reached while waiting on service!

venv/lib/python3.11.../site-packages/pytest_docker/plugin.py:121: Exception
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_ingest_stateful
Stack Traces | 0.001s run time
(Same failure as above: the kafka_connect_runner fixture timed out waiting for test_broker on port 29092.)
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_mongosourceconnect_ingest
Stack Traces | 0.001s run time
(Same failure as above: the kafka_connect_runner fixture timed out waiting for test_broker on port 29092.)

To view more test analytics, go to the Test Analytics Dashboard

sgomezvillamor (Contributor) left a comment

LGTM

We may add some tests covering those scenarios in metadata-ingestion/tests/unit/test_file_lineage_source.py, as sketched below.
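
A minimal sketch of what such a test might look like, assuming the file-based lineage source exposes a pydantic config model. The import path, the class name `LineageFileSourceConfig`, and the `file` field are assumptions for illustration and are not taken from this PR:

```python
# Hypothetical sketch for metadata-ingestion/tests/unit/test_file_lineage_source.py.
# The import path, class name, and field name are assumptions; substitute the
# actual config model used by the file-based lineage source.
import pydantic
import pytest

PYDANTIC_V2 = pydantic.VERSION.startswith("2.")


@pytest.mark.skipif(not PYDANTIC_V2, reason="exercises the pydantic v2 parsing path")
def test_file_lineage_config_parses_under_pydantic_v2() -> None:
    # Assumed import; replace with the real config class.
    from datahub.ingestion.source.metadata.lineage import LineageFileSourceConfig

    raw = {"file": "tests/unit/data/file_lineage.yml"}
    # parse_obj still exists on pydantic v2 (deprecated in favor of model_validate).
    config = LineageFileSourceConfig.parse_obj(raw)
    assert config.file == raw["file"]
```

As hsheth2 notes below, the CI suite currently runs on pydantic v1, so a test like this would be skipped there unless a pydantic v2 environment is added to CI.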

datahub-cyborg bot added the pending-submitter-merge label and removed the needs-review label (Label for PRs that need review from a maintainer) on Feb 28, 2025
hsheth2 (Collaborator, Author) commented on Mar 2, 2025

@sgomezvillamor this is actually because of a change in behavior between pydantic v1 and v2. Because all of our tests run with pydantic v1, we can't really test this effectively yet.
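
The diff itself isn't shown on this page, but a common way to bridge this kind of v1/v2 behavior difference is to branch on the installed pydantic major version. A minimal sketch of that pattern follows; the helper name `compat_field_validator`, the example model, and the specific divergence handled are illustrative assumptions, not necessarily what this PR does:

```python
"""Minimal sketch of a pydantic v1/v2 compatibility shim (illustrative only)."""
from typing import Any

import pydantic

PYDANTIC_IS_V2 = pydantic.VERSION.startswith("2.")


def compat_field_validator(field: str, **kwargs: Any):
    """Return a "before"/"pre" field validator decorator that works on both majors."""
    if PYDANTIC_IS_V2:
        # pydantic v2 replaced `validator(pre=True)` with `field_validator(mode="before")`.
        return pydantic.field_validator(field, mode="before", **kwargs)
    return pydantic.validator(field, pre=True, allow_reuse=True, **kwargs)


class ExampleLineageFileConfig(pydantic.BaseModel):
    # Hypothetical field, used only to demonstrate the shim.
    file: str

    @compat_field_validator("file")
    @classmethod
    def _strip_whitespace(cls, v: str) -> str:
        return v.strip()
```

With a shim like this, the same model definition parses consistently whichever pydantic major version is installed, which is the general goal of the fix described in the PR title.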

hsheth2 merged commit a19edde into master on Mar 2, 2025
218 of 247 checks passed
hsheth2 deleted the fix-lineage-fix branch on March 2, 2025 at 01:54
shirshanka pushed a commit to shirshanka/datahub that referenced this pull request Mar 3, 2025
PeteMango pushed a commit to PeteMango/datahub that referenced this pull request Mar 3, 2025
Labels: ingestion (PR or Issue related to the ingestion of metadata), pending-submitter-merge

Successfully merging this pull request may close these issues:

  • Pydantic V2 fails to parse ingestion source (#12623)