PipelineML
is a new class which extends Pipeline
and enable to bind two pipelines (one of training, one of inference) together. This class comes with a KedroPipelineModel
class for logging it in mlflow. A pipeline logged as a mlflow model can be served using mlflow models serve
and mlflow models predict
command.
The PipelineML
class is not intended to be used directly. A pipeline_ml_factory
factory is provided for user friendly interface.
Example within kedro template:
# in src/PYTHON_PACKAGE/pipeline.py
from PYTHON_PACKAGE.pipelines import data_science as ds
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
data_science_pipeline = ds.create_pipeline()
training_pipeline = pipeline_ml_factory(
training=data_science_pipeline.only_nodes_with_tags(
"training"
), # or whatever your logic is for filtering
inference=data_science_pipeline.only_nodes_with_tags("inference"),
)
return {
"ds": data_science_pipeline,
"training": training_pipeline,
"__default__": data_engineering_pipeline + data_science_pipeline,
}
Now each time you will run kedro run --pipeline=training
(provided you registered MlflowHook
in you run.py
), the full inference pipeline will be registered as a mlflow model (with all the outputs produced by training as artifacts : the machine learning model, but also the scaler, vectorizer, imputer, or whatever object fitted on data you create in training
and that is used in inference
).
Note that:
- the
inference
pipelineinput_name
can be aMemoryDataset
and it belongs to inference pipelineinputs
- Apart form
input_name
, all otherinference
pipelineinputs
must be persisted locally on disk (i.e. it must not beMemoryDataset
and must have a localfilepath
) - the
inference
pipelineinputs
must belong to trainingoutputs
(vectorizer, binarizer, machine learning model...) - the
inference
pipeline must have one and only oneoutput
``PipelineML`` objects do not implement all filtering methods of a regular ``Pipeline``, and you cannot add or substract 2 ``PipelineML`` together. The rationale is that a filtered ``PipelineML`` is not a ``PipelineML`` in general, because the [filtering is not consistent between training and inference](https://github.com/Galileo-Galilei/kedro-mlflow/issues/554). You can see the ones which are supported [in the code](https://github.com/Galileo-Galilei/kedro-mlflow/blob/master/kedro_mlflow/pipeline/pipeline_ml.py#L162).
You can also directly log a PipelineML
object in mlflow
programatically:
from pathlib import Path
from kedro.framework.context import load_context
from kedro_mlflow.mlflow import KedroPipelineModel
from mlflow.models import ModelSignature
# pipeline_training is your PipelineML object, created as previsously
catalog = load_context(".").io
# artifacts are all the inputs of the inference pipelines that are persisted in the catalog
artifacts = pipeline_training.extract_pipeline_artifacts(catalog)
# get the schema of the input dataset
input_data = catalog.load(pipeline_training.input_name)
model_signature = infer_signature(model_input=input_data)
mlflow.pyfunc.log_model(
artifact_path="model",
python_model=KedroPipelineModel(pipeline=pipeline_training, catalog=catalog),
artifacts=artifacts,
conda_env={"python": "3.10.0", dependencies: ["kedro==0.18.11"]},
signature=model_signature,
)
It is also possible to pass arguments to KedroPipelineModel
to specify the runner or the copy_mode of MemoryDataset
for the inference Pipeline
. This may be faster especially for compiled model (e.g keras, tensorflow...), and more suitable for an API serving pattern. Since kedro-mlflow==0.12.0
, copy_mode="assign"
has become the default.
KedroPipelineModel(pipeline=pipeline_training, catalog=catalog, copy_mode="assign")
Available copy_mode
are assign
, copy
and deepcopy
. It is possible to pass a dictionary to specify different copy mode for each dataset.