From bf4a370dec4aa13a4a5b84427e6d2d0a3f1928d6 Mon Sep 17 00:00:00 2001 From: matthew Date: Mon, 27 Jan 2025 11:33:37 +0800 Subject: [PATCH 1/5] [ISSUE #9097]Add new command to check async task status in broker. --- .../broker/AdminAsyncTaskManager.java | 129 ++++++ .../processor/AdminBrokerProcessor.java | 49 ++- .../rocketmq/client/impl/MQClientAPIImpl.java | 15 + .../org/apache/rocketmq/common/AsyncTask.java | 98 +++++ .../apache/rocketmq/common/TaskStatus.java | 46 +++ .../remoting/protocol/RequestCode.java | 1 + .../CheckAsyncTaskStatusRequestHeader.java | 73 ++++ .../CheckAsyncTaskStatusResponseHeader.java | 68 ++++ .../tools/admin/DefaultMQAdminExt.java | 7 + .../tools/admin/DefaultMQAdminExtImpl.java | 7 + .../rocketmq/tools/admin/MQAdminExt.java | 4 + .../stats/CheckAsyncTaskStatusSubCommand.java | 190 +++++++++ .../CheckAsyncTaskStatusSubCommandTest.java | 373 ++++++++++++++++++ 13 files changed, 1054 insertions(+), 6 deletions(-) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/AsyncTask.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/TaskStatus.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java new file mode 100644 index 00000000000..0d1726679bf --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker; + +import com.alibaba.fastjson.JSON; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.AsyncTask; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.TaskStatus; + +public class AdminAsyncTaskManager { + + // taskId -> AsyncTask + private final Cache asyncTaskCache; + + // taskName -> taskId + private final ConcurrentHashMap> taskNameToIdsMap; + + public AdminAsyncTaskManager() { + this.asyncTaskCache = Caffeine.newBuilder() + .expireAfterWrite(30, TimeUnit.MINUTES) + .maximumSize(10000) + .build(); + + this.taskNameToIdsMap = new ConcurrentHashMap<>(); + } + + /** + * Creates a new asynchronous task with a unique taskId. + * + * @param taskName The name of the task. + * @param future The CompletableFuture representing the asynchronous task. + * @return The generated taskId. + */ + public String createTask(String taskName, CompletableFuture future) { + String taskId = UUID.randomUUID().toString(); + AsyncTask task = new AsyncTask(taskName, taskId, future); + + asyncTaskCache.put(taskId, task); + taskNameToIdsMap.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId); + + future.whenComplete((result, throwable) -> { + if (throwable != null) { + task.setStatus(TaskStatus.ERROR.getValue()); + task.setResult(throwable.getMessage()); + } else { + task.setStatus(TaskStatus.SUCCESS.getValue()); + task.setResult(JSON.toJSONString(result)); + } + }); + + return taskId; + } + + /** + * Get all taskIds associated with a given task name. + * + * @param taskName The name of the task. + * @return List of taskIds for the given task name. + */ + public List getTaskIdsByName(String taskName) { + return taskNameToIdsMap.getOrDefault(taskName, Collections.emptyList()); + } + + /** + * Get the status of a specific task. + * + * @param taskId The unique identifier of the task. + * @return The AsyncTask object, or null if not found. + */ + public AsyncTask getTaskStatus(String taskId) { + return asyncTaskCache.getIfPresent(taskId); + } + + /** + * Update the status and result of a specific task. + * + * @param taskId The unique identifier of the task. + * @param status The new status of the task. + * @param result The result of the task. + */ + public void updateTaskStatus(String taskId, int status, String result) { + AsyncTask task = asyncTaskCache.getIfPresent(taskId); + if (task != null) { + task.setStatus(status); + task.setResult(result); + asyncTaskCache.put(taskId, task); + } + } + + /** + * Remove a specific task from the cache and mappings. + * + * @param taskId The unique identifier of the task. + */ + public void removeTask(String taskId) { + AsyncTask task = asyncTaskCache.getIfPresent(taskId); + if (task != null) { + asyncTaskCache.invalidate(taskId); + taskNameToIdsMap.computeIfPresent(task.getTaskName(), (k, v) -> { + v.remove(taskId); + return v.isEmpty() ? null : v; + }); + } + } +} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a9b913192fa..40a39119ee4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -43,6 +43,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; @@ -56,6 +57,7 @@ import org.apache.rocketmq.auth.authorization.exception.AuthorizationException; import org.apache.rocketmq.auth.authorization.model.Acl; import org.apache.rocketmq.auth.authorization.model.Resource; +import org.apache.rocketmq.broker.AdminAsyncTaskManager; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.auth.converter.AclConverter; import org.apache.rocketmq.broker.auth.converter.UserConverter; @@ -72,6 +74,7 @@ import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; @@ -149,6 +152,8 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusResponseHeader; import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; @@ -245,9 +250,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { protected final BrokerController brokerController; protected Set configBlackList = new HashSet<>(); private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + private final AdminAsyncTaskManager asyncTaskManager; public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; + this.asyncTaskManager = new AdminAsyncTaskManager(); initConfigBlackList(); } @@ -415,6 +422,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.listAcl(ctx, request); case RequestCode.POP_ROLLBACK: return this.transferPopToFsStore(ctx, request); + case RequestCode.CHECK_ASYNC_TASK_STATUS: + return this.checkAsyncTaskStatus(ctx, request); default: return getUnknownCmdResponse(ctx, request); } @@ -487,15 +496,16 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); - Runnable runnable = () -> { + + CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { - CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request); - LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult)); + return doCheckRocksdbCqWriteProgress(ctx, request); } catch (Exception e) { - LOGGER.error("checkRocksdbCqWriteProgress error", e); + throw new CompletionException(e); } - }; - asyncExecuteWorker.submit(runnable); + }, asyncExecuteWorker); + + asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future); RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); response.setBody(JSON.toJSONBytes(result)); @@ -3597,4 +3607,31 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting } return response; } + + private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class); + List taskIds = asyncTaskManager.getTaskIdsByName(requestHeader.getTaskName()); + if (CollectionUtils.isEmpty(taskIds)) { + throw new RemotingCommandException("taskName: " + requestHeader.getTaskName() + " not found"); + } + + try { + int maxResults = Math.min(requestHeader.getMaxLimit(), 200); + Integer filterStatus = requestHeader.getTaskStatus(); + + List asyncTasks = taskIds.stream() + .map(asyncTaskManager::getTaskStatus) + .filter(task -> filterStatus == null || task.getStatus() == filterStatus) + .limit(maxResults) + .collect(Collectors.toList()); + + RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + response.setBody(JSON.toJSONBytes(asyncTasks)); + return response; + } catch (Exception e) { + LOGGER.error("checkAsyncTaskStatus error", e); + return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, e.getMessage()); + } + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 114093e3502..547015df5ed 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -55,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.rpchook.NamespaceRpcHook; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.MQVersion; @@ -149,6 +150,7 @@ import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; @@ -3601,4 +3603,17 @@ public void exportPopRecord(String brokerAddr, long timeout) throws RemotingConn } throw new MQBrokerException(response.getCode(), response.getRemark()); } + + public List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader, + long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, + requestHeader); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); + assert response != null; + if (response.getCode() == SUCCESS) { + return RemotingSerializable.decodeList(response.getBody(), AsyncTask.class); + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/AsyncTask.java b/common/src/main/java/org/apache/rocketmq/common/AsyncTask.java new file mode 100644 index 00000000000..674989fda08 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/AsyncTask.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +public class AsyncTask { + + private String taskName; + + private String taskId; + + private int status; + + private Date createTime; + + private String result; + + private final CompletableFuture future; + + public AsyncTask(String taskName, String taskId, CompletableFuture future) { + this.taskName = taskName; + this.taskId = taskId; + this.status = TaskStatus.INIT.getValue(); + this.createTime = new Date(); + this.result = null; + this.future = future; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public CompletableFuture getFuture() { + return future; + } + + public static String getDescFromStatus(int status) { + for (TaskStatus taskStatus : TaskStatus.values()) { + if (taskStatus.getValue() == status) { + return taskStatus.getDesc(); + } + } + return "unknown"; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/TaskStatus.java b/common/src/main/java/org/apache/rocketmq/common/TaskStatus.java new file mode 100644 index 00000000000..2f542cd035e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TaskStatus.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +public enum TaskStatus { + + INIT(0, "Initialized"), + + IN_PROGRESS(1, "In Progress"), + + ERROR(2, "Error"), + + SUCCESS(3, "Success"); + + private final int value; + + private final String desc; + + TaskStatus(int value, String desc) { + this.value = value; + this.desc = desc; + } + + public int getValue() { + return value; + } + + public String getDesc() { + return desc; + } +} \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index e3b180a5379..2d2f8778cd9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -220,6 +220,7 @@ public class RequestCode { public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353; public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354; public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355; + public static final int CHECK_ASYNC_TASK_STATUS = 356; public static final int LITE_PULL_MESSAGE = 361; public static final int RECALL_MESSAGE = 370; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java new file mode 100644 index 00000000000..fa6779f78a7 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; + +@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, resource = ResourceType.CLUSTER, action = Action.GET) +public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { + + private String taskName; + + private int maxLimit; // Optional parameter for filtering return tasks nums. + + private Integer taskStatus; // Optional parameter for filtering tasks with specific statuses + + @Override + public void checkFields() throws RemotingCommandException { + if (StringUtils.isBlank(taskName)) { + throw new RemotingCommandException("taskName cannot be null or blank"); + } + if (maxLimit <= 0) { + throw new RemotingCommandException("maxLimit must be greater than 0"); + } + if (taskStatus != null && (taskStatus < 0 || taskStatus > 3)) { + throw new RemotingCommandException("taskStatus must be between 0 and 3"); + } + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public int getMaxLimit() { + return maxLimit; + } + + public void setMaxLimit(int maxLimit) { + this.maxLimit = maxLimit; + } + + public Integer getTaskStatus() { + return taskStatus; + } + + public void setTaskStatus(Integer taskStatus) { + this.taskStatus = taskStatus; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java new file mode 100644 index 00000000000..813a0717562 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class CheckAsyncTaskStatusResponseHeader implements CommandCustomHeader { + + private String taskId; + + private String status; + + private int progress; + + private String result; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getProgress() { + return progress; + } + + public void setProgress(int progress) { + this.progress = progress; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } +} \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index f224f749cbc..30918395ec7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.Pair; @@ -66,6 +67,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -1018,4 +1020,9 @@ public void exportPopRecords(String brokerAddr, long timeout) throws RemotingCon RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.exportPopRecords(brokerAddr, timeout); } + + public List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return defaultMQAdminExtImpl.checkAsyncTaskStatus(brokerAddr, requestHeader); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 5be99606dc8..69f36ad42ab 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.KeyBuilder; @@ -104,6 +105,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; @@ -2099,4 +2101,9 @@ public void exportPopRecords(String brokerAddr, long timeout) throws RemotingCon RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { this.mqClientInstance.getMQClientAPIImpl().exportPopRecord(brokerAddr, timeout); } + + public List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().checkAsyncTaskStatus(brokerAddr, requestHeader, timeoutMillis); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 2f01b6cba81..294c462b070 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; @@ -62,6 +63,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -534,4 +536,6 @@ String setCommitLogReadAheadMode(final String brokerAddr, String mode) void exportPopRecords(String brokerAddr, long timeout) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; + + List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader)throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java new file mode 100644 index 00000000000..1800e2ab93a --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.stats; + +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.common.AsyncTask; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; + +import java.util.List; +import java.util.Map; + +public class CheckAsyncTaskStatusSubCommand implements SubCommand { + + private DefaultMQAdminExt defaultMQAdminExt; + + private static final int DEFAULT_MAX_TASKS = 20; + + private DefaultMQAdminExt createDefaultMQAdminExt(RPCHook rpcHook) { + if (defaultMQAdminExt == null) { + defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + } else { + return defaultMQAdminExt; + } + return defaultMQAdminExt; + } + + @Override + public String commandName() { + return "checkAsyncTaskStatus"; + } + + @Override + public String commandDesc() { + return "Check the status of an asynchronous task by task name."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "taskName", true, "The name of the asynchronous task"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("b", "brokerAddr", true, "Check which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "Check which cluster"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("n", "nameSrvAddr", true, "Nameserver address"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "maxLimit", true, "Maximum number of tasks to return"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "taskStatus", true, "Filter tasks by status (0 for init, 1 for in progress, 2 for error, 3 for success.)"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + defaultMQAdminExt = createDefaultMQAdminExt(rpcHook); + + String taskName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : null; + if (taskName == null) { + System.out.print("Task name cannot be empty. Please specify a task name with -t."); + return; + } + String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null; + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; + String namesAddr = commandLine.hasOption('n') ? commandLine.getOptionValue('n').trim() : null; + int maxLimit = commandLine.hasOption('m') ? Integer.parseInt(commandLine.getOptionValue('m').trim()) : DEFAULT_MAX_TASKS; + Integer taskStatus = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s').trim()) : null; + + try { + defaultMQAdminExt.start(); + + if (namesAddr != null) { + defaultMQAdminExt.setNamesrvAddr(namesAddr); + } + + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null) { + System.out.print("Cluster info is empty."); + return; + } + Map brokerAddrTable = clusterInfo.getBrokerAddrTable(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); + + if (brokerAddr != null) { + // If brokerAddr is specified, check that broker first. + if (brokerAddrTable == null || brokerAddrTable.isEmpty()) { + System.out.print("Broker address table is empty."); + return; + } + BrokerData brokerData = brokerAddrTable.values().stream() + .filter(b -> b.getBrokerAddrs().containsValue(brokerAddr)) + .findFirst() + .orElse(null); + + if (brokerData != null) { + checkAsyncTaskStatusOnBroker(brokerAddr, taskName, brokerData.getBrokerName(), maxLimit, taskStatus); + } else { + System.out.printf("Broker with address '%s' not found.%n", brokerAddr); + } + } else if (clusterName != null) { + // If brokerAddr is not specified but clusterName is specified, check all brokers in the cluster. + Set brokerNames = clusterAddrTable.get(clusterName); + if (brokerNames == null || brokerNames.isEmpty()) { + System.out.printf("Cluster '%s' not found or has no brokers.%n", clusterName); + return; + } + brokerNames.forEach(brokerName -> { + BrokerData brokerData = brokerAddrTable.get(brokerName); + if (brokerData != null) { + checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, brokerName, maxLimit, taskStatus); + } + }); + } else { + // If neither brokerAddr nor clusterName is specified, check all brokers. + for (Map.Entry entry : brokerAddrTable.entrySet()) { + String brokerName = entry.getKey(); + String addr = entry.getValue().selectBrokerAddr(); + checkAsyncTaskStatusOnBroker(addr, taskName, brokerName, maxLimit, taskStatus); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to execute " + this.getClass().getSimpleName() + " command", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private void checkAsyncTaskStatusOnBroker(String brokerAddr, String taskName, String brokerName, int maxLimit, Integer taskStatus) { + try { + CheckAsyncTaskStatusRequestHeader requestHeader = new CheckAsyncTaskStatusRequestHeader(); + requestHeader.setTaskName(taskName); + requestHeader.setMaxLimit(maxLimit); + requestHeader.setTaskStatus(taskStatus); + + List asyncTaskStatus = defaultMQAdminExt.checkAsyncTaskStatus(brokerAddr, requestHeader); + + if (CollectionUtils.isNotEmpty(asyncTaskStatus)) { + for (AsyncTask task : asyncTaskStatus) { + System.out.printf( + "Task found for task name '%s' on broker %s: [Task ID: %s, Status: %s, Result: %s, Create Time: %s]%n", + taskName, brokerName, task.getTaskId(), AsyncTask.getDescFromStatus(task.getStatus()), + task.getResult(), task.getCreateTime().toString() + ); + } + } else { + System.out.printf("No tasks found for task name '%s' on broker %s.%n", taskName, brokerName); + } + } catch (Exception e) { + System.out.printf("Failed to query task status for task name '%s' on broker %s: %s%n", + taskName, brokerName, e.getMessage()); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java new file mode 100644 index 00000000000..5a4c881e179 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.AsyncTask; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.server.ServerResponseMocker; +import org.apache.rocketmq.tools.command.stats.CheckAsyncTaskStatusSubCommand; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; + + +public class CheckAsyncTaskStatusSubCommandTest extends ServerResponseMocker { + + @Mock + private DefaultMQAdminExt defaultMQAdminExt; + + @Mock + private CommandLine commandLine; + + @Mock + private RPCHook rpcHook; + + private CheckAsyncTaskStatusSubCommand checkAsyncTaskStatusSubCommand; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + checkAsyncTaskStatusSubCommand = new CheckAsyncTaskStatusSubCommand(); + + Field field = CheckAsyncTaskStatusSubCommand.class.getDeclaredField("defaultMQAdminExt"); + field.setAccessible(true); + field.set(checkAsyncTaskStatusSubCommand, defaultMQAdminExt); + } + + @Override + protected byte[] getBody() { + return new byte[0]; + } + + @Test + public void testExecute_Success() throws Exception { + String clusterName = "testCluster"; + String taskName = "testTask"; + String brokerName = "brokerA"; + String brokerAddr = "127.0.0.1:" + listenPort(); + Integer taskStatus = 1; + + when(commandLine.hasOption('c')).thenReturn(true); + when(commandLine.getOptionValue('c')).thenReturn(clusterName); + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + when(commandLine.hasOption('s')).thenReturn(true); + when(commandLine.getOptionValue('s')).thenReturn(String.valueOf(taskStatus)); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map> clusterAddrTable = new HashMap<>(); + clusterAddrTable.put(clusterName, new HashSet<>(Collections.singletonList(brokerName))); + clusterInfo.setClusterAddrTable(clusterAddrTable); + + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, brokerAddr); + when(brokerData.selectBrokerAddr()).thenReturn(brokerAddr); + Map brokerAddrTable = new HashMap<>(); + brokerAddrTable.put(brokerName, brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + CompletableFuture future = new CompletableFuture<>(); + AsyncTask asyncTask = new AsyncTask("testTask", "122421", future); + asyncTask.setTaskId("taskId"); + asyncTask.setStatus(1); + asyncTask.setResult("Task completed successfully"); + asyncTask.setCreateTime(new Date()); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + when(defaultMQAdminExt.checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(Collections.singletonList(asyncTask)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_EmptyTaskName() throws Exception { + when(commandLine.hasOption('t')).thenReturn(false); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintStream originalOut = System.out; + System.setOut(new PrintStream(outputStream)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + System.setOut(originalOut); + + String output = outputStream.toString().trim(); + assertEquals("Task name cannot be empty. Please specify a task name with -t.", output); + + verify(defaultMQAdminExt, never()).start(); + verify(defaultMQAdminExt, never()).examineBrokerClusterInfo(); + verify(defaultMQAdminExt, never()).checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class)); + verify(defaultMQAdminExt, never()).shutdown(); + } + + @Test + public void testExecute_WithBrokerAddr() throws Exception { + String brokerAddr = "127.0.0.1:" + listenPort(); + String taskName = "testTask"; + + when(commandLine.hasOption('b')).thenReturn(true); + when(commandLine.getOptionValue('b')).thenReturn(brokerAddr); + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, brokerAddr); + when(brokerData.selectBrokerAddr()).thenReturn(brokerAddr); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + AsyncTask asyncTask = new AsyncTask(taskName, "taskId", new CompletableFuture<>()); + when(defaultMQAdminExt.checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(Collections.singletonList(asyncTask)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(eq(brokerAddr), any(CheckAsyncTaskStatusRequestHeader.class)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_WithMaxLimit() throws Exception { + String taskName = "testTask"; + int maxLimit = 5; + + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + when(commandLine.hasOption('m')).thenReturn(true); + when(commandLine.getOptionValue('m')).thenReturn(String.valueOf(maxLimit)); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911"); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + List tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + tasks.add(new AsyncTask(taskName, "taskId" + i, new CompletableFuture<>())); + } + when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(tasks); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(anyString(), argThat(header -> header.getMaxLimit() == maxLimit)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_WithTaskStatus() throws Exception { + String taskName = "testTask"; + int taskStatus = 1; + + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + when(commandLine.hasOption('s')).thenReturn(true); + when(commandLine.getOptionValue('s')).thenReturn(String.valueOf(taskStatus)); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911"); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + AsyncTask asyncTask = new AsyncTask(taskName, "taskId", new CompletableFuture<>()); + asyncTask.setStatus(taskStatus); + when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(Collections.singletonList(asyncTask)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(anyString(), argThat(header -> header.getTaskStatus() == taskStatus)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecute_BrokerAddrNotFound() throws Exception { + String brokerAddr = "127.0.0.1:10911"; + String taskName = "testTask"; + + when(commandLine.hasOption('b')).thenReturn(true); + when(commandLine.getOptionValue('b')).thenReturn(brokerAddr); + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map brokerAddrTable = new HashMap<>(); + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10900"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10900"); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintStream originalOut = System.out; + System.setOut(new PrintStream(outputStream)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + + System.setOut(originalOut); + + String output = outputStream.toString().trim(); + assertTrue(output.contains("Broker with address '" + brokerAddr + "' not found.")); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt, never()).checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class)); + verify(defaultMQAdminExt).shutdown(); + } + + @Test(expected = RuntimeException.class) + public void testExecute_ExceptionThrown() throws Exception { + String clusterName = "testCluster"; + String taskName = "testTask"; + + when(commandLine.hasOption('c')).thenReturn(true); + when(commandLine.getOptionValue('c')).thenReturn(clusterName); + when(commandLine.hasOption('t')).thenReturn(true); + when(commandLine.getOptionValue('t')).thenReturn(taskName); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map> clusterAddrTable = new HashMap<>(); + clusterAddrTable.put(clusterName, new HashSet<>(Collections.singletonList("brokerA"))); + clusterInfo.setClusterAddrTable(clusterAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + when(defaultMQAdminExt.checkAsyncTaskStatus(anyString(), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenThrow(new RuntimeException("Broker communication error")); + + try { + checkAsyncTaskStatusSubCommand.execute(commandLine, new Options(), rpcHook); + } catch (RuntimeException e) { + assertEquals( + "Failed to execute " + checkAsyncTaskStatusSubCommand.getClass().getSimpleName() + " command", + e.getMessage() + ); + throw e; + } + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).shutdown(); + } + + @Test + public void testExecuteWithCommandLineArgs() throws Exception { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t", "testTask"}; + CommandLine commandLine = ServerUtil.parseCmdLine( + "mqadmin " + checkAsyncTaskStatusSubCommand.commandName(), subargs, + checkAsyncTaskStatusSubCommand.buildCommandlineOptions(options), new DefaultParser() + ); + + ClusterInfo clusterInfo = new ClusterInfo(); + Map> clusterAddrTable = new HashMap<>(); + clusterAddrTable.put("defaultCluster", new HashSet<>(Collections.singletonList("brokerA"))); + clusterInfo.setClusterAddrTable(clusterAddrTable); + + BrokerData brokerData = spy(new BrokerData()); + brokerData.setBrokerAddrs(new HashMap<>()); + brokerData.getBrokerAddrs().put(0L, "127.0.0.1:10911"); + when(brokerData.selectBrokerAddr()).thenReturn("127.0.0.1:10911"); + Map brokerAddrTable = new HashMap<>(); + brokerAddrTable.put("brokerA", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddrTable); + + when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); + + CompletableFuture future = new CompletableFuture<>(); + AsyncTask asyncTask = new AsyncTask("testTask", "122421", future); + asyncTask.setTaskId("taskId"); + asyncTask.setStatus(1); + asyncTask.setResult("Task completed successfully"); + asyncTask.setCreateTime(new Date()); + + when(defaultMQAdminExt.checkAsyncTaskStatus(eq("127.0.0.1:10911"), any(CheckAsyncTaskStatusRequestHeader.class))) + .thenReturn(Collections.singletonList(asyncTask)); + + checkAsyncTaskStatusSubCommand.execute(commandLine, options, rpcHook); + + verify(defaultMQAdminExt).start(); + verify(defaultMQAdminExt).examineBrokerClusterInfo(); + verify(defaultMQAdminExt).checkAsyncTaskStatus(eq("127.0.0.1:10911"), any(CheckAsyncTaskStatusRequestHeader.class)); + verify(defaultMQAdminExt).shutdown(); + } +} From 2637c95414127a190070b24e39c00fc9e18c081e Mon Sep 17 00:00:00 2001 From: KiteSoar Date: Fri, 7 Feb 2025 17:31:08 +0800 Subject: [PATCH 2/5] [ISSUE#9097]Optimize adminAsyncTaskManager. modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java modified: broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java modified: broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java modified: common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java modified: remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java modified: tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java modified: tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java --- .../broker/AdminAsyncTaskManager.java | 43 +++++++++++++++++-- .../processor/AdminBrokerProcessor.java | 35 ++++++++++++--- .../common/CheckRocksdbCqWriteResult.java | 10 +++++ .../CheckAsyncTaskStatusRequestHeader.java | 10 +++++ .../CheckRocksdbCqWriteProgressCommand.java | 4 +- .../stats/CheckAsyncTaskStatusSubCommand.java | 18 +++++--- .../CheckAsyncTaskStatusSubCommandTest.java | 2 +- 7 files changed, 103 insertions(+), 19 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java index 0d1726679bf..0d77c3c043c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker; import com.alibaba.fastjson.JSON; +import com.github.benmanes.caffeine.cache.RemovalCause; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.AsyncTask; @@ -39,13 +40,47 @@ public class AdminAsyncTaskManager { // taskName -> taskId private final ConcurrentHashMap> taskNameToIdsMap; + private final int taskCacheExpireTimeMinutes; + + private final int maxTaskCacheSize; + public AdminAsyncTaskManager() { + this.taskCacheExpireTimeMinutes = initTaskCacheExpireTimeMinutes(); + this.maxTaskCacheSize = initMaxTaskCacheSize(); + this.taskNameToIdsMap = new ConcurrentHashMap<>(); this.asyncTaskCache = Caffeine.newBuilder() - .expireAfterWrite(30, TimeUnit.MINUTES) - .maximumSize(10000) + .expireAfterWrite(taskCacheExpireTimeMinutes, TimeUnit.MINUTES) + .maximumSize(maxTaskCacheSize) + .removalListener((String taskId, AsyncTask task, RemovalCause cause) -> { + if (task != null) { + taskNameToIdsMap.computeIfPresent(task.getTaskName(), (k, list) -> { + list.remove(taskId); + return list.isEmpty() ? null : list; + }); + } + }) .build(); + } - this.taskNameToIdsMap = new ConcurrentHashMap<>(); + /** + * Initialize the task cache expiration time in minutes. + * + * @return The task cache expiration time in minutes. + */ + private int initTaskCacheExpireTimeMinutes() { + return Integer.parseInt( + System.getProperty("rocketmq.broker.asyncTaskCacheExpireTime", "1440") + ); + } + + /** + * Initialize the maximum size of the task cache. + * + * @return The maximum size of the task cache. + */ + private int initMaxTaskCacheSize() { + return Integer.parseInt( + System.getProperty("rocketmq.broker.maxAsyncTaskCacheSize", "10000")); } /** @@ -60,7 +95,7 @@ public String createTask(String taskName, CompletableFuture future) { AsyncTask task = new AsyncTask(taskName, taskId, future); asyncTaskCache.put(taskId, task); - taskNameToIdsMap.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId); + taskNameToIdsMap.computeIfAbsent(taskName, k -> Collections.synchronizedList(new ArrayList<>())).add(taskId); future.whenComplete((result, throwable) -> { if (throwable != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 40a39119ee4..11f1a36737f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -27,11 +27,13 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -494,18 +496,21 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re } private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { - CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); - CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { - return doCheckRocksdbCqWriteProgress(ctx, request); + CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request); + LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult)); + return checkResult; } catch (Exception e) { throw new CompletionException(e); } }, asyncExecuteWorker); - asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future); + String taskId = asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future); + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); + result.setTaskId(taskId); + RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); response.setBody(JSON.toJSONBytes(result)); @@ -3610,7 +3615,22 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class); - List taskIds = asyncTaskManager.getTaskIdsByName(requestHeader.getTaskName()); + String taskId = requestHeader.getTaskId(); + String taskName = requestHeader.getTaskName(); + + RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class); + // If the taskId is not empty, query the async task with the specified taskId. + if (StringUtils.isNotBlank(taskId)) { + AsyncTask asyncTask = asyncTaskManager.getTaskStatus(requestHeader.getTaskId()); + if (asyncTask == null) { + throw new RemotingCommandException("taskId: " + requestHeader.getTaskId() + " not found"); + } + response.setCode(ResponseCode.SUCCESS); + response.setBody(JSON.toJSONBytes(asyncTask)); + return response; + } + + List taskIds = asyncTaskManager.getTaskIdsByName(taskName); if (CollectionUtils.isEmpty(taskIds)) { throw new RemotingCommandException("taskName: " + requestHeader.getTaskName() + " not found"); } @@ -3621,11 +3641,12 @@ private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, Remoting List asyncTasks = taskIds.stream() .map(asyncTaskManager::getTaskStatus) + .filter(Objects::nonNull) .filter(task -> filterStatus == null || task.getStatus() == filterStatus) + .sorted(Comparator.comparing(AsyncTask::getCreateTime).reversed()) .limit(maxResults) .collect(Collectors.toList()); - RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class); response.setCode(ResponseCode.SUCCESS); response.setBody(JSON.toJSONBytes(asyncTasks)); return response; diff --git a/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java index fc67df86c2f..9a12ede68c4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java +++ b/common/src/main/java/org/apache/rocketmq/common/CheckRocksdbCqWriteResult.java @@ -22,6 +22,8 @@ public class CheckRocksdbCqWriteResult { int checkStatus; + String taskId; + public enum CheckStatus { CHECK_OK(0), CHECK_NOT_OK(1), @@ -54,4 +56,12 @@ public int getCheckStatus() { public void setCheckStatus(int checkStatus) { this.checkStatus = checkStatus; } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java index fa6779f78a7..8239a90989b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java @@ -30,6 +30,8 @@ public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { private String taskName; + private String taskId; + private int maxLimit; // Optional parameter for filtering return tasks nums. private Integer taskStatus; // Optional parameter for filtering tasks with specific statuses @@ -70,4 +72,12 @@ public Integer getTaskStatus() { public void setTaskStatus(Integer taskStatus) { this.taskStatus = taskStatus; } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index a0fc9fce1fb..be53475ec7d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -92,7 +92,9 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { if (result.getCheckStatus() == CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue()) { System.out.print(brokerName + " check error, please check log... errInfo: " + result.getCheckResult()); } else { - System.out.print(brokerName + " check doing, please wait and get the result from log... \n"); + String taskId = result.getTaskId(); + System.out.print(brokerName + " check is in progress. please wait and get the result from log " + + "or you can query the progress later using the taskId: " + taskId); } } } catch (Exception e) { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java index 1800e2ab93a..e543bf766c6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java @@ -65,6 +65,10 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(true); options.addOption(opt); + opt = new Option("i", "taskId", true, "The id of the asynchronous task"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option("b", "brokerAddr", true, "Check which broker"); opt.setRequired(false); options.addOption(opt); @@ -93,8 +97,9 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { defaultMQAdminExt = createDefaultMQAdminExt(rpcHook); String taskName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : null; - if (taskName == null) { - System.out.print("Task name cannot be empty. Please specify a task name with -t."); + String taskId = commandLine.hasOption('i') ? commandLine.getOptionValue('i').trim() : null; + if (taskName == null && taskId == null) { + System.out.print("Either task name or task ID must be provided."); return; } String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null; @@ -130,7 +135,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { .orElse(null); if (brokerData != null) { - checkAsyncTaskStatusOnBroker(brokerAddr, taskName, brokerData.getBrokerName(), maxLimit, taskStatus); + checkAsyncTaskStatusOnBroker(brokerAddr, taskName, taskId, brokerData.getBrokerName(), maxLimit, taskStatus); } else { System.out.printf("Broker with address '%s' not found.%n", brokerAddr); } @@ -144,7 +149,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { brokerNames.forEach(brokerName -> { BrokerData brokerData = brokerAddrTable.get(brokerName); if (brokerData != null) { - checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, brokerName, maxLimit, taskStatus); + checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, maxLimit, taskStatus); } }); } else { @@ -152,7 +157,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { for (Map.Entry entry : brokerAddrTable.entrySet()) { String brokerName = entry.getKey(); String addr = entry.getValue().selectBrokerAddr(); - checkAsyncTaskStatusOnBroker(addr, taskName, brokerName, maxLimit, taskStatus); + checkAsyncTaskStatusOnBroker(addr, taskName, taskId, brokerName, maxLimit, taskStatus); } } } catch (Exception e) { @@ -162,10 +167,11 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { } } - private void checkAsyncTaskStatusOnBroker(String brokerAddr, String taskName, String brokerName, int maxLimit, Integer taskStatus) { + private void checkAsyncTaskStatusOnBroker(String brokerAddr, String taskName, String taskId, String brokerName, int maxLimit, Integer taskStatus) { try { CheckAsyncTaskStatusRequestHeader requestHeader = new CheckAsyncTaskStatusRequestHeader(); requestHeader.setTaskName(taskName); + requestHeader.setTaskId(taskId); requestHeader.setMaxLimit(maxLimit); requestHeader.setTaskStatus(taskStatus); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java index 5a4c881e179..f41c7328bd6 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CheckAsyncTaskStatusSubCommandTest.java @@ -146,7 +146,7 @@ public void testExecute_EmptyTaskName() throws Exception { System.setOut(originalOut); String output = outputStream.toString().trim(); - assertEquals("Task name cannot be empty. Please specify a task name with -t.", output); + assertEquals("Either task name or task ID must be provided.", output); verify(defaultMQAdminExt, never()).start(); verify(defaultMQAdminExt, never()).examineBrokerClusterInfo(); From 97bcdeb177b76a619fad94b85f63b1f8ce9f6ae6 Mon Sep 17 00:00:00 2001 From: KiteSoar Date: Mon, 10 Feb 2025 16:40:54 +0800 Subject: [PATCH 3/5] [ISSUE#9097]Move params to broker config. --- .../broker/AdminAsyncTaskManager.java | 31 +++++-------------- .../processor/AdminBrokerProcessor.java | 2 +- .../apache/rocketmq/common/BrokerConfig.java | 20 ++++++++++++ 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java index 0d77c3c043c..c4918cbfe60 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java @@ -30,6 +30,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TaskStatus; public class AdminAsyncTaskManager { @@ -40,13 +41,16 @@ public class AdminAsyncTaskManager { // taskName -> taskId private final ConcurrentHashMap> taskNameToIdsMap; + private final BrokerConfig brokerConfig; + private final int taskCacheExpireTimeMinutes; private final int maxTaskCacheSize; - public AdminAsyncTaskManager() { - this.taskCacheExpireTimeMinutes = initTaskCacheExpireTimeMinutes(); - this.maxTaskCacheSize = initMaxTaskCacheSize(); + public AdminAsyncTaskManager(BrokerConfig brokerConfig) { + this.brokerConfig = brokerConfig; + this.taskCacheExpireTimeMinutes = brokerConfig.getTaskCacheExpireTimeMinutes(); + this.maxTaskCacheSize = brokerConfig.getMaxTaskCacheSize(); this.taskNameToIdsMap = new ConcurrentHashMap<>(); this.asyncTaskCache = Caffeine.newBuilder() .expireAfterWrite(taskCacheExpireTimeMinutes, TimeUnit.MINUTES) @@ -62,27 +66,6 @@ public AdminAsyncTaskManager() { .build(); } - /** - * Initialize the task cache expiration time in minutes. - * - * @return The task cache expiration time in minutes. - */ - private int initTaskCacheExpireTimeMinutes() { - return Integer.parseInt( - System.getProperty("rocketmq.broker.asyncTaskCacheExpireTime", "1440") - ); - } - - /** - * Initialize the maximum size of the task cache. - * - * @return The maximum size of the task cache. - */ - private int initMaxTaskCacheSize() { - return Integer.parseInt( - System.getProperty("rocketmq.broker.maxAsyncTaskCacheSize", "10000")); - } - /** * Creates a new asynchronous task with a unique taskId. * diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 11f1a36737f..8fb7cd96284 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -256,7 +256,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; - this.asyncTaskManager = new AdminAsyncTaskManager(); + this.asyncTaskManager = new AdminAsyncTaskManager(brokerController.getBrokerConfig()); initConfigBlackList(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index dd345449351..5e53511b8b7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -455,6 +455,10 @@ public class BrokerConfig extends BrokerIdentity { private boolean recallMessageEnable = false; + private int taskCacheExpireTimeMinutes = Integer.parseInt(System.getProperty("rocketmq.broker.asyncTaskCacheExpireTime", "1440")); + + private int maxTaskCacheSize = Integer.parseInt(System.getProperty("rocketmq.broker.maxAsyncTaskCacheSize", "10000")); + public String getConfigBlackList() { return configBlackList; } @@ -2006,4 +2010,20 @@ public boolean isRecallMessageEnable() { public void setRecallMessageEnable(boolean recallMessageEnable) { this.recallMessageEnable = recallMessageEnable; } + + public int getMaxTaskCacheSize() { + return maxTaskCacheSize; + } + + public void setMaxTaskCacheSize(int maxTaskCacheSize) { + this.maxTaskCacheSize = maxTaskCacheSize; + } + + public int getTaskCacheExpireTimeMinutes() { + return taskCacheExpireTimeMinutes; + } + + public void setTaskCacheExpireTimeMinutes(int taskCacheExpireTimeMinutes) { + this.taskCacheExpireTimeMinutes = taskCacheExpireTimeMinutes; + } } From 690acb0c2bd87d6cbd3df13c2a9e1e496ad7b918 Mon Sep 17 00:00:00 2001 From: KiteSoar Date: Thu, 20 Feb 2025 20:52:29 +0800 Subject: [PATCH 4/5] [ISSUE#9097] Add async task manager switch. --- .../processor/AdminBrokerProcessor.java | 33 ++++++++++++++++--- .../apache/rocketmq/common/BrokerConfig.java | 10 ++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 8fb7cd96284..e0019a6193d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -256,7 +256,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; - this.asyncTaskManager = new AdminAsyncTaskManager(brokerController.getBrokerConfig()); + this.asyncTaskManager = initAsyncTaskManager(brokerController.getBrokerConfig()); initConfigBlackList(); } @@ -268,6 +268,13 @@ private void initConfigBlackList() { configBlackList.addAll(Arrays.asList(configArray)); } + private AdminAsyncTaskManager initAsyncTaskManager(BrokerConfig brokerConfig) { + if (brokerConfig.isEnableAsyncTaskCheck()) { + return new AdminAsyncTaskManager(brokerConfig); + } + return null; + } + @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { @@ -496,20 +503,24 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re } private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { + CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); + result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request); LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult)); return checkResult; } catch (Exception e) { + LOGGER.error("checkRocksdbCqWriteProgress error", e); throw new CompletionException(e); } }, asyncExecuteWorker); - String taskId = asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future); - CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); - result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); - result.setTaskId(taskId); + if (brokerController.getBrokerConfig().isEnableAsyncTaskCheck()) { + String taskId = registerAsyncTask("checkRocksdbCqWriteProgress", future); + result.setTaskId(taskId); + } RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); @@ -517,6 +528,14 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R return response; } + private String registerAsyncTask(String taskName, CompletableFuture future) { + if (asyncTaskManager == null) { + LOGGER.warn("asyncTaskManager not initialized, task registration skipped (enableAsyncTaskCheck config disabled). taskName={}", taskName); + return null; + } + return asyncTaskManager.createTask(taskName, future); + } + private RemotingCommand exportRocksDBConfigToJson(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { ExportRocksDBConfigToJsonRequestHeader requestHeader = request.decodeCommandCustomHeader(ExportRocksDBConfigToJsonRequestHeader.class); @@ -3614,6 +3633,10 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting } private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + if (!brokerController.getBrokerConfig().isEnableAsyncTaskCheck()) { + throw new RemotingCommandException("async task check is not enabled"); + } + final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class); String taskId = requestHeader.getTaskId(); String taskName = requestHeader.getTaskName(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 5e53511b8b7..12939135cc0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -459,6 +459,8 @@ public class BrokerConfig extends BrokerIdentity { private int maxTaskCacheSize = Integer.parseInt(System.getProperty("rocketmq.broker.maxAsyncTaskCacheSize", "10000")); + private boolean enableAsyncTaskCheck = Boolean.parseBoolean(System.getProperty("rocketmq.broker.enableAsyncTaskCheck", "false")); + public String getConfigBlackList() { return configBlackList; } @@ -2026,4 +2028,12 @@ public int getTaskCacheExpireTimeMinutes() { public void setTaskCacheExpireTimeMinutes(int taskCacheExpireTimeMinutes) { this.taskCacheExpireTimeMinutes = taskCacheExpireTimeMinutes; } + + public boolean isEnableAsyncTaskCheck() { + return enableAsyncTaskCheck; + } + + public void setEnableAsyncTaskCheck(boolean enableAsyncTaskCheck) { + this.enableAsyncTaskCheck = enableAsyncTaskCheck; + } } From ce88393f5bf8a3bf80da4c7fb4f8a96cb9797609 Mon Sep 17 00:00:00 2001 From: KiteSoar Date: Sun, 23 Feb 2025 11:46:55 +0800 Subject: [PATCH 5/5] [ISSUE#9097] Add UT. --- .../processor/AdminBrokerProcessorTest.java | 44 ++++++++++++++++++- .../CheckAsyncTaskStatusRequestHeader.java | 8 ++-- .../stats/CheckAsyncTaskStatusSubCommand.java | 15 ++++++- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 959b147d9d3..2377cc2defd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -73,6 +74,8 @@ import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; @@ -230,7 +233,8 @@ public void init() throws Exception { field.set(brokerController, broker2Client); //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor(); - + BrokerConfig config = brokerController.getBrokerConfig(); + config.setEnableAsyncTaskCheck(true); adminBrokerProcessor = new AdminBrokerProcessor(brokerController); systemTopicSet = Sets.newHashSet( @@ -1328,6 +1332,44 @@ public void testResetMasterFlushOffset() throws RemotingCommandException { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testCheckAsyncTaskStatusByTaskId() throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader); + + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + CheckRocksdbCqWriteResult results = RemotingSerializable.decode(response.getBody(), CheckRocksdbCqWriteResult.class); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + CheckAsyncTaskStatusRequestHeader requestHeader1 = new CheckAsyncTaskStatusRequestHeader(); + requestHeader1.setTaskId(results.getTaskId()); + RemotingCommand request1 = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, requestHeader1); + HashMap extFields = new HashMap<>(); + extFields.put("taskId",results.getTaskId()); + request1.setExtFields(extFields); + RemotingCommand response1 = adminBrokerProcessor.processRequest(handlerContext, request1); + assertThat(response1.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testCheckAsyncTaskStatusByTaskName() throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader); + + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + String taskName = "checkRocksdbCqWriteProgress"; + CheckAsyncTaskStatusRequestHeader requestHeader1 = new CheckAsyncTaskStatusRequestHeader(); + requestHeader1.setTaskName(taskName); + RemotingCommand request1 = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, requestHeader1); + HashMap extFields = new HashMap<>(); + extFields.put("taskName",taskName); + request1.setExtFields(extFields); + RemotingCommand response1 = adminBrokerProcessor.processRequest(handlerContext, request1); + assertThat(response1.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + private ResetOffsetRequestHeader createRequestHeader(String topic,String group,long timestamp,boolean force,long offset,int queueId) { ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java index 8239a90989b..5cf1be44bc1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java @@ -38,11 +38,11 @@ public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - if (StringUtils.isBlank(taskName)) { - throw new RemotingCommandException("taskName cannot be null or blank"); + if (StringUtils.isBlank(taskName) && StringUtils.isBlank(taskId)) { + throw new RemotingCommandException("taskName and taskId cannot be empty at the same time"); } - if (maxLimit <= 0) { - throw new RemotingCommandException("maxLimit must be greater than 0"); + if (maxLimit < 0) { + throw new RemotingCommandException("maxLimit cannot be less than 0."); } if (taskStatus != null && (taskStatus < 0 || taskStatus > 3)) { throw new RemotingCommandException("taskStatus must be between 0 and 3"); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java index e543bf766c6..efeeddc01d2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java @@ -105,7 +105,17 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null; String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; String namesAddr = commandLine.hasOption('n') ? commandLine.getOptionValue('n').trim() : null; - int maxLimit = commandLine.hasOption('m') ? Integer.parseInt(commandLine.getOptionValue('m').trim()) : DEFAULT_MAX_TASKS; + String maxLimitStr = commandLine.hasOption('m') ? commandLine.getOptionValue('m').trim() : null; + int maxLimit = DEFAULT_MAX_TASKS; + if (maxLimitStr != null && !maxLimitStr.isEmpty()) { + try { + maxLimit = Integer.parseInt(maxLimitStr); + } catch (NumberFormatException e) { + System.out.print("Illegal maxLimit parameter value"); + return; + } + } + Integer taskStatus = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s').trim()) : null; try { @@ -146,10 +156,11 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { System.out.printf("Cluster '%s' not found or has no brokers.%n", clusterName); return; } + int finalMaxLimit = maxLimit; brokerNames.forEach(brokerName -> { BrokerData brokerData = brokerAddrTable.get(brokerName); if (brokerData != null) { - checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, maxLimit, taskStatus); + checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, finalMaxLimit, taskStatus); } }); } else {