
Commit ce93346

Included Prometheus interceptor support for gRPC streaming (#1858)
* Included streaming support for grpc prom server interceptor.
* Included prometheus interceptor tests.
* Updated docs.
* Run grpc interceptor separately.
1 parent 8d91b49 commit ce93346
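
The interceptor and test changes themselves live in files not shown on this page; the visible hunks below are the docs updates. As a rough orientation only, a streaming-aware Prometheus interceptor for `grpc.aio` can be sketched as follows. The class name, metric name, and the assumption that response-streaming handlers are written as async generators are all illustrative and are not the code in this commit:

```python
# Illustrative sketch only -- NOT the interceptor added by this commit.
# Assumes prometheus_client is installed and that response-streaming handlers
# are implemented as async generators.
import grpc
from prometheus_client import Counter

# Hypothetical metric name; real deployments typically follow the
# py-grpc-prometheus naming scheme (e.g. grpc_server_msg_sent_total).
STREAM_MESSAGES_SENT = Counter(
    "grpc_server_stream_messages_sent",
    "Messages sent on server-streaming gRPC responses",
    ["grpc_method"],
)


class StreamingPromInterceptor(grpc.aio.ServerInterceptor):
    """Counts every message yielded by a response-streaming handler."""

    async def intercept_service(self, continuation, handler_call_details):
        handler = await continuation(handler_call_details)
        if handler is None or not handler.response_streaming:
            # Leave unary-response RPCs untouched.
            return handler

        method = handler_call_details.method

        def wrap(behavior):
            async def counted(request_or_iterator, context):
                # Re-yield each response, incrementing the counter as we go.
                async for response in behavior(request_or_iterator, context):
                    STREAM_MESSAGES_SENT.labels(grpc_method=method).inc()
                    yield response

            return counted

        if handler.request_streaming:
            return grpc.stream_stream_rpc_method_handler(
                wrap(handler.stream_stream),
                request_deserializer=handler.request_deserializer,
                response_serializer=handler.response_serializer,
            )
        return grpc.unary_stream_rpc_method_handler(
            wrap(handler.unary_stream),
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
```

A wrapper of this shape would be registered through the `interceptors` argument of `grpc.aio.server`, alongside any existing unary interceptor.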

10 files changed, +338, -149 lines changed

docs/examples/streaming/README.ipynb (+39, -20)

@@ -42,7 +42,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 2,
+"execution_count": 1,
 "metadata": {},
 "outputs": [
 {
@@ -121,7 +121,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 4,
+"execution_count": 2,
 "metadata": {},
 "outputs": [
 {
@@ -138,8 +138,7 @@
 "{\n",
 " \"debug\": false,\n",
 " \"parallel_workers\": 0,\n",
-" \"gzip_enabled\": false,\n",
-" \"metrics_endpoint\": null\n",
+" \"gzip_enabled\": false\n",
 "}\n"
 ]
 },
@@ -150,8 +149,7 @@
 "Note the currently there are three main limitations of the streaming support in MLServer:\n",
 "\n",
 "- distributed workers are not supported (i.e., the `parallel_workers` setting should be set to `0`)\n",
-"- `gzip` middleware is not supported for REST (i.e., `gzip_enabled` setting should be set to `false`)\n",
-"- metrics endpoint is not available (i.e. `metrics_endpoint` is also disabled for streaming for gRPC)"
+"- `gzip` middleware is not supported for REST (i.e., `gzip_enabled` setting should be set to `false`)"
 ]
 },
 {
@@ -163,7 +161,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 5,
+"execution_count": 3,
 "metadata": {},
 "outputs": [
 {
@@ -227,14 +225,14 @@
 },
 {
 "cell_type": "code",
-"execution_count": 6,
+"execution_count": 4,
 "metadata": {},
 "outputs": [
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"Writing generate-request.json\n"
+"Overwriting generate-request.json\n"
 ]
 }
 ],
@@ -272,9 +270,22 @@
 },
 {
 "cell_type": "code",
-"execution_count": null,
+"execution_count": 5,
 "metadata": {},
-"outputs": [],
+"outputs": [
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"['What']\n",
+"[' is']\n",
+"[' the']\n",
+"[' capital']\n",
+"[' of']\n",
+"[' France?']\n"
+]
+}
+],
 "source": [
 "import httpx\n",
 "from httpx_sse import connect_sse\n",
@@ -301,9 +312,22 @@
 },
 {
 "cell_type": "code",
-"execution_count": null,
+"execution_count": 6,
 "metadata": {},
-"outputs": [],
+"outputs": [
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"['What']\n",
+"[' is']\n",
+"[' the']\n",
+"[' capital']\n",
+"[' of']\n",
+"[' France?']\n"
+]
+}
+],
 "source": [
 "import grpc\n",
 "import mlserver.types as types\n",
@@ -315,7 +339,7 @@
 "inference_request = types.InferenceRequest.parse_file(\"./generate-request.json\")\n",
 "\n",
 "# need to convert from string to bytes for grpc\n",
-"inference_request.inputs[0] = StringCodec.encode_input(\"prompt\", inference_request.inputs[0].data.__root__)\n",
+"inference_request.inputs[0] = StringCodec.encode_input(\"prompt\", inference_request.inputs[0].data.root)\n",
 "inference_request_g = converters.ModelInferRequestConverter.from_types(\n",
 " inference_request, model_name=\"text-model\", model_version=None\n",
 ")\n",
@@ -338,11 +362,6 @@
 "source": [
 "Note that for gRPC, the request is transformed into an async generator which is then passed to the `ModelStreamInfer` method. The response is also an async generator which can be iterated over to get the response."
 ]
-},
-{
-"cell_type": "markdown",
-"metadata": {},
-"source": []
 }
 ],
 "metadata": {
@@ -361,7 +380,7 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.10.14"
+"version": "3.10.12"
 }
 },
 "nbformat": 4,

docs/examples/streaming/README.md (+2, -6)

@@ -78,8 +78,7 @@ The next step will be to create 2 configuration files:
 {
 "debug": false,
 "parallel_workers": 0,
-"gzip_enabled": false,
-"metrics_endpoint": null
+"gzip_enabled": false
 }

 ```
@@ -88,7 +87,6 @@ Note the currently there are three main limitations of the streaming support in

 - distributed workers are not supported (i.e., the `parallel_workers` setting should be set to `0`)
 - `gzip` middleware is not supported for REST (i.e., `gzip_enabled` setting should be set to `false`)
-- metrics endpoint is not available (i.e. `metrics_endpoint` is also disabled for streaming for gRPC)

 #### model-settings.json

@@ -195,7 +193,7 @@ import mlserver.grpc.dataplane_pb2_grpc as dataplane
 inference_request = types.InferenceRequest.parse_file("./generate-request.json")

 # need to convert from string to bytes for grpc
-inference_request.inputs[0] = StringCodec.encode_input("prompt", inference_request.inputs[0].data.__root__)
+inference_request.inputs[0] = StringCodec.encode_input("prompt", inference_request.inputs[0].data.root)
 inference_request_g = converters.ModelInferRequestConverter.from_types(
     inference_request, model_name="text-model", model_version=None
 )
@@ -213,5 +211,3 @@ async with grpc.aio.insecure_channel("localhost:8081") as grpc_channel:
 ```

 Note that for gRPC, the request is transformed into an async generator which is then passed to the `ModelStreamInfer` method. The response is also an async generator which can be iterated over to get the response.
-
-
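
For reference, the gRPC client that the updated README builds up can be condensed roughly as follows. The converter and stub names follow the imports quoted in the hunks above; the `converters` import line, the `ModelInferResponseConverter.to_types` call, and running the coroutine with `asyncio` are assumptions made for a self-contained sketch:

```python
# Condensed, assumed sketch of the README's gRPC streaming client.
import asyncio

import grpc
import mlserver.types as types
import mlserver.grpc.converters as converters
import mlserver.grpc.dataplane_pb2_grpc as dataplane
from mlserver.codecs import StringCodec

inference_request = types.InferenceRequest.parse_file("./generate-request.json")

# need to convert from string to bytes for grpc (pydantic v2 exposes `.root`,
# replacing the old `.__root__` accessor)
inference_request.inputs[0] = StringCodec.encode_input(
    "prompt", inference_request.inputs[0].data.root
)
inference_request_g = converters.ModelInferRequestConverter.from_types(
    inference_request, model_name="text-model", model_version=None
)


async def request_stream():
    # ModelStreamInfer takes a stream of requests, so even a single prompt is
    # wrapped in an async generator.
    yield inference_request_g


async def main():
    async with grpc.aio.insecure_channel("localhost:8081") as grpc_channel:
        grpc_stub = dataplane.GRPCInferenceServiceStub(grpc_channel)
        async for response in grpc_stub.ModelStreamInfer(request_stream()):
            response = converters.ModelInferResponseConverter.to_types(response)
            print(StringCodec.decode_output(response.outputs[0]))


asyncio.run(main())
```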

docs/examples/streaming/settings.json (+1, -2)

@@ -2,6 +2,5 @@
 {
 "debug": false,
 "parallel_workers": 0,
-"gzip_enabled": false,
-"metrics_endpoint": null
+"gzip_enabled": false
 }

docs/examples/streaming/text_model.py (-13)

@@ -7,19 +7,6 @@


 class TextModel(MLModel):

-    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
-        text = StringCodec.decode_input(payload.inputs[0])[0]
-        return InferenceResponse(
-            model_name=self._settings.name,
-            outputs=[
-                StringCodec.encode_output(
-                    name="output",
-                    payload=[text],
-                    use_bytes=True,
-                ),
-            ],
-        )
-
     async def predict_stream(
         self, payloads: AsyncIterator[InferenceRequest]
     ) -> AsyncIterator[InferenceResponse]:
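
With `predict` removed, the example model only implements `predict_stream`. Its body sits outside this hunk; the sketch below is reconstructed from the notebook output above (one word per response, later words prefixed with a space) and is an assumption, not a quote of the repository file:

```python
# Assumed sketch of the remaining predict_stream method.
import asyncio
from typing import AsyncIterator

from mlserver import MLModel
from mlserver.codecs import StringCodec
from mlserver.types import InferenceRequest, InferenceResponse


class TextModel(MLModel):

    async def predict_stream(
        self, payloads: AsyncIterator[InferenceRequest]
    ) -> AsyncIterator[InferenceResponse]:
        # The example streams a single request, so drain the request iterator
        # and keep the first payload.
        payload = [p async for p in payloads][0]
        text = StringCodec.decode_input(payload.inputs[0])[0]

        words = text.split(" ")
        tokens = [words[0]] + [" " + w for w in words[1:]]

        for token in tokens:
            await asyncio.sleep(0.5)  # simulate generation latency
            yield InferenceResponse(
                model_name=self._settings.name,
                outputs=[
                    StringCodec.encode_output(
                        name="output",
                        payload=[token],
                        use_bytes=True,
                    ),
                ],
            )
```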

docs/user-guide/streaming.md (-1)

@@ -32,4 +32,3 @@ There are three main limitations of the streaming support in MLServer:

 - the `parallel_workers` setting should be set to `0` to disable distributed workers (to be addressed in future releases)
 - for REST, the `gzip_enabled` setting should be set to `false` to disable GZIP compression, as streaming is not compatible with GZIP compression (see issue [here](https://github.com/encode/starlette/issues/20#issuecomment-704106436))
-- `metrics_endpoint` is also disabled for streaming for gRPC (to be addressed in future releases)
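
With the gRPC streaming Prometheus interceptor in place, the docs drop this limitation. A quick, assumed way to confirm that streamed gRPC calls now show up is to scrape MLServer's metrics endpoint (served on the HTTP port, `/metrics` by default via the `metrics_endpoint` setting) and look at the gRPC server series; the `grpc_server_` prefix follows the py-grpc-prometheus convention and is an assumption here:

```python
# Assumed smoke test, not part of this commit: scrape the metrics endpoint
# after sending a few streaming requests and print the gRPC server series.
import httpx

metrics = httpx.get("http://localhost:8080/metrics").text
for line in metrics.splitlines():
    if line.startswith("grpc_server_"):
        print(line)
```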
