Skip to content

Commit 77570d5

Browse files
lhotarimerlimatequanz
authored
[improve][broker] Add limits for Key_Shared Subscription look ahead in dispatching (#23231)
Co-authored-by: Matteo Merli <mmerli@apache.org> Co-authored-by: Yuri Mizushima <equanz324@gmail.com>
1 parent bf53164 commit 77570d5

16 files changed

+1068
-384
lines changed

conf/broker.conf

+19
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,25 @@ maxUnackedMessagesPerBroker=0
355355
# limit/2 messages
356356
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
357357

358+
# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
359+
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
360+
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
361+
# messages reaches the calculated threshold.
362+
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
363+
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
364+
# Setting this value to 0 will disable the limit calculated per consumer.
365+
keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
366+
367+
# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
368+
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
369+
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
370+
# messages reaches the calculated threshold.
371+
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
372+
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
373+
# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
374+
# Setting this value to 0 will disable the limit calculated per subscription.
375+
keySharedLookAheadMsgInReplayThresholdPerSubscription=20000
376+
358377
# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
359378
unblockStuckSubscriptionEnabled=false
360379

conf/standalone.conf

+19
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,25 @@ maxUnackedMessagesPerBroker=0
232232
# limit/2 messages
233233
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
234234

235+
# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
236+
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
237+
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
238+
# messages reaches the calculated threshold.
239+
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
240+
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
241+
# Setting this value to 0 will disable the limit calculated per consumer.
242+
keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
243+
244+
# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
245+
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
246+
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
247+
# messages reaches the calculated threshold.
248+
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
249+
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
250+
# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
251+
# Setting this value to 0 will disable the limit calculated per subscription.
252+
keySharedLookAheadMsgInReplayThresholdPerSubscription=20000
253+
235254
# Tick time to schedule task that checks topic publish rate limiting across all topics
236255
# Reducing to lower value can give more accuracy while throttling publish but
237256
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+30
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,36 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
949949
+ " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit"
950950
+ " check and broker doesn't block dispatchers")
951951
private int maxUnackedMessagesPerBroker = 0;
952+
953+
@FieldContext(
954+
category = CATEGORY_POLICIES,
955+
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
956+
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
957+
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
958+
+ " messages reaches the calculated threshold.\n"
959+
+ "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
960+
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
961+
+ ".\n"
962+
+ "Setting this value to 0 will disable the limit calculated per consumer.",
963+
dynamic = true
964+
)
965+
private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000;
966+
967+
@FieldContext(
968+
category = CATEGORY_POLICIES,
969+
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
970+
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
971+
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
972+
+ " messages reaches the calculated threshold.\n"
973+
+ "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
974+
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
975+
+ ".\n"
976+
+ "This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
977+
+ "Setting this value to 0 will disable the limit calculated per subscription.\n",
978+
dynamic = true
979+
)
980+
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;
981+
952982
@FieldContext(
953983
category = CATEGORY_POLICIES,
954984
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.NavigableSet;
25+
import java.util.Optional;
2526
import java.util.Set;
27+
import java.util.TreeSet;
28+
import java.util.function.Predicate;
2629
import javax.annotation.concurrent.NotThreadSafe;
2730
import org.apache.bookkeeper.mledger.Position;
2831
import org.apache.bookkeeper.mledger.PositionFactory;
@@ -146,8 +149,32 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
146149
return false;
147150
}
148151

149-
public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead) {
150-
return messagesToRedeliver.items(maxMessagesToRead, PositionFactory::create);
152+
public boolean containsStickyKeyHash(int stickyKeyHash) {
153+
return !allowOutOfOrderDelivery && hashesRefCount.containsKey(stickyKeyHash);
154+
}
155+
156+
public Optional<Position> getFirstPositionInReplay() {
157+
return messagesToRedeliver.first(PositionFactory::create);
158+
}
159+
160+
/**
161+
* Get the messages to replay now.
162+
*
163+
* @param maxMessagesToRead
164+
* the max messages to read
165+
* @param filter
166+
* the filter to use to select the messages to replay
167+
* @return the messages to replay now
168+
*/
169+
public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead, Predicate<Position> filter) {
170+
NavigableSet<Position> items = new TreeSet<>();
171+
messagesToRedeliver.processItems(PositionFactory::create, item -> {
172+
if (filter.test(item)) {
173+
items.add(item);
174+
}
175+
return items.size() < maxMessagesToRead;
176+
});
177+
return items;
151178
}
152179

153180
/**

0 commit comments

Comments
 (0)