Skip to content

Commit 0b687a9

Browse files
authored
[ISSUE #8997] Ensure there is an opportunity to send a retry message when broker no response (#9137)
1 parent 47fe6b2 commit 0b687a9

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -777,8 +777,16 @@ private SendResult sendDefaultImpl(
777777
callTimeout = true;
778778
break;
779779
}
780-
781-
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
780+
long curTimeout = timeout - costTime;
781+
// Get the maximum timeout allowed per request
782+
long maxSendTimeoutPerRequest = defaultMQProducer.getSendMsgMaxTimeoutPerRequest();
783+
// Determine if retries are still possible
784+
boolean canRetryAgain = times + 1 < timesTotal;
785+
// If retries are possible, and the current timeout exceeds the max allowed timeout, set the current timeout to the max allowed
786+
if (maxSendTimeoutPerRequest > -1 && canRetryAgain && curTimeout > maxSendTimeoutPerRequest) {
787+
curTimeout = maxSendTimeoutPerRequest;
788+
}
789+
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, curTimeout);
782790
endTimestamp = System.currentTimeMillis();
783791
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
784792
switch (communicationMode) {

client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

+13
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
115115
*/
116116
private int sendMsgTimeout = 3000;
117117

118+
/**
119+
* Max timeout for sending messages per request.
120+
*/
121+
private int sendMsgMaxTimeoutPerRequest = -1;
122+
118123
/**
119124
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
120125
*/
@@ -1259,6 +1264,14 @@ public void setSendMsgTimeout(int sendMsgTimeout) {
12591264
this.sendMsgTimeout = sendMsgTimeout;
12601265
}
12611266

1267+
public int getSendMsgMaxTimeoutPerRequest() {
1268+
return sendMsgMaxTimeoutPerRequest;
1269+
}
1270+
1271+
public void setSendMsgMaxTimeoutPerRequest(int sendMsgMaxTimeoutPerRequest) {
1272+
this.sendMsgMaxTimeoutPerRequest = sendMsgMaxTimeoutPerRequest;
1273+
}
1274+
12621275
public int getCompressMsgBodyOverHowmuch() {
12631276
return compressMsgBodyOverHowmuch;
12641277
}

0 commit comments

Comments
 (0)