Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement missing PipelineML slicing functionalities #601

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 24 additions & 28 deletions kedro_mlflow/pipeline/pipeline_ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,10 @@
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

MSG_NOT_IMPLEMENTED = (
"This method is not implemented because it does "
"not make sense for 'PipelineML'. "
"Manipulate directly the training pipeline and "
"recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
)

MSG_WARNING_KEDRO_VIZ = (
MSG_WARNING_PIPELINEML_DEMOTED = (
"BEWARE - This 'Pipeline' is no longer a 'PipelineML' object. "
"This method is only implemented for compatibility with kedro-viz "
"but should never be used directly.\nSee "
"This method is only implemented for compatibility with kedro-viz and pipeline resume hints on failure."
"It should never be used directly.\nSee "
"https://github.com/Galileo-Galilei/kedro-mlflow/issues/569 "
" for more context. "
)
Expand Down Expand Up @@ -173,16 +166,18 @@ def _turn_pipeline_to_ml(self, pipeline: Pipeline):
)

def only_nodes(self, *node_names: str) -> "Pipeline": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes(*node_names)

def only_nodes_with_namespace(
self, node_namespace: str
) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_namespace(node_namespace)

def only_nodes_with_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_inputs(*inputs)

def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
        # exceptionally, we don't call super() because it raises
Expand All @@ -194,10 +189,9 @@ def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
pipeline = self.training.from_inputs(*inputs)
return self._turn_pipeline_to_ml(pipeline)

def only_nodes_with_outputs(
self, *outputs: str
) -> "PipelineML": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_outputs(*outputs)

def to_outputs(self, *outputs: str) -> "PipelineML": # pragma: no cover
# see from_inputs for an explanation of why we don't call super()
Expand Down Expand Up @@ -225,13 +219,13 @@ def tag(self, tags: Union[str, Iterable[str]]) -> "PipelineML":

