15
15
import static org .opensearch .ml .common .MLModel .IS_HIDDEN_FIELD ;
16
16
import static org .opensearch .ml .common .MLModel .MODEL_ID_FIELD ;
17
17
import static org .opensearch .ml .common .utils .StringUtils .getErrorMessage ;
18
+ import static org .opensearch .ml .settings .MLCommonsSettings .ML_COMMONS_SAFE_DELETE_WITH_USAGE_CHECK ;
18
19
import static org .opensearch .ml .utils .RestActionUtils .getFetchSourceContext ;
19
20
21
+ import java .util .ArrayDeque ;
22
+ import java .util .ArrayList ;
23
+ import java .util .Arrays ;
24
+ import java .util .Deque ;
25
+ import java .util .List ;
26
+ import java .util .Locale ;
20
27
import java .util .Map ;
21
28
import java .util .Objects ;
29
+ import java .util .concurrent .ConcurrentLinkedQueue ;
22
30
import java .util .concurrent .CountDownLatch ;
23
31
import java .util .concurrent .atomic .AtomicBoolean ;
32
+ import java .util .function .Supplier ;
24
33
34
+ import org .apache .commons .lang3 .tuple .Pair ;
25
35
import org .opensearch .ExceptionsHelper ;
26
36
import org .opensearch .OpenSearchStatusException ;
27
37
import org .opensearch .ResourceNotFoundException ;
28
38
import org .opensearch .action .ActionRequest ;
39
+ import org .opensearch .action .ActionType ;
29
40
import org .opensearch .action .delete .DeleteRequest ;
30
41
import org .opensearch .action .delete .DeleteResponse ;
31
42
import org .opensearch .action .get .GetResponse ;
43
+ import org .opensearch .action .ingest .GetPipelineAction ;
44
+ import org .opensearch .action .ingest .GetPipelineRequest ;
45
+ import org .opensearch .action .search .GetSearchPipelineAction ;
46
+ import org .opensearch .action .search .GetSearchPipelineRequest ;
47
+ import org .opensearch .action .search .SearchRequest ;
32
48
import org .opensearch .action .support .ActionFilters ;
33
49
import org .opensearch .action .support .HandledTransportAction ;
34
50
import org .opensearch .client .Client ;
37
53
import org .opensearch .common .settings .Settings ;
38
54
import org .opensearch .common .util .concurrent .ThreadContext ;
39
55
import org .opensearch .common .xcontent .LoggingDeprecationHandler ;
56
+ import org .opensearch .common .xcontent .XContentHelper ;
57
+ import org .opensearch .common .xcontent .json .JsonXContent ;
40
58
import org .opensearch .commons .authuser .User ;
41
59
import org .opensearch .core .action .ActionListener ;
42
60
import org .opensearch .core .rest .RestStatus ;
43
61
import org .opensearch .core .xcontent .NamedXContentRegistry ;
44
62
import org .opensearch .core .xcontent .XContentParser ;
63
+ import org .opensearch .index .IndexNotFoundException ;
45
64
import org .opensearch .index .query .BoolQueryBuilder ;
46
65
import org .opensearch .index .query .TermQueryBuilder ;
47
66
import org .opensearch .index .query .TermsQueryBuilder ;
54
73
import org .opensearch .ml .common .transport .model .MLModelDeleteAction ;
55
74
import org .opensearch .ml .common .transport .model .MLModelDeleteRequest ;
56
75
import org .opensearch .ml .common .transport .model .MLModelGetRequest ;
76
+ import org .opensearch .ml .engine .utils .AgentModelsSearcher ;
57
77
import org .opensearch .ml .helper .ModelAccessControlHelper ;
58
78
import org .opensearch .ml .settings .MLFeatureEnabledSetting ;
59
79
import org .opensearch .ml .utils .RestActionUtils ;
62
82
import org .opensearch .remote .metadata .client .GetDataObjectRequest ;
63
83
import org .opensearch .remote .metadata .client .SdkClient ;
64
84
import org .opensearch .remote .metadata .common .SdkClientUtils ;
85
+ import org .opensearch .search .SearchHit ;
65
86
import org .opensearch .search .fetch .subphase .FetchSourceContext ;
66
87
import org .opensearch .tasks .Task ;
67
88
import org .opensearch .transport .TransportService ;
@@ -80,6 +101,10 @@ public class DeleteModelTransportAction extends HandledTransportAction<ActionReq
80
101
static final String BULK_FAILURE_MSG = "Bulk failure while deleting model of " ;
81
102
static final String SEARCH_FAILURE_MSG = "Search failure while deleting model of " ;
82
103
static final String OS_STATUS_EXCEPTION_MESSAGE = "Failed to delete all model chunks" ;
104
+ static final String PIPELINE_TARGET_MODEL_KEY = "model_id" ;
105
+
106
+ Boolean isSafeDelete ;
107
+
83
108
final Client client ;
84
109
final SdkClient sdkClient ;
85
110
final NamedXContentRegistry xContentRegistry ;
@@ -90,6 +115,8 @@ public class DeleteModelTransportAction extends HandledTransportAction<ActionReq
90
115
final ModelAccessControlHelper modelAccessControlHelper ;
91
116
private final MLFeatureEnabledSetting mlFeatureEnabledSetting ;
92
117
118
+ final AgentModelsSearcher agentModelsSearcher ;
119
+
93
120
@ Inject
94
121
public DeleteModelTransportAction (
95
122
TransportService transportService ,
@@ -100,6 +127,7 @@ public DeleteModelTransportAction(
100
127
NamedXContentRegistry xContentRegistry ,
101
128
ClusterService clusterService ,
102
129
ModelAccessControlHelper modelAccessControlHelper ,
130
+ AgentModelsSearcher agentModelsSearcher ,
103
131
MLFeatureEnabledSetting mlFeatureEnabledSetting
104
132
) {
105
133
super (MLModelDeleteAction .NAME , transportService , actionFilters , MLModelDeleteRequest ::new );
@@ -108,6 +136,10 @@ public DeleteModelTransportAction(
108
136
this .xContentRegistry = xContentRegistry ;
109
137
this .clusterService = clusterService ;
110
138
this .modelAccessControlHelper = modelAccessControlHelper ;
139
+ this .agentModelsSearcher = agentModelsSearcher ;
140
+ this .settings = settings ;
141
+ isSafeDelete = ML_COMMONS_SAFE_DELETE_WITH_USAGE_CHECK .get (settings );
142
+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (ML_COMMONS_SAFE_DELETE_WITH_USAGE_CHECK , it -> isSafeDelete = it );
111
143
this .mlFeatureEnabledSetting = mlFeatureEnabledSetting ;
112
144
}
113
145
@@ -193,7 +225,19 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
193
225
)
194
226
);
195
227
} else if (isModelNotDeployed (mlModelState )) {
196
- deleteModel (modelId , tenantId , mlModel .getAlgorithm ().name (), isHidden , actionListener );
228
+ if (isSafeDelete ) {
229
+ // We only check downstream task when it's not hidden and cluster setting is true.
230
+ checkDownstreamTaskBeforeDeleteModel (
231
+ modelId ,
232
+ tenantId ,
233
+ mlModel .getAlgorithm ().name (),
234
+ isHidden ,
235
+ actionListener
236
+ );
237
+ } else {
238
+ deleteModel (modelId , tenantId , mlModel .getAlgorithm ().name (), isHidden , actionListener );
239
+ }
240
+ // deleteModel(modelId, tenantId, mlModel.getAlgorithm().name(), isHidden, actionListener);
197
241
} else {
198
242
wrappedListener
199
243
.onFailure (
@@ -305,6 +349,107 @@ private void deleteModel(
305
349
});
306
350
}
307
351
352
+ private void checkDownstreamTaskBeforeDeleteModel (
353
+ String modelId ,
354
+ String tenantId ,
355
+ String algorithm ,
356
+ Boolean isHidden ,
357
+ ActionListener <DeleteResponse > actionListener
358
+ ) {
359
+ // Now checks 3 resources associated with the model id 1. Agent 2. Search pipeline 3. ingest pipeline
360
+ CountDownLatch countDownLatch = new CountDownLatch (3 );
361
+ AtomicBoolean noneBlocked = new AtomicBoolean (true );
362
+ ConcurrentLinkedQueue <String > errorMessages = new ConcurrentLinkedQueue <>();
363
+ ActionListener <Boolean > countDownActionListener = ActionListener .wrap (b -> {
364
+ countDownLatch .countDown ();
365
+ noneBlocked .compareAndSet (true , b );
366
+ if (countDownLatch .getCount () == 0 ) {
367
+ if (noneBlocked .get ()) {
368
+ deleteModel (modelId , tenantId , algorithm , isHidden , actionListener );
369
+ }
370
+ }
371
+ }, e -> {
372
+ countDownLatch .countDown ();
373
+ noneBlocked .set (false );
374
+ errorMessages .add (e .getMessage ());
375
+ actionListener .onFailure (new OpenSearchStatusException (e .getMessage (), RestStatus .CONFLICT ));
376
+
377
+ });
378
+ checkAgentBeforeDeleteModel (modelId , countDownActionListener );
379
+ checkIngestPipelineBeforeDeleteModel (modelId , countDownActionListener );
380
+ checkSearchPipelineBeforeDeleteModel (modelId , countDownActionListener );
381
+ }
382
+
383
+ private void checkAgentBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
384
+ // check whether agent are using them
385
+ SearchRequest searchAgentRequest = agentModelsSearcher .constructQueryRequestToSearchModelIdInsideAgent (modelId );
386
+ client .search (searchAgentRequest , ActionListener .wrap (searchResponse -> {
387
+ SearchHit [] searchHits = searchResponse .getHits ().getHits ();
388
+ if (searchHits .length == 0 ) {
389
+ actionListener .onResponse (true );
390
+ } else {
391
+ String errorMessage = formatAgentErrorMessage (searchHits );
392
+ actionListener .onFailure (new OpenSearchStatusException (errorMessage , RestStatus .CONFLICT ));
393
+ }
394
+
395
+ }, e -> {
396
+ if (e instanceof IndexNotFoundException ) {
397
+ actionListener .onResponse (true );
398
+ return ;
399
+ }
400
+ log .error ("Failed to delete ML Model: " + modelId , e );
401
+ actionListener .onFailure (e );
402
+
403
+ }));
404
+ }
405
+
406
+ private void checkIngestPipelineBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
407
+ checkPipelineBeforeDeleteModel (modelId , actionListener , "ingest" , GetPipelineRequest ::new , GetPipelineAction .INSTANCE );
408
+
409
+ }
410
+
411
+ private void checkSearchPipelineBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
412
+ checkPipelineBeforeDeleteModel (modelId , actionListener , "search" , GetSearchPipelineRequest ::new , GetSearchPipelineAction .INSTANCE );
413
+
414
+ }
415
+
416
+ private void checkPipelineBeforeDeleteModel (
417
+ String modelId ,
418
+ ActionListener <Boolean > actionListener ,
419
+ String pipelineType ,
420
+ Supplier <ActionRequest > requestSupplier ,
421
+ ActionType actionType
422
+ ) {
423
+ ActionRequest request = requestSupplier .get ();
424
+ client .execute (actionType , request , ActionListener .wrap (pipelineResponse -> {
425
+ Map <String , Object > allConfigMap = XContentHelper .convertToMap (JsonXContent .jsonXContent , pipelineResponse .toString (), true );
426
+ List <String > allDependentPipelineIds = findDependentPipelinesEasy (allConfigMap , modelId );
427
+ if (allDependentPipelineIds .isEmpty ()) {
428
+ actionListener .onResponse (true );
429
+ } else {
430
+ actionListener
431
+ .onFailure (
432
+ new OpenSearchStatusException (
433
+ String
434
+ .format (
435
+ Locale .ROOT ,
436
+ "%d %s pipelines are still using this model, please delete or update the pipelines first: %s" ,
437
+ allDependentPipelineIds .size (),
438
+ pipelineType ,
439
+ Arrays .toString (allDependentPipelineIds .toArray (new String [0 ]))
440
+ ),
441
+ RestStatus .CONFLICT
442
+ )
443
+ );
444
+ }
445
+ }, e -> {
446
+ log .error ("Failed to delete ML Model: " + modelId , e );
447
+ actionListener .onFailure (e );
448
+
449
+ }));
450
+
451
+ }
452
+
308
453
private void deleteModelChunksAndController (
309
454
ActionListener <DeleteResponse > actionListener ,
310
455
String modelId ,
@@ -410,6 +555,71 @@ private Boolean isModelNotDeployed(MLModelState mlModelState) {
410
555
&& !mlModelState .equals (MLModelState .PARTIALLY_DEPLOYED );
411
556
}
412
557
558
+ private List <String > findDependentPipelinesEasy (Map <String , Object > allConfigMap , String candidateModelId ) {
559
+ List <String > dependentPipelineConfigurations = new ArrayList <>();
560
+ for (Map .Entry <String , Object > entry : allConfigMap .entrySet ()) {
561
+ String id = entry .getKey ();
562
+ Map <String , Object > config = (Map <String , Object >) entry .getValue ();
563
+ if (searchThroughConfig (config , candidateModelId )) {
564
+ dependentPipelineConfigurations .add (id );
565
+ }
566
+ }
567
+ return dependentPipelineConfigurations ;
568
+ }
569
+
570
+ // This method is to go through the pipeline configs and the configuration is a map of string to objects.
571
+ // Objects can be a list or a map. we will search exhaustively through the configuration for any match of the candidateId.
572
+ private Boolean searchThroughConfig (Object searchCandidate , String candidateId ) {
573
+ // Use a stack to store the elements to be processed
574
+ Deque <Pair <String , Object >> stack = new ArrayDeque <>();
575
+ stack .push (Pair .of ("" , searchCandidate ));
576
+
577
+ while (!stack .isEmpty ()) {
578
+ // Pop an item from the stack
579
+ Pair <String , Object > current = stack .pop ();
580
+ String currentKey = current .getLeft ();
581
+ Object currentCandidate = current .getRight ();
582
+
583
+ if (currentCandidate instanceof String && candidateId .equals (currentCandidate )) {
584
+ // Check for a match
585
+ if (PIPELINE_TARGET_MODEL_KEY .equals (currentKey )) {
586
+ return true ;
587
+ }
588
+ } else if (currentCandidate instanceof List <?>) {
589
+ // Push all elements in the list onto the stack
590
+ for (Object v : (List <?>) currentCandidate ) {
591
+ stack .push (Pair .of (currentKey , v ));
592
+ }
593
+ } else if (currentCandidate instanceof Map <?, ?>) {
594
+ // Push all values in the map onto the stack
595
+ for (Map .Entry <?, ?> entry : ((Map <?, ?>) currentCandidate ).entrySet ()) {
596
+ String key = (String ) entry .getKey ();
597
+ Object value = entry .getValue ();
598
+ stack .push (Pair .of (key , value ));
599
+ }
600
+ }
601
+ }
602
+
603
+ // If no match is found
604
+ return false ;
605
+ }
606
+
607
+ private String formatAgentErrorMessage (SearchHit [] hits ) {
608
+ List <String > agentIds = new ArrayList <>();
609
+ for (SearchHit hit : hits ) {
610
+ Map <String , Object > sourceAsMap = hit .getSourceAsMap ();
611
+ agentIds .add (hit .getId ());
612
+ }
613
+ return String
614
+ .format (
615
+ Locale .ROOT ,
616
+ "%d agents are still using this model, please delete or update the agents first, all visible agents are: %s" ,
617
+ hits .length ,
618
+ Arrays .toString (agentIds .toArray (new String [0 ]))
619
+ );
620
+
621
+ }
622
+
413
623
// this method is only to stub static method.
414
624
@ VisibleForTesting
415
625
boolean isSuperAdminUserWrapper (ClusterService clusterService , Client client ) {
0 commit comments