Skip to content

Commit 343ed4f

Browse files
3424672656wanghuaiyuan
and
wanghuaiyuan
authored
[ISSUE #8127] Optimize the metric calculation logic of the time wheel (#8128)
* Fix the metric of the time wheel was incorrectly calculated * Fix the metric of the time wheel was incorrectly calculated --------- Co-authored-by: wanghuaiyuan <wanghuaiyuan@xiaomi.com>
1 parent 33a185a commit 343ed4f

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -1571,6 +1571,9 @@ public String getServiceName() {
15711571
public void run() {
15721572
setState(AbstractStateService.START);
15731573
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
1574+
//Mark different rounds
1575+
boolean isRound = true;
1576+
Map<String ,MessageExt> avoidDeleteLose = new HashMap<>();
15741577
while (!this.isStopped()) {
15751578
try {
15761579
setState(AbstractStateService.WAITING);
@@ -1587,9 +1590,18 @@ public void run() {
15871590
MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
15881591
if (null != msgExt) {
15891592
if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) {
1593+
//Clearing is performed once in each round.
1594+
//The deletion message is received first and the common message is received once
1595+
if (!isRound) {
1596+
isRound = true;
1597+
for (MessageExt messageExt: avoidDeleteLose.values()) {
1598+
addMetric(messageExt, 1);
1599+
}
1600+
avoidDeleteLose.clear();
1601+
}
15901602
if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) {
1591-
//Execute metric plus one for messages that fail to be deleted
1592-
addMetric(msgExt, 1);
1603+
1604+
avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt);
15931605
tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
15941606
}
15951607
tr.idempotentRelease();
@@ -1599,10 +1611,13 @@ public void run() {
15991611
if (null == uniqueKey) {
16001612
LOGGER.warn("No uniqueKey for msg:{}", msgExt);
16011613
}
1614+
//Mark ready for next round
1615+
if (isRound) {
1616+
isRound = false;
1617+
}
16021618
if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
16031619
&& tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
1604-
//Normally, it cancels out with the +1 above
1605-
addMetric(msgExt, -1);
1620+
avoidDeleteLose.remove(uniqueKey);
16061621
doRes = true;
16071622
tr.idempotentRelease();
16081623
perfCounterTicks.getCounter("dequeue_delete").flow(1);

0 commit comments

Comments
 (0)