Skip to content

Commit 6d8b15d

Browse files
[fix][broker][branch-3.0] Fail fast if the extensible load manager failed to start (apache#23297) (apache#23302)
Co-authored-by: Yunze Xu <xyzinfernity@163.com>
1 parent 4d6310c commit 6d8b15d

File tree

4 files changed

+179
-82
lines changed

4 files changed

+179
-82
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java

+17
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker;
2020

2121
import java.io.IOException;
22+
import java.util.concurrent.CompletionException;
2223

2324
public class PulsarServerException extends IOException {
2425
private static final long serialVersionUID = 1;
@@ -44,4 +45,20 @@ public NotFoundException(Throwable t) {
4445
super(t);
4546
}
4647
}
48+
49+
public static PulsarServerException from(Throwable throwable) {
50+
if (throwable instanceof CompletionException) {
51+
return from(throwable.getCause());
52+
}
53+
if (throwable instanceof PulsarServerException pulsarServerException) {
54+
return pulsarServerException;
55+
} else {
56+
return new PulsarServerException(throwable);
57+
}
58+
}
59+
60+
// Wrap this checked exception into a specific unchecked exception
61+
public static CompletionException toUncheckedException(PulsarServerException e) {
62+
return new CompletionException(e);
63+
}
4764
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,7 @@ public void start() throws PulsarServerException {
949949
state = State.Started;
950950
} catch (Exception e) {
951951
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
952-
PulsarServerException startException = new PulsarServerException(e);
952+
PulsarServerException startException = PulsarServerException.from(e);
953953
readyForIncomingRequestsFuture.completeExceptionally(startException);
954954
throw startException;
955955
} finally {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+41-81
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
8181
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
8282
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
83-
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
8483
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
8584
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
8685
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
@@ -97,10 +96,7 @@
9796
import org.apache.pulsar.common.naming.TopicDomain;
9897
import org.apache.pulsar.common.naming.TopicName;
9998
import org.apache.pulsar.common.stats.Metrics;
100-
import org.apache.pulsar.common.util.Backoff;
101-
import org.apache.pulsar.common.util.BackoffBuilder;
10299
import org.apache.pulsar.common.util.FutureUtil;
103-
import org.apache.pulsar.metadata.api.MetadataStoreException;
104100
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
105101
import org.slf4j.Logger;
106102

@@ -123,10 +119,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
123119

124120
public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
125121

126-
public static final int STARTUP_TIMEOUT_SECONDS = 30;
127-
128-
public static final int MAX_RETRY = 5;
129-
130122
private static final String ELECTION_ROOT = "/loadbalance/extension/leader";
131123

132124
public static final Set<String> INTERNAL_TOPICS =
@@ -204,7 +196,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
204196

205197
private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
206198
lookupRequests = new ConcurrentHashMap<>();
207-
private final CompletableFuture<Void> initWaiter = new CompletableFuture<>();
199+
private final CompletableFuture<Boolean> initWaiter = new CompletableFuture<>();
208200

209201
/**
210202
* Get all the bundles that are owned by this broker.
@@ -331,7 +323,7 @@ public void start() throws PulsarServerException {
331323
return;
332324
}
333325
try {
334-
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
326+
this.brokerRegistry = createBrokerRegistry(pulsar);
335327
this.leaderElectionService = new LeaderElectionService(
336328
pulsar.getCoordinationService(), pulsar.getBrokerId(),
337329
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
@@ -346,53 +338,14 @@ public void start() throws PulsarServerException {
346338
});
347339
});
348340
});
349-
this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar);
341+
this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar);
350342
this.brokerRegistry.start();
351343
this.splitManager = new SplitManager(splitCounter);
352344
this.unloadManager = new UnloadManager(unloadCounter);
353345
this.serviceUnitStateChannel.listen(unloadManager);
354346
this.serviceUnitStateChannel.listen(splitManager);
355347
this.leaderElectionService.start();
356-
pulsar.runWhenReadyForIncomingRequests(() -> {
357-
Backoff backoff = new BackoffBuilder()
358-
.setInitialTime(100, TimeUnit.MILLISECONDS)
359-
.setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
360-
.create();
361-
int retry = 0;
362-
while (!Thread.currentThread().isInterrupted()) {
363-
try {
364-
brokerRegistry.register();
365-
this.serviceUnitStateChannel.start();
366-
break;
367-
} catch (Exception e) {
368-
log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...",
369-
pulsar.getBrokerId(), ++retry, e);
370-
try {
371-
Thread.sleep(backoff.next());
372-
} catch (InterruptedException ex) {
373-
log.warn("Interrupted while sleeping.");
374-
// preserve thread's interrupt status
375-
Thread.currentThread().interrupt();
376-
try {
377-
pulsar.close();
378-
} catch (PulsarServerException exc) {
379-
log.error("Failed to close pulsar service.", exc);
380-
}
381-
return;
382-
}
383-
failStarting(e);
384-
if (retry >= MAX_RETRY) {
385-
log.error("Failed to start the service unit state channel after retry {} th. "
386-
+ "Closing pulsar service.", retry, e);
387-
try {
388-
pulsar.close();
389-
} catch (PulsarServerException ex) {
390-
log.error("Failed to close pulsar service.", ex);
391-
}
392-
}
393-
}
394-
}
395-
});
348+
396349
this.antiAffinityGroupPolicyHelper =
397350
new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
398351
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
@@ -401,15 +354,10 @@ public void start() throws PulsarServerException {
401354
SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
402355
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
403356
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
404-
405-
try {
406-
this.brokerLoadDataStore = LoadDataStoreFactory
407-
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
408-
this.topBundlesLoadDataStore = LoadDataStoreFactory
409-
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
410-
} catch (LoadDataStoreException e) {
411-
throw new PulsarServerException(e);
412-
}
357+
this.brokerLoadDataStore = LoadDataStoreFactory
358+
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
359+
this.topBundlesLoadDataStore = LoadDataStoreFactory
360+
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
413361

414362
this.context = LoadManagerContextImpl.builder()
415363
.configuration(conf)
@@ -433,6 +381,7 @@ public void start() throws PulsarServerException {
433381

434382
pulsar.runWhenReadyForIncomingRequests(() -> {
435383
try {
384+
this.serviceUnitStateChannel.start();
436385
var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
437386

438387
this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
@@ -467,38 +416,33 @@ public void start() throws PulsarServerException {
467416
MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
468417

469418
this.splitScheduler.start();
470-
this.initWaiter.complete(null);
419+
this.initWaiter.complete(true);
471420
this.started = true;
472421
log.info("Started load manager.");
473-
} catch (Exception ex) {
474-
failStarting(ex);
422+
} catch (Throwable e) {
423+
failStarting(e);
475424
}
476425
});
477-
} catch (Exception ex) {
426+
} catch (Throwable ex) {
478427
failStarting(ex);
479428
}
480429
}
481430

482-
private void failStarting(Exception ex) {
483-
log.error("Failed to start the extensible load balance and close broker registry {}.",
484-
this.brokerRegistry, ex);
431+
private void failStarting(Throwable throwable) {
485432
if (this.brokerRegistry != null) {
486433
try {
487-
brokerRegistry.unregister();
488-
} catch (MetadataStoreException e) {
489-
// ignore
490-
}
491-
}
492-
if (this.serviceUnitStateChannel != null) {
493-
try {
494-
serviceUnitStateChannel.close();
495-
} catch (IOException e) {
496-
// ignore
434+
brokerRegistry.close();
435+
} catch (PulsarServerException e) {
436+
// If close failed, this broker might still exist in the metadata store. Then it could be found by other
437+
// brokers as an available broker. Hence, print a warning log for it.
438+
log.warn("Failed to close the broker registry: {}", e.getMessage());
497439
}
498440
}
499-
initWaiter.completeExceptionally(ex);
441+
initWaiter.complete(false); // exit the background thread gracefully
442+
throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable));
500443
}
501444

445+
502446
@Override
503447
public void initialize(PulsarService pulsar) {
504448
this.pulsar = pulsar;
@@ -843,7 +787,9 @@ synchronized void playLeader() {
843787
boolean becameFollower = false;
844788
while (!Thread.currentThread().isInterrupted()) {
845789
try {
846-
initWaiter.get();
790+
if (!initWaiter.get()) {
791+
return;
792+
}
847793
if (!serviceUnitStateChannel.isChannelOwner()) {
848794
becameFollower = true;
849795
break;
@@ -893,7 +839,9 @@ synchronized void playFollower() {
893839
boolean becameLeader = false;
894840
while (!Thread.currentThread().isInterrupted()) {
895841
try {
896-
initWaiter.get();
842+
if (!initWaiter.get()) {
843+
return;
844+
}
897845
if (serviceUnitStateChannel.isChannelOwner()) {
898846
becameLeader = true;
899847
break;
@@ -957,7 +905,9 @@ public List<Metrics> getMetrics() {
957905
@VisibleForTesting
958906
protected void monitor() {
959907
try {
960-
initWaiter.get();
908+
if (!initWaiter.get()) {
909+
return;
910+
}
961911

962912
// Monitor role
963913
// Periodically check the role in case ZK watcher fails.
@@ -1012,4 +962,14 @@ private void closeInternalTopics() {
1012962
log.warn("Failed to wait for closing internal topics", e);
1013963
}
1014964
}
965+
966+
@VisibleForTesting
967+
protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
968+
return new BrokerRegistryImpl(pulsar);
969+
}
970+
971+
@VisibleForTesting
972+
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
973+
return new ServiceUnitStateChannelImpl(pulsar);
974+
}
1015975
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions;
20+
21+
import java.util.Optional;
22+
import lombok.Cleanup;
23+
import org.apache.pulsar.broker.PulsarServerException;
24+
import org.apache.pulsar.broker.PulsarService;
25+
import org.apache.pulsar.broker.ServiceConfiguration;
26+
import org.apache.pulsar.broker.loadbalance.LoadManager;
27+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
28+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
29+
import org.apache.pulsar.common.util.PortManager;
30+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
31+
import org.awaitility.Awaitility;
32+
import org.mockito.Mockito;
33+
import org.testng.Assert;
34+
import org.testng.annotations.AfterClass;
35+
import org.testng.annotations.BeforeClass;
36+
import org.testng.annotations.Test;
37+
38+
public class LoadManagerFailFastTest {
39+
40+
private static final String cluster = "test";
41+
private final int zkPort = PortManager.nextLockedFreePort();
42+
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
43+
private final ServiceConfiguration config = new ServiceConfiguration();
44+
45+
@BeforeClass
46+
protected void setup() throws Exception {
47+
bk.start();
48+
config.setClusterName(cluster);
49+
config.setAdvertisedAddress("localhost");
50+
config.setBrokerServicePort(Optional.of(0));
51+
config.setWebServicePort(Optional.of(0));
52+
config.setMetadataStoreUrl("zk:localhost:" + zkPort);
53+
}
54+
55+
@AfterClass
56+
protected void cleanup() throws Exception {
57+
bk.stop();
58+
}
59+
60+
@Test(timeOut = 30000)
61+
public void testBrokerRegistryFailure() throws Exception {
62+
config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName());
63+
@Cleanup final var pulsar = new PulsarService(config);
64+
try {
65+
pulsar.start();
66+
Assert.fail();
67+
} catch (PulsarServerException e) {
68+
Assert.assertNull(e.getCause());
69+
Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry");
70+
}
71+
Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get()
72+
.isEmpty());
73+
}
74+
75+
@Test(timeOut = 30000)
76+
public void testServiceUnitStateChannelFailure() throws Exception {
77+
config.setLoadManagerClassName(ChannelLoadManager.class.getName());
78+
@Cleanup final var pulsar = new PulsarService(config);
79+
try {
80+
pulsar.start();
81+
Assert.fail();
82+
} catch (PulsarServerException e) {
83+
Assert.assertNull(e.getCause());
84+
Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel");
85+
}
86+
Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore()
87+
.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty()));
88+
}
89+
90+
private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl {
91+
92+
@Override
93+
protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
94+
final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class);
95+
try {
96+
Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry)
97+
.start();
98+
} catch (PulsarServerException e) {
99+
throw new RuntimeException(e);
100+
}
101+
return mockBrokerRegistry;
102+
}
103+
}
104+
105+
private static class ChannelLoadManager extends ExtensibleLoadManagerImpl {
106+
107+
@Override
108+
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
109+
final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class);
110+
try {
111+
Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel)
112+
.start();
113+
} catch (PulsarServerException e) {
114+
throw new RuntimeException(e);
115+
}
116+
Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any());
117+
return channel;
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)