# test_mlflow_model_local_filesystem_dataset.py
# (from a fork of Galileo-Galilei/kedro-mlflow)
from tempfile import TemporaryDirectory

import mlflow
import pandas as pd
import pytest
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import Pipeline, node
from kedro_datasets.pickle import PickleDataset
from pytest_lazy_fixtures import lf
from sklearn.linear_model import LinearRegression

from kedro_mlflow.io.models import MlflowModelLocalFileSystemDataset
from kedro_mlflow.mlflow import KedroPipelineModel
from kedro_mlflow.pipeline import pipeline_ml_factory
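
# These tests cover MlflowModelLocalFileSystemDataset: saving and loading
# mlflow models (sklearn and pyfunc flavors) to the local filesystem, with
# and without Kedro versioning. The fixtures below provide a toy sklearn
# model and a minimal PipelineML (preprocess -> fit -> predict) as subjects.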


@pytest.fixture
def linreg_model():
    return LinearRegression()


@pytest.fixture
def tmp_folder():
    return TemporaryDirectory()


@pytest.fixture
def linreg_path(tmp_path):
    return tmp_path / "data" / "06_models" / "linreg"


@pytest.fixture
def pipeline_ml_obj():
    # toy pipeline: the "model" is just the scalar 2,
    # so predictions are simply data * 2
    def preprocess_fun(data):
        return data

    def fit_fun(data):
        return 2

    def predict_fun(model, data):
        return data * model

    full_pipeline = Pipeline(
        [
            node(
                func=preprocess_fun,
                inputs="raw_data",
                outputs="data",
                tags=["inference", "training"],
            ),
            node(func=fit_fun, inputs="data", outputs="model", tags=["training"]),
            node(
                func=predict_fun,
                inputs=["data", "model"],
                outputs="predictions",
                tags=["inference"],
            ),
        ]
    )

    # split the full pipeline into training / inference by tag
    pipeline_ml_obj = pipeline_ml_factory(
        training=full_pipeline.only_nodes_with_tags("training"),
        inference=full_pipeline.only_nodes_with_tags("inference"),
        input_name="raw_data",
    )

    return pipeline_ml_obj
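
# pipeline_ml_factory returns a PipelineML object that ties the inference
# pipeline to its training pipeline; input_name identifies the dataset that
# is fed to the model at inference time.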


@pytest.fixture
def pipeline_inference(pipeline_ml_obj):
    return pipeline_ml_obj.inference


@pytest.fixture
def dummy_catalog(tmp_path):
    dummy_catalog = DataCatalog(
        {
            "raw_data": MemoryDataset(),
            "data": MemoryDataset(),
            "model": PickleDataset(
                filepath=(tmp_path / "data" / "06_models" / "model.pkl")
                .resolve()
                .as_posix()
            ),
        }
    )
    # pre-save the "model" so the inference pipeline can run without training
    dummy_catalog._datasets["model"].save(2)  # emulate model fitting
    return dummy_catalog


@pytest.fixture
def kedro_pipeline_model(tmp_path, pipeline_ml_obj, dummy_catalog):
    return KedroPipelineModel(
        pipeline=pipeline_ml_obj,
        catalog=dummy_catalog,
        input_name=pipeline_ml_obj.input_name,
    )
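
# For reference, the `model_config` dicts below mirror what a catalog.yml
# entry for this dataset would look like (a sketch; the path is illustrative):
#
#     linreg:
#       type: kedro_mlflow.io.models.MlflowModelLocalFileSystemDataset
#       flavor: mlflow.sklearn
#       filepath: data/06_models/linreg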


def test_save_unversioned_under_same_path(
    linreg_path,
    linreg_model,
):
    model_config = {
        "name": "linreg",
        "config": {
            "type": "kedro_mlflow.io.models.MlflowModelLocalFileSystemDataset",
            "flavor": "mlflow.sklearn",
            "filepath": linreg_path.as_posix(),
        },
    }
    mlflow_model_ds = MlflowModelLocalFileSystemDataset.from_config(**model_config)
    mlflow_model_ds.save(linreg_model)
    # a second save must not fail, even though the underlying folder
    # already exists after the first save
    mlflow_model_ds.save(linreg_model)
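
# With `versioned: true`, Kedro writes each save under a timestamped
# subfolder, <filepath>/<save_version>/<filename>, which is what the
# versioned branch of the assertion below checks.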


@pytest.mark.parametrize("versioned", [False, True])
def test_save_load_local(linreg_path, linreg_model, versioned):
    model_config = {
        "name": "linreg",
        "config": {
            "type": "kedro_mlflow.io.models.MlflowModelLocalFileSystemDataset",
            "filepath": linreg_path.as_posix(),
            "flavor": "mlflow.sklearn",
            "versioned": versioned,
        },
    }
    mlflow_model_ds = MlflowModelLocalFileSystemDataset.from_config(**model_config)
    mlflow_model_ds.save(linreg_model)

    if versioned:
        assert (
            linreg_path / mlflow_model_ds._version.save / linreg_path.name
        ).exists()  # versioned model saved locally
    else:
        assert linreg_path.exists()  # unversioned model saved locally

    linreg_model_loaded = mlflow_model_ds.load()
    assert isinstance(linreg_model_loaded, LinearRegression)
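
# KedroPipelineModel wraps a Kedro inference pipeline as an mlflow pyfunc
# model; its catalog artifacts must be extracted and handed to the pyfunc
# flavor through save_args, as done below.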


@pytest.mark.parametrize(
    "pipeline",
    [
        (lf("pipeline_ml_obj")),  # must work for PipelineML
        (lf("pipeline_inference")),  # must work for Pipeline
    ],
)
def test_pyfunc_flavor_python_model_save_and_load(
    tmp_path, tmp_folder, pipeline, dummy_catalog
):
    kedro_pipeline_model = KedroPipelineModel(
        pipeline=pipeline,
        catalog=dummy_catalog,
        input_name="raw_data",
    )
    artifacts = kedro_pipeline_model.extract_pipeline_artifacts(tmp_folder)

    model_config = {
        "name": "kedro_pipeline_model",
        "config": {
            "type": "kedro_mlflow.io.models.MlflowModelLocalFileSystemDataset",
            "filepath": (
                tmp_path / "data" / "06_models" / "my_custom_model"
            ).as_posix(),
            "flavor": "mlflow.pyfunc",
            "pyfunc_workflow": "python_model",
            "save_args": {
                "artifacts": artifacts,
                "conda_env": {"python": "3.10.0", "dependencies": ["kedro==0.18.11"]},
            },
        },
    }

    mlflow_model_ds = MlflowModelLocalFileSystemDataset.from_config(**model_config)
    mlflow_model_ds.save(kedro_pipeline_model)

    # saving must not leave a run open
    assert mlflow.active_run() is None

    # reload and predict (emulates a fresh "kedro run" loading the saved model)
    loaded_model = mlflow_model_ds.load()
    predictions = loaded_model.predict(pd.DataFrame(data=[1], columns=["a"]))
    # the toy model multiplies the input data by 2
    pd.testing.assert_frame_equal(
        predictions, pd.DataFrame(data=[2], columns=["a"])
    )
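
# Note: the model saved above is a standard mlflow pyfunc model on disk, so it
# could presumably also be loaded with plain mlflow, e.g.
# mlflow.pyfunc.load_model(model_config["config"]["filepath"]).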


@pytest.mark.parametrize(
    "metadata",
    (
        None,
        {"description": "My awesome dataset"},
        {"string": "bbb", "int": 0},
    ),
)
def test_mlflow_model_local_filesystem_dataset_with_metadata(metadata):
    mlflow_model_ds = MlflowModelLocalFileSystemDataset(
        flavor="mlflow.sklearn",
        filepath="/my/file/path",
        metadata=metadata,
    )
    assert mlflow_model_ds.metadata == metadata

    # metadata must not leak into the dataset description
    assert "metadata" not in mlflow_model_ds._describe()