Skip to content

Commit 516cb4b

Browse files
✨ Implement missing PipelineML slicing functionalities (#377) (#601)
* implement missing pipeline ml slicing functionalities * pass tests --------- Co-authored-by: Yolan Honoré-Rougé <yolan.honore.rouge@gmail.com>
1 parent 7aadd6c commit 516cb4b

File tree

2 files changed

+144
-35
lines changed

2 files changed

+144
-35
lines changed

kedro_mlflow/pipeline/pipeline_ml.py

+24-28
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,10 @@
44
from kedro.pipeline import Pipeline
55
from kedro.pipeline.node import Node
66

7-
MSG_NOT_IMPLEMENTED = (
8-
"This method is not implemented because it does "
9-
"not make sense for 'PipelineML'. "
10-
"Manipulate directly the training pipeline and "
11-
"recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
12-
)
13-
14-
MSG_WARNING_KEDRO_VIZ = (
7+
MSG_WARNING_PIPELINEML_DEMOTED = (
158
"BEWARE - This 'Pipeline' is no longer a 'PipelineML' object. "
16-
"This method is only implemented for compatibility with kedro-viz "
17-
"but should never be used directly.\nSee "
9+
"This method is only implemented for compatibility with kedro-viz and pipeline resume hints on failure."
10+
"It should never be used directly.\nSee "
1811
"https://github.com/Galileo-Galilei/kedro-mlflow/issues/569 "
1912
" for more context. "
2013
)
@@ -173,16 +166,18 @@ def _turn_pipeline_to_ml(self, pipeline: Pipeline):
173166
)
174167

175168
def only_nodes(self, *node_names: str) -> "Pipeline": # pragma: no cover
176-
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
169+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
170+
return self.training.only_nodes(*node_names)
177171

178172
def only_nodes_with_namespace(
179173
self, node_namespace: str
180174
) -> "Pipeline": # pragma: no cover
181-
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
175+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
182176
return self.training.only_nodes_with_namespace(node_namespace)
183177

184-
def only_nodes_with_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
185-
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
178+
def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": # pragma: no cover
179+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
180+
return self.training.only_nodes_with_inputs(*inputs)
186181

187182
def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
188183
# exceptionnally, we don't call super() because it raises
@@ -194,10 +189,9 @@ def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
194189
pipeline = self.training.from_inputs(*inputs)
195190
return self._turn_pipeline_to_ml(pipeline)
196191

197-
def only_nodes_with_outputs(
198-
self, *outputs: str
199-
) -> "PipelineML": # pragma: no cover
200-
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
192+
def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": # pragma: no cover
193+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
194+
return self.training.only_nodes_with_outputs(*outputs)
201195

202196
def to_outputs(self, *outputs: str) -> "PipelineML": # pragma: no cover
203197
# see from_inputs for an explanation of why we don't call super()
@@ -225,13 +219,13 @@ def tag(self, tags: Union[str, Iterable[str]]) -> "PipelineML":
225219

226220
def filter(
227221
self,
228-
tags: Iterable[str] = None,
229-
from_nodes: Iterable[str] = None,
230-
to_nodes: Iterable[str] = None,
231-
node_names: Iterable[str] = None,
232-
from_inputs: Iterable[str] = None,
233-
to_outputs: Iterable[str] = None,
234-
node_namespace: str = None,
222+
tags: Optional[Iterable[str]] = None,
223+
from_nodes: Optional[Iterable[str]] = None,
224+
to_nodes: Optional[Iterable[str]] = None,
225+
node_names: Optional[Iterable[str]] = None,
226+
from_inputs: Optional[Iterable[str]] = None,
227+
to_outputs: Optional[Iterable[str]] = None,
228+
node_namespace: Optional[str] = None,
235229
) -> "Pipeline":
236230
# see from_inputs for an explanation of why we don't call super()
237231
pipeline = self.training.filter(
@@ -246,10 +240,11 @@ def filter(
246240
return self._turn_pipeline_to_ml(pipeline)
247241

248242
def __add__(self, other): # pragma: no cover
249-
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
243+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
244+
return self.training + other
250245

251246
def __sub__(self, other): # pragma: no cover
252-
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
247+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
253248
return self.training - other
254249

255250
def __and__(self, other): # pragma: no cover
@@ -258,7 +253,8 @@ def __and__(self, other): # pragma: no cover
258253
return self._turn_pipeline_to_ml(new_pipeline)
259254

260255
def __or__(self, other): # pragma: no cover
261-
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
256+
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
257+
return self.training | other
262258

263259

264260
class KedroMlflowPipelineMLError(Exception):

tests/pipeline/test_pipeline_ml.py

+120-7
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def pipeline_ml_with_intermediary_artifacts():
9090
inputs="data",
9191
outputs="encoder",
9292
tags=["training"],
93+
name="node_fit_encoder_fun_data",
9394
),
9495
node(
9596
func=apply_encoder_fun,
@@ -258,11 +259,69 @@ def catalog_with_parameters():
258259
return catalog_with_parameters
259260

260261

262+
def test_pipeline_ml_only_nodes(
263+
caplog,
264+
pipeline_ml_with_intermediary_artifacts,
265+
):
266+
"""When the pipeline is filtered with only_nodes, we return only the training pipeline. This is for kedro-viz and resume hints compatibility"""
267+
268+
# pipeline_ml_with_namespace are fixture in conftest
269+
270+
# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)
271+
272+
filtered_pipeline_ml = pipeline_ml_with_intermediary_artifacts.only_nodes(
273+
"node_fit_encoder_fun_data"
274+
)
275+
276+
# PipelineML class must be preserved when filtering
277+
# inference should be unmodified
278+
# training pipeline nodes must be identical to kedro filtering.
279+
assert isinstance(filtered_pipeline_ml, Pipeline)
280+
assert not isinstance(filtered_pipeline_ml, PipelineML)
281+
assert str(filtered_pipeline_ml) == str(
282+
pipeline_ml_with_intermediary_artifacts.training.only_nodes(
283+
"node_fit_encoder_fun_data"
284+
)
285+
)
286+
assert (
287+
"for compatibility with kedro-viz and pipeline resume hints on failure"
288+
in caplog.text
289+
)
290+
291+
292+
def test_pipeline_ml_only_nodes_with_outputs(
293+
caplog,
294+
pipeline_ml_with_intermediary_artifacts,
295+
):
296+
"""When the pipeline is filtered with only_nodes, we return only the training pipeline. This is for kedro-viz and resume hints compatibility"""
297+
298+
# pipeline_ml_with_intermediary_artifacts are fixture in conftest
299+
300+
# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)
301+
302+
filtered_pipeline_ml = (
303+
pipeline_ml_with_intermediary_artifacts.only_nodes_with_outputs("data")
304+
)
305+
306+
# PipelineML class must be preserved when filtering
307+
# inference should be unmodified
308+
# training pipeline nodes must be identical to kedro filtering.
309+
assert isinstance(filtered_pipeline_ml, Pipeline)
310+
assert not isinstance(filtered_pipeline_ml, PipelineML)
311+
assert str(filtered_pipeline_ml) == str(
312+
pipeline_ml_with_intermediary_artifacts.training.only_nodes_with_outputs("data")
313+
)
314+
assert (
315+
"for compatibility with kedro-viz and pipeline resume hints on failure"
316+
in caplog.text
317+
)
318+
319+
261320
def test_pipeline_ml_only_nodes_with_namespace(
262321
caplog,
263322
pipeline_ml_with_namespace,
264323
):
265-
"""When the pipeline is filtered with only_nodes_with_namespace, we return only the training pipeline. This is for kedro viz compatibility"""
324+
"""When the pipeline is filtered with only_nodes, we return only the training pipeline. This is for kedro-viz and resume hints compatibility"""
266325

267326
# pipeline_ml_with_namespace are fixture in conftest
268327

@@ -278,7 +337,10 @@ def test_pipeline_ml_only_nodes_with_namespace(
278337
assert isinstance(filtered_pipeline_ml, Pipeline)
279338
assert not isinstance(filtered_pipeline_ml, PipelineML)
280339
assert str(filtered_pipeline_ml) == str(pipeline_ml_with_namespace.training)
281-
assert "kedro-viz but should never be" in caplog.text
340+
assert (
341+
"for compatibility with kedro-viz and pipeline resume hints on failure"
342+
in caplog.text
343+
)
282344

283345

284346
def test_pipeline_ml_substraction(
@@ -301,7 +363,58 @@ def test_pipeline_ml_substraction(
301363
# training pipeline nodes must be identical to kedro filtering.
302364
assert isinstance(filtered_pipeline_ml, Pipeline)
303365
assert not isinstance(filtered_pipeline_ml, PipelineML)
304-
assert "kedro-viz but should never be" in caplog.text
366+
assert (
367+
"for compatibility with kedro-viz and pipeline resume hints on failure"
368+
in caplog.text
369+
)
370+
371+
372+
def test_pipeline_ml_addition(
373+
caplog,
374+
pipeline_ml_with_namespace,
375+
pipeline_ml_with_tag,
376+
):
377+
"""When the pipeline is filtered with only_nodes_with_namespace, we return only the training pipeline. This is for kedro viz compatibility"""
378+
379+
# pipeline_ml_with_namespace are fixture in conftest
380+
381+
# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)
382+
383+
sum_of_pipeline_ml = pipeline_ml_with_namespace + pipeline_ml_with_tag
384+
385+
# PipelineML class must be preserved when filtering
386+
# inference should be unmodified
387+
# training pipeline nodes must be identical to kedro filtering.
388+
assert isinstance(sum_of_pipeline_ml, Pipeline)
389+
assert not isinstance(sum_of_pipeline_ml, PipelineML)
390+
assert (
391+
"for compatibility with kedro-viz and pipeline resume hints on failure"
392+
in caplog.text
393+
)
394+
395+
396+
def test_pipeline_ml_or(
397+
caplog,
398+
pipeline_ml_with_namespace,
399+
pipeline_ml_with_tag,
400+
):
401+
"""When the pipeline is filtered with only_nodes_with_namespace, we return only the training pipeline. This is for kedro viz compatibility"""
402+
403+
# pipeline_ml_with_namespace are fixture in conftest
404+
405+
# remember : the arguments are iterable, so do not pass string directly (e.g ["training"] rather than training)
406+
407+
or_of_pipeline_ml = pipeline_ml_with_namespace | pipeline_ml_with_tag
408+
409+
# PipelineML class must be preserved when filtering
410+
# inference should be unmodified
411+
# training pipeline nodes must be identical to kedro filtering.
412+
assert isinstance(or_of_pipeline_ml, Pipeline)
413+
assert not isinstance(or_of_pipeline_ml, PipelineML)
414+
assert (
415+
"for compatibility with kedro-viz and pipeline resume hints on failure"
416+
in caplog.text
417+
)
305418

306419

307420
@pytest.mark.parametrize(
@@ -316,7 +429,7 @@ def test_pipeline_ml_substraction(
316429
(None, None, None, None, ["data"]),
317430
],
318431
)
319-
def test_filtering_pipeline_ml(
432+
def test_pipeline_ml_filtering(
320433
mocker,
321434
pipeline_with_tag,
322435
pipeline_ml_with_tag,
@@ -374,7 +487,7 @@ def test_filtering_pipeline_ml(
374487
(None, None, None, ["preprocess_fun([raw_data]) -> [data]"], None),
375488
],
376489
)
377-
def test_filtering_generate_invalid_pipeline_ml(
490+
def test_pipeline_ml__filtering_generate_invalid_pipeline_ml(
378491
mocker,
379492
pipeline_ml_obj,
380493
tags,
@@ -405,7 +518,7 @@ def test_filtering_generate_invalid_pipeline_ml(
405518
# pass
406519

407520

408-
def test_too_many_free_inputs():
521+
def test_pipeline_ml_too_many_free_inputs():
409522
with pytest.raises(KedroMlflowPipelineMLError, match="No free input is allowed"):
410523
pipeline_ml_factory(
411524
training=Pipeline(
@@ -430,7 +543,7 @@ def test_too_many_free_inputs():
430543
)
431544

432545

433-
def test_tagging(pipeline_ml_with_tag):
546+
def test_pipeline_ml_tagging(pipeline_ml_with_tag):
434547
new_pl = pipeline_ml_with_tag.tag(["hello"])
435548
assert all(["hello" in node.tags for node in new_pl.nodes])
436549

0 commit comments

Comments
 (0)