@@ -348,7 +348,7 @@ public Pair<Long, Long> getConsumerLagStats(String group, String topic, int queu
348
348
brokerOffset = 0 ;
349
349
}
350
350
351
- if (isPop ) {
351
+ if (isPop && ! brokerConfig . isPopConsumerKVServiceEnable () ) {
352
352
long pullOffset = popBufferMergeService .getLatestOffset (topic , group , queueId );
353
353
if (pullOffset < 0 ) {
354
354
pullOffset = offsetManager .queryOffset (group , topic , queueId );
@@ -401,7 +401,7 @@ public Pair<Long, Long> getInFlightMsgStats(String group, String topic, boolean
401
401
402
402
public Pair <Long , Long > getInFlightMsgStats (String group , String topic , int queueId , boolean isPop )
403
403
throws ConsumeQueueException {
404
- if (isPop ) {
404
+ if (isPop && ! brokerConfig . isPopConsumerKVServiceEnable () ) {
405
405
long inflight = popInflightMessageCounter .getGroupPopInFlightMessageNum (topic , group , queueId );
406
406
long pullOffset = popBufferMergeService .getLatestOffset (topic , group , queueId );
407
407
if (pullOffset < 0 ) {
@@ -456,14 +456,11 @@ public long getAvailableMsgCount(String group, String topic, int queueId, boolea
456
456
}
457
457
458
458
long pullOffset ;
459
- if (isPop ) {
459
+ if (isPop && ! brokerConfig . isPopConsumerKVServiceEnable () ) {
460
460
pullOffset = popBufferMergeService .getLatestOffset (topic , group , queueId );
461
461
if (pullOffset < 0 ) {
462
462
pullOffset = offsetManager .queryOffset (group , topic , queueId );
463
463
}
464
- if (pullOffset < 0 ) {
465
- pullOffset = brokerOffset ;
466
- }
467
464
} else {
468
465
pullOffset = offsetManager .queryPullOffset (group , topic , queueId );
469
466
}
0 commit comments