Skip to content

Commit 988c826

Browse files
authored
[ISSUE #9196] Broker return pop stats when receive notification (#9197)
1 parent 53fdc4a commit 988c826

File tree

4 files changed

+71
-3
lines changed

4 files changed

+71
-3
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,11 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
161161
}
162162

163163
if (!hasMsg) {
164-
if (popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)) == PollingResult.POLLING_SUC) {
164+
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
165+
if (pollingResult == PollingResult.POLLING_SUC) {
165166
return null;
167+
} else if (pollingResult == PollingResult.POLLING_FULL) {
168+
responseHeader.setPollingFull(true);
166169
}
167170
}
168171
response.setCode(ResponseCode.SUCCESS);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.client.consumer;
18+
19+
public class NotifyResult {
20+
private boolean hasMsg;
21+
private boolean pollingFull;
22+
23+
public boolean isHasMsg() {
24+
return hasMsg;
25+
}
26+
27+
public boolean isPollingFull() {
28+
return pollingFull;
29+
}
30+
31+
public void setHasMsg(boolean hasMsg) {
32+
this.hasMsg = hasMsg;
33+
}
34+
35+
public void setPollingFull(boolean pollingFull) {
36+
this.pollingFull = pollingFull;
37+
}
38+
39+
@Override public String toString() {
40+
return "NotifyResult{" +
41+
"hasMsg=" + hasMsg +
42+
", pollingFull=" + pollingFull +
43+
'}';
44+
}
45+
}

client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.rocketmq.client.ClientConfig;
2525
import org.apache.rocketmq.client.consumer.AckCallback;
2626
import org.apache.rocketmq.client.consumer.AckResult;
27+
import org.apache.rocketmq.client.consumer.NotifyResult;
2728
import org.apache.rocketmq.client.consumer.PopCallback;
2829
import org.apache.rocketmq.client.consumer.PopResult;
2930
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -620,14 +621,23 @@ public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr,
620621
}
621622

622623
public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader,
624+
long timeoutMillis) {
625+
return notificationWithPollingStats(brokerAddr, requestHeader, timeoutMillis).thenApply(NotifyResult::isHasMsg);
626+
}
627+
628+
public CompletableFuture<NotifyResult> notificationWithPollingStats(String brokerAddr,
629+
NotificationRequestHeader requestHeader,
623630
long timeoutMillis) {
624631
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
625632
return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> {
626-
CompletableFuture<Boolean> future0 = new CompletableFuture<>();
633+
CompletableFuture<NotifyResult> future0 = new CompletableFuture<>();
627634
if (response.getCode() == ResponseCode.SUCCESS) {
628635
try {
629636
NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class);
630-
future0.complete(responseHeader.isHasMsg());
637+
NotifyResult notifyResult = new NotifyResult();
638+
notifyResult.setHasMsg(responseHeader.isHasMsg());
639+
notifyResult.setPollingFull(responseHeader.isPollingFull());
640+
future0.complete(notifyResult);
631641
} catch (Throwable t) {
632642
future0.completeExceptionally(t);
633643
}

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java

+10
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,20 @@ public class NotificationResponseHeader implements CommandCustomHeader {
2626
@CFNotNull
2727
private boolean hasMsg = false;
2828

29+
private boolean pollingFull = false;
30+
2931
public boolean isHasMsg() {
3032
return hasMsg;
3133
}
3234

35+
public boolean isPollingFull() {
36+
return pollingFull;
37+
}
38+
39+
public void setPollingFull(boolean pollingFull) {
40+
this.pollingFull = pollingFull;
41+
}
42+
3343
public void setHasMsg(boolean hasMsg) {
3444
this.hasMsg = hasMsg;
3545
}

0 commit comments

Comments
 (0)