
Commit 29eece9

FIX #93 - Enforce inference pipeline output unpacking
1 parent 2b26d29 commit 29eece9

7 files changed: +133 -37 lines changed

CHANGELOG.md

+1
@@ -19,6 +19,7 @@
 
 - `MlflowNodeHook` now has a `before_pipeline_run` hook which stores the `ProjectContext` and enable to retrieve configuration (#66).
 - Documentation reference to the plugin is now dynamic when necessary (#6).
+- The `KedroPipelineModel` now unpacks the result of the inference pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the associated value to the predictions.
 
 ### Removed
 

docs/source/05_python_objects/03_Pipelines.md

+6 -1
@@ -24,7 +24,12 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
     }
 
 ```
-Now each time you will run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in you ``run.py``), the full inference pipeline will be registered as a mlflow model (with all the outputs produced by training as artifacts : the machine learning, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data you create in ``training`` and that is used in ``inference``).
+Now each time you will run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in you ``run.py``), the full inference pipeline will be registered as a mlflow model (with all the outputs produced by training as artifacts : the machine learning model, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data you create in ``training`` and that is used in ``inference``).
+
+Note that:
+- the `inference` pipeline `inputs` must belong either to training `outputs` (vectorizer, binarizer, machine learning model...)
+- the `inference` pipeline `inputs` must be persisted locally on disk (i.e. it must not be `MemoryDataSet` and must have a local `filepath`)
+- the `inference` pipeline must have one and only one `output`
 
 *Note: If you want to log a ``PipelineML`` object in ``mlflow`` programatically, you can use the following code snippet:*
 
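For context, the three constraints added above can be satisfied by a pipeline pair along the following lines. This is a minimal sketch: the node functions and dataset names are hypothetical and not part of this commit.

```python
from kedro.pipeline import Pipeline, node

from kedro_mlflow.pipeline import pipeline_ml_factory


def fit_vectorizer(data):  # hypothetical training-only node
    ...


def train_model(vectorizer, data):  # hypothetical training node
    ...


def predict(vectorizer, model, data):  # hypothetical inference node
    ...


training = Pipeline(
    [
        node(fit_vectorizer, inputs="data", outputs="vectorizer"),
        node(train_model, inputs=["vectorizer", "data"], outputs="model"),
    ]
)

# every inference input except "data" (the input_name) is produced by training,
# and the pipeline declares exactly one output: "predictions"
inference = Pipeline(
    [node(predict, inputs=["vectorizer", "model", "data"], outputs="predictions")]
)

training_pipeline_ml = pipeline_ml_factory(
    training=training, inference=inference, input_name="data"
)
```

In a real project, `vectorizer` and `model` would additionally be declared as persisted datasets with a local `filepath` in `catalog.yml` so they can be logged as mlflow artifacts.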

kedro_mlflow/mlflow/kedro_pipeline_model.py

+10 -2
@@ -14,6 +14,8 @@ def __init__(self, pipeline_ml: PipelineML, catalog: DataCatalog):
         self.pipeline_ml = pipeline_ml
         self.initial_catalog = pipeline_ml.extract_pipeline_catalog(catalog)
         self.loaded_catalog = DataCatalog()
+        # we have the guarantee that there is only one output in inference
+        self.output_name = list(pipeline_ml.inference.outputs())[0]
 
     def load_context(self, context):
 
@@ -33,7 +35,11 @@ def load_context(self, context):
                 kedro_artifacts_keys - mlflow_artifacts_keys
             )
             raise ValueError(
-                f"Provided artifacts do not match catalog entries:\n- 'artifacts - inference.inputs()' = : {in_artifacts_but_not_inference}'\n- 'inference.inputs() - artifacts' = : {in_inference_but_not_artifacts}'"
+                (
+                    "Provided artifacts do not match catalog entries:"
+                    f"\n - 'artifacts - inference.inputs()' = : {in_artifacts_but_not_inference}"
+                    f"\n - 'inference.inputs() - artifacts' = : {in_inference_but_not_artifacts}"
+                )
             )
 
         self.loaded_catalog = deepcopy(self.initial_catalog)
@@ -53,4 +59,6 @@ def predict(self, context, model_input):
         run_outputs = runner.run(
            pipeline=self.pipeline_ml.inference, catalog=self.loaded_catalog
         )
-        return run_outputs
+        return run_outputs[
+            self.output_name
+        ]  # unpack the result to avoid messing the json output
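Concretely, this unpacking changes what callers of the logged pyfunc model receive. A rough sketch of the effect follows; the run id is a placeholder, and the `1`/`2` values mirror the updated test further down.

```python
import mlflow

# placeholder URI: use the run id logged by MlflowPipelineHook for your project
loaded_model = mlflow.pyfunc.load_model("runs:/<run_id>/model")

predictions = loaded_model.predict(1)
# before this commit: {"predictions": 2}  (a dict keyed by the DataCatalog entry name)
# after this commit:  2                   (only the value, which serializes cleanly to json)
```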

kedro_mlflow/pipeline/__init__.py

+1
@@ -1,5 +1,6 @@
 from .pipeline_ml import (
     KedroMlflowPipelineMLDatasetsError,
     KedroMlflowPipelineMLInputsError,
+    KedroMlflowPipelineMLOutputsError,
 )
 from .pipeline_ml_factory import pipeline_ml, pipeline_ml_factory

kedro_mlflow/pipeline/pipeline_ml.py

+65 -33
@@ -5,7 +5,12 @@
 from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node
 
-MSG_NOT_IMPLEMENTED = "This method is not implemented because it does not make sens for 'PipelineML'. Manipulate directly the training pipeline and recreate the 'PipelineML' with 'pipeline_ml_factory' factory"
+MSG_NOT_IMPLEMENTED = (
+    "This method is not implemented because it does"
+    "not make sense for 'PipelineML'."
+    "Manipulate directly the training pipeline and"
+    "recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
+)
 
 
 class PipelineML(Pipeline):
@@ -78,7 +83,6 @@ def __init__(
         self.inference = inference
         self.conda_env = conda_env
         self.model_name = model_name
-
         self.input_name = input_name
 
     @property
@@ -90,6 +94,58 @@ def input_name(self, name: str) -> None:
         self._check_input_name(name)
         self._input_name = name
 
+    @property
+    def inference(self) -> str:
+        return self._inference
+
+    @inference.setter
+    def inference(self, inference: Pipeline) -> None:
+        self._check_inference(inference)
+        self._inference = inference
+
+    @property
+    def training(self) -> Pipeline:
+        return Pipeline(self.nodes)
+
+    def _check_input_name(self, input_name: str) -> str:
+        allowed_names = self.inference.inputs()
+        pp_allowed_names = "\n - ".join(allowed_names)
+        if input_name not in allowed_names:
+            raise KedroMlflowPipelineMLInputsError(
+                f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
+            )
+        else:
+            free_inputs_set = (
+                self.inference.inputs() - {input_name} - self.all_outputs()
+            )
+            if len(free_inputs_set) > 0:
+                raise KedroMlflowPipelineMLInputsError(
+                    """
+                    The following inputs are free for the inference pipeline:
+                    - {inputs}.
+                    No free input is allowed.
+                    Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
+                    except eventually 'input_name'.""".format(
+                        inputs="\n - ".join(free_inputs_set)
+                    )
+                )
+
+        return None
+
+    def _check_inference(self, inference: Pipeline) -> None:
+        nb_outputs = len(inference.outputs())
+        outputs_txt = "\n - ".join(inference.outputs())
+        if len(inference.outputs()) != 1:
+            raise KedroMlflowPipelineMLOutputsError(
+                (
+                    "The inference pipeline must have one"
+                    " and only one output. You are trying"
+                    " to set a inference pipeline with"
+                    f" '{nb_outputs}' output(s): \n - {outputs_txt}"
+                    " "
+                )
+            )
+
     def extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
         sub_catalog = DataCatalog()
         for data_set_name in self.inference.inputs():
@@ -136,36 +192,7 @@ def extract_pipeline_artifacts(self, catalog: DataCatalog):
         }
         return artifacts
 
-    @property
-    def training(self):
-        return Pipeline(self.nodes)
-
-    def _check_input_name(self, input_name: str) -> str:
-        allowed_names = self.inference.inputs()
-        pp_allowed_names = "\n - ".join(allowed_names)
-        if input_name not in allowed_names:
-            raise KedroMlflowPipelineMLInputsError(
-                f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
-            )
-        else:
-            free_inputs_set = (
-                self.inference.inputs() - {input_name} - self.all_outputs()
-            )
-            if len(free_inputs_set) > 0:
-                raise KedroMlflowPipelineMLInputsError(
-                    """
-                    The following inputs are free for the inference pipeline:
-                    - {inputs}.
-                    No free input is allowed.
-                    Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
-                    except eventually 'input_name'.""".format(
-                        inputs="\n - ".join(free_inputs_set)
-                    )
-                )
-
-        return None
-
-    def _turn_pipeline_to_ml(self, pipeline):
+    def _turn_pipeline_to_ml(self, pipeline: Pipeline):
         return PipelineML(
             nodes=pipeline.nodes, inference=self.inference, input_name=self.input_name
         )
@@ -230,10 +257,15 @@ def __or__(self, other):  # pragma: no cover
 
 
 class KedroMlflowPipelineMLInputsError(Exception):
-    """Error raised when the inputs of KedroPipelineMoel are invalid
+    """Error raised when the inputs of KedroPipelineModel are invalid
     """
 
 
 class KedroMlflowPipelineMLDatasetsError(Exception):
     """Error raised when the inputs of KedroPipelineMoel are invalid
     """
+
+
+class KedroMlflowPipelineMLOutputsError(Exception):
+    """Error raised when the outputs of KedroPipelineModel are invalid
+    """

tests/mlflow/test_kedro_pipeline_model.py

+1 -1
@@ -83,7 +83,7 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
     loaded_model = mlflow.pyfunc.load_model(
         model_uri=(Path(r"runs:/") / run_id / "model").as_posix()
     )
-    assert loaded_model.predict(1) == {"predictions": 2}
+    assert loaded_model.predict(1) == 2
 
 
 # should very likely add tests to see what happens when the artifacts

tests/pipeline/test_pipeline_ml.py

+49
@@ -8,6 +8,7 @@
 from kedro_mlflow.pipeline import (
     KedroMlflowPipelineMLDatasetsError,
     KedroMlflowPipelineMLInputsError,
+    KedroMlflowPipelineMLOutputsError,
     pipeline_ml,
     pipeline_ml_factory,
 )
@@ -34,6 +35,14 @@ def predict_fun(model, data):
     return data * model
 
 
+def predict_fun_with_metric(model, data):
+    return data * model, "super_metric"
+
+
+def predict_fun_return_nothing(model, data):
+    pass
+
+
 @pytest.fixture
 def pipeline_with_tag():
 
@@ -360,3 +369,43 @@ def test_invalid_input_name(pipeline_ml_with_tag):
         match="input_name='whoops_bad_name' but it must be an input of 'inference'",
     ):
         pipeline_ml_with_tag.input_name = "whoops_bad_name"
+
+
+def test_too_many_inference_outputs():
+    with pytest.raises(
+        KedroMlflowPipelineMLOutputsError,
+        match="The inference pipeline must have one and only one output",
+    ):
+        pipeline_ml_factory(
+            training=Pipeline([node(func=train_fun, inputs="data", outputs="model",)]),
+            inference=Pipeline(
+                [
+                    node(
+                        func=predict_fun_with_metric,
+                        inputs=["model", "data"],
+                        outputs=["predictions", "metric"],
+                    )
+                ]
+            ),
+            input_name="data",
+        )
+
+
+def test_not_enough_inference_outputs():
+    with pytest.raises(
+        KedroMlflowPipelineMLOutputsError,
+        match="The inference pipeline must have one and only one output",
+    ):
+        pipeline_ml_factory(
+            training=Pipeline([node(func=train_fun, inputs="data", outputs="model",)]),
+            inference=Pipeline(
+                [
+                    node(
+                        func=predict_fun_return_nothing,
+                        inputs=["model", "data"],
+                        outputs=None,
+                    )
+                ]
+            ),
+            input_name="data",
+        )