def filter(
self,
tags: Iterable[str] = None,
from_nodes: Iterable[str] = None,
to_nodes: Iterable[str] = None,
node_names: Iterable[str] = None,
from_inputs: Iterable[str] = None,
to_outputs: Iterable[str] = None,
node_namespace: str = None,
tags: Optional[Iterable[str]] = None,
from_nodes: Optional[Iterable[str]] = None,
to_nodes: Optional[Iterable[str]] = None,
node_names: Optional[Iterable[str]] = None,
from_inputs: Optional[Iterable[str]] = None,
to_outputs: Optional[Iterable[str]] = None,
node_namespace: Optional[str] = None,
) -> "Pipeline":
# see from_inputs for an explanation of why we don't call super()
pipeline = self.training.filter(
Expand All @@ -246,10 +240,11 @@ def filter(
return self._turn_pipeline_to_ml(pipeline)

def __add__(self, other): # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training + other

def __sub__(self, other): # pragma: no cover
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training - other

def __and__(self, other): # pragma: no cover
Expand All @@ -258,7 +253,8 @@ def __and__(self, other): # pragma: no cover
return self._turn_pipeline_to_ml(new_pipeline)

def __or__(self, other): # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training | other


class KedroMlflowPipelineMLError(Exception):
Expand Down
127 changes: 120 additions & 7 deletions tests/pipeline/test_pipeline_ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def pipeline_ml_with_intermediary_artifacts():
inputs="data",
outputs="encoder",
tags=["training"],
name="node_fit_encoder_fun_data",
),
node(
func=apply_encoder_fun,
Expand Down Expand Up @@ -258,11 +259,69 @@ def catalog_with_parameters():
return catalog_with_parameters


def test_pipeline_ml_only_nodes(
caplog,
pipeline_ml_with_intermediary_artifacts,
):
"""When the pipeline is filtered with only_nodes, we return only the training pipeline. This is for kedro-viz and resume hints compatibility"""

    # pipeline_ml_with_intermediary_artifacts is a fixture in conftest

# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)

filtered_pipeline_ml = pipeline_ml_with_intermediary_artifacts.only_nodes(
"node_fit_encoder_fun_data"
)

# PipelineML class must be preserved when filtering
# inference should be unmodified
# training pipeline nodes must be identical to kedro filtering.
assert isinstance(filtered_pipeline_ml, Pipeline)
assert not isinstance(filtered_pipeline_ml, PipelineML)
assert str(filtered_pipeline_ml) == str(
pipeline_ml_with_intermediary_artifacts.training.only_nodes(
"node_fit_encoder_fun_data"
)
)
assert (
"for compatibility with kedro-viz and pipeline resume hints on failure"
in caplog.text
)


def test_pipeline_ml_only_nodes_with_outputs(
caplog,
pipeline_ml_with_intermediary_artifacts,
):
"""When the pipeline is filtered with only_nodes, we return only the training pipeline. This is for kedro-viz and resume hints compatibility"""

    # pipeline_ml_with_intermediary_artifacts is a fixture in conftest

# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)

filtered_pipeline_ml = (
pipeline_ml_with_intermediary_artifacts.only_nodes_with_outputs("data")
)

# PipelineML class must be preserved when filtering
# inference should be unmodified
# training pipeline nodes must be identical to kedro filtering.
assert isinstance(filtered_pipeline_ml, Pipeline)
assert not isinstance(filtered_pipeline_ml, PipelineML)
assert str(filtered_pipeline_ml) == str(
pipeline_ml_with_intermediary_artifacts.training.only_nodes_with_outputs("data")
)
assert (
"for compatibility with kedro-viz and pipeline resume hints on failure"
in caplog.text
)


def test_pipeline_ml_only_nodes_with_namespace(
caplog,
pipeline_ml_with_namespace,
):
"""When the pipeline is filtered with only_nodes_with_namespace, we return only the training pipeline. This is for kedro viz compatibility"""
"""When the pipeline is filtered with only_nodes, we return only the training pipeline. This is for kedro-viz and resume hints compatibility"""

    # pipeline_ml_with_namespace is a fixture in conftest

Expand All @@ -278,7 +337,10 @@ def test_pipeline_ml_only_nodes_with_namespace(
assert isinstance(filtered_pipeline_ml, Pipeline)
assert not isinstance(filtered_pipeline_ml, PipelineML)
assert str(filtered_pipeline_ml) == str(pipeline_ml_with_namespace.training)
assert "kedro-viz but should never be" in caplog.text
assert (
"for compatibility with kedro-viz and pipeline resume hints on failure"
in caplog.text
)


def test_pipeline_ml_substraction(
Expand All @@ -301,7 +363,58 @@ def test_pipeline_ml_substraction(
# training pipeline nodes must be identical to kedro filtering.
assert isinstance(filtered_pipeline_ml, Pipeline)
assert not isinstance(filtered_pipeline_ml, PipelineML)
assert "kedro-viz but should never be" in caplog.text
assert (
"for compatibility with kedro-viz and pipeline resume hints on failure"
in caplog.text
)


def test_pipeline_ml_addition(
caplog,
pipeline_ml_with_namespace,
pipeline_ml_with_tag,
):
"""When the pipeline is filtered with only_nodes_with_namespace, we return only the training pipeline. This is for kedro viz compatibility"""

    # pipeline_ml_with_namespace and pipeline_ml_with_tag are fixtures in conftest

# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)

sum_of_pipeline_ml = pipeline_ml_with_namespace + pipeline_ml_with_tag

# PipelineML class must be preserved when filtering
# inference should be unmodified
# training pipeline nodes must be identical to kedro filtering.
assert isinstance(sum_of_pipeline_ml, Pipeline)
assert not isinstance(sum_of_pipeline_ml, PipelineML)
assert (
"for compatibility with kedro-viz and pipeline resume hints on failure"
in caplog.text
)


def test_pipeline_ml_or(
caplog,
pipeline_ml_with_namespace,
pipeline_ml_with_tag,
):
"""When the pipeline is filtered with only_nodes_with_namespace, we return only the training pipeline. This is for kedro viz compatibility"""

    # pipeline_ml_with_namespace and pipeline_ml_with_tag are fixtures in conftest

# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)

or_of_pipeline_ml = pipeline_ml_with_namespace | pipeline_ml_with_tag

# PipelineML class must be preserved when filtering
# inference should be unmodified
# training pipeline nodes must be identical to kedro filtering.
assert isinstance(or_of_pipeline_ml, Pipeline)
assert not isinstance(or_of_pipeline_ml, PipelineML)
assert (
"for compatibility with kedro-viz and pipeline resume hints on failure"
in caplog.text
)


@pytest.mark.parametrize(
Expand All @@ -316,7 +429,7 @@ def test_pipeline_ml_substraction(
(None, None, None, None, ["data"]),
],
)
def test_filtering_pipeline_ml(
def test_pipeline_ml_filtering(
mocker,
pipeline_with_tag,
pipeline_ml_with_tag,
Expand Down Expand Up @@ -374,7 +487,7 @@ def test_filtering_pipeline_ml(
(None, None, None, ["preprocess_fun([raw_data]) -> [data]"], None),
],
)
def test_filtering_generate_invalid_pipeline_ml(
def test_pipeline_ml__filtering_generate_invalid_pipeline_ml(
mocker,
pipeline_ml_obj,
tags,
Expand Down Expand Up @@ -405,7 +518,7 @@ def test_filtering_generate_invalid_pipeline_ml(
# pass


def test_too_many_free_inputs():
def test_pipeline_ml_too_many_free_inputs():
with pytest.raises(KedroMlflowPipelineMLError, match="No free input is allowed"):
pipeline_ml_factory(
training=Pipeline(
Expand All @@ -430,7 +543,7 @@ def test_too_many_free_inputs():
)


def test_tagging(pipeline_ml_with_tag):
def test_pipeline_ml_tagging(pipeline_ml_with_tag):
new_pl = pipeline_ml_with_tag.tag(["hello"])
assert all(["hello" in node.tags for node in new_pl.nodes])

Expand Down
Loading