64
64
import org .apache .rocketmq .common .message .MessageExt ;
65
65
import org .apache .rocketmq .common .message .MessageExtBrokerInner ;
66
66
import org .apache .rocketmq .common .topic .TopicValidator ;
67
+ import org .apache .rocketmq .common .utils .ConcurrentHashMapUtils ;
67
68
import org .apache .rocketmq .logging .org .slf4j .Logger ;
68
69
import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
69
70
import org .apache .rocketmq .remoting .CommandCallback ;
@@ -150,11 +151,11 @@ public static String genAckUniqueId(AckMsg ackMsg) {
150
151
151
152
public static String genBatchAckUniqueId (BatchAckMsg batchAckMsg ) {
152
153
return batchAckMsg .getTopic ()
153
- + PopAckConstants .SPLIT + batchAckMsg .getQueueId ()
154
- + PopAckConstants .SPLIT + batchAckMsg .getAckOffsetList ().toString ()
155
- + PopAckConstants .SPLIT + batchAckMsg .getConsumerGroup ()
156
- + PopAckConstants .SPLIT + batchAckMsg .getPopTime ()
157
- + PopAckConstants .SPLIT + PopAckConstants .BATCH_ACK_TAG ;
154
+ + PopAckConstants .SPLIT + batchAckMsg .getQueueId ()
155
+ + PopAckConstants .SPLIT + batchAckMsg .getAckOffsetList ().toString ()
156
+ + PopAckConstants .SPLIT + batchAckMsg .getConsumerGroup ()
157
+ + PopAckConstants .SPLIT + batchAckMsg .getPopTime ()
158
+ + PopAckConstants .SPLIT + PopAckConstants .BATCH_ACK_TAG ;
158
159
}
159
160
160
161
public static String genCkUniqueId (PopCheckPoint ck ) {
@@ -861,7 +862,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
861
862
862
863
private boolean isPopShouldStop (String topic , String group , int queueId ) {
863
864
return brokerController .getBrokerConfig ().isEnablePopMessageThreshold () &&
864
- brokerController .getPopInflightMessageCounter ().getGroupPopInFlightMessageNum (topic , group , queueId ) > brokerController .getBrokerConfig ().getPopInflightMessageThreshold ();
865
+ brokerController .getPopInflightMessageCounter ().getGroupPopInFlightMessageNum (topic , group , queueId ) > brokerController .getBrokerConfig ().getPopInflightMessageThreshold ();
865
866
}
866
867
867
868
private long getPopOffset (String topic , String group , int queueId , int initMode , boolean init , String lockKey ,
@@ -908,7 +909,7 @@ private long getInitOffset(String topic, String group, int queueId, int initMode
908
909
}
909
910
if (init ) { // whichever initMode
910
911
this .brokerController .getConsumerOffsetManager ().commitOffset (
911
- "getPopOffset" , group , topic , queueId , offset );
912
+ "getPopOffset" , group , topic , queueId , offset );
912
913
}
913
914
return offset ;
914
915
}
@@ -1002,12 +1003,13 @@ static class TimedLock {
1002
1003
private volatile long lockTime ;
1003
1004
1004
1005
public TimedLock () {
1005
- this .lock = new AtomicBoolean (true );
1006
+ // init lock status, false means not locked
1007
+ this .lock = new AtomicBoolean (false );
1006
1008
this .lockTime = System .currentTimeMillis ();
1007
1009
}
1008
1010
1009
1011
public boolean tryLock () {
1010
- boolean ret = lock .compareAndSet (true , false );
1012
+ boolean ret = lock .compareAndSet (false , true );
1011
1013
if (ret ) {
1012
1014
this .lockTime = System .currentTimeMillis ();
1013
1015
return true ;
@@ -1017,11 +1019,11 @@ public boolean tryLock() {
1017
1019
}
1018
1020
1019
1021
public void unLock () {
1020
- lock .set (true );
1022
+ lock .set (false );
1021
1023
}
1022
1024
1023
1025
public boolean isLock () {
1024
- return ! lock .get ();
1026
+ return lock .get ();
1025
1027
}
1026
1028
1027
1029
public long getLockTime () {
@@ -1041,21 +1043,7 @@ public boolean tryLock(String topic, String consumerGroup, int queueId) {
1041
1043
}
1042
1044
1043
1045
public boolean tryLock (String key ) {
1044
- TimedLock timedLock = expiredLocalCache .get (key );
1045
-
1046
- if (timedLock == null ) {
1047
- TimedLock old = expiredLocalCache .putIfAbsent (key , new TimedLock ());
1048
- if (old != null ) {
1049
- return false ;
1050
- } else {
1051
- timedLock = expiredLocalCache .get (key );
1052
- }
1053
- }
1054
-
1055
- if (timedLock == null ) {
1056
- return false ;
1057
- }
1058
-
1046
+ TimedLock timedLock = ConcurrentHashMapUtils .computeIfAbsent (expiredLocalCache , key , k -> new TimedLock ());
1059
1047
return timedLock .tryLock ();
1060
1048
}
1061
1049
0 commit comments