Skip to content

Commit f65f845

Browse files
authored
Rewrite FailureDetector interface and implementations to also work with the multi-stage engine (apache#15005)
1 parent 82bdda5 commit f65f845

File tree

22 files changed

+458
-301
lines changed

22 files changed

+458
-301
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import org.apache.pinot.common.config.TlsConfig;
5959
import org.apache.pinot.common.config.provider.TableCache;
6060
import org.apache.pinot.common.cursors.AbstractResponseStore;
61+
import org.apache.pinot.common.failuredetector.FailureDetector;
62+
import org.apache.pinot.common.failuredetector.FailureDetectorFactory;
6163
import org.apache.pinot.common.function.FunctionRegistry;
6264
import org.apache.pinot.common.metadata.ZKMetadataProvider;
6365
import org.apache.pinot.common.metrics.BrokerGauge;
@@ -144,6 +146,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
144146
protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
145147
protected MultiStageQueryThrottler _multiStageQueryThrottler;
146148
protected AbstractResponseStore _responseStore;
149+
protected FailureDetector _failureDetector;
147150

148151
@Override
149152
public void init(PinotConfiguration brokerConf)
@@ -319,14 +322,22 @@ public void start()
319322
LOGGER.info("Initializing Broker Event Listener Factory");
320323
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
321324

325+
// Initialize the failure detector that removes servers from the broker routing table if they are not healthy
326+
_failureDetector = FailureDetectorFactory.getFailureDetector(_brokerConf, _brokerMetrics);
327+
_failureDetector.registerHealthyServerNotifier(
328+
instanceId -> _routingManager.includeServerToRouting(instanceId));
329+
_failureDetector.registerUnhealthyServerNotifier(
330+
instanceId -> _routingManager.excludeServerFromRouting(instanceId));
331+
_failureDetector.start();
332+
322333
// Create Broker request handler.
323334
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
324335
String brokerRequestHandlerType =
325336
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
326337
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
327338
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
328339
singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
329-
_accessControlFactory, _queryQuotaManager, tableCache);
340+
_accessControlFactory, _queryQuotaManager, tableCache, _failureDetector);
330341
} else {
331342
// Default request handler type, i.e. netty
332343
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
@@ -337,7 +348,8 @@ public void start()
337348
}
338349
singleStageBrokerRequestHandler =
339350
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
340-
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
351+
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
352+
_failureDetector);
341353
}
342354
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
343355
QueryDispatcher queryDispatcher = null;
@@ -350,7 +362,7 @@ public void start()
350362
queryDispatcher = createQueryDispatcher(_brokerConf);
351363
multiStageBrokerRequestHandler =
352364
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
353-
_queryQuotaManager, tableCache, _multiStageQueryThrottler);
365+
_queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector);
354366
}
355367
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
356368
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
@@ -613,6 +625,8 @@ public void stop() {
613625
LOGGER.info("Stopping cluster change mediator");
614626
_clusterChangeMediator.stop();
615627

628+
_failureDetector.stop();
629+
616630
// Delay shutdown of request handler so that the pending queries can be finished. The participant Helix manager has
617631
// been disconnected, so instance should disappear from ExternalView soon and stop getting new queries.
618632
long delayShutdownTimeMs =

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java

+51-14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.broker.requesthandler;
2020

21+
import io.grpc.ConnectivityState;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
2324
import java.util.List;
@@ -31,6 +32,7 @@
3132
import org.apache.pinot.broker.routing.BrokerRoutingManager;
3233
import org.apache.pinot.common.config.GrpcConfig;
3334
import org.apache.pinot.common.config.provider.TableCache;
35+
import org.apache.pinot.common.failuredetector.FailureDetector;
3436
import org.apache.pinot.common.proto.Server;
3537
import org.apache.pinot.common.request.BrokerRequest;
3638
import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -43,22 +45,30 @@
4345
import org.apache.pinot.spi.config.table.TableType;
4446
import org.apache.pinot.spi.env.PinotConfiguration;
4547
import org.apache.pinot.spi.trace.RequestContext;
48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
4650

4751

4852
/**
4953
* The <code>GrpcBrokerRequestHandler</code> class communicates query request via GRPC.
5054
*/
5155
@ThreadSafe
5256
public class GrpcBrokerRequestHandler extends BaseSingleStageBrokerRequestHandler {
57+
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);
58+
5359
private final StreamingReduceService _streamingReduceService;
5460
private final PinotStreamingQueryClient _streamingQueryClient;
61+
private final FailureDetector _failureDetector;
5562

5663
// TODO: Support TLS
5764
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
58-
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
65+
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
66+
FailureDetector failureDetector) {
5967
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
6068
_streamingReduceService = new StreamingReduceService(config);
6169
_streamingQueryClient = new PinotStreamingQueryClient(GrpcConfig.buildGrpcQueryConfig(config));
70+
_failureDetector = failureDetector;
71+
_failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
6272
}
6373

6474
@Override
@@ -81,7 +91,6 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
8191
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable, long timeoutMs,
8292
ServerStats serverStats, RequestContext requestContext)
8393
throws Exception {
84-
// TODO: Support failure detection
8594
// TODO: Add servers queried/responded stats
8695
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
8796
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
@@ -112,33 +121,39 @@ private void sendRequest(long requestId, TableType tableType, BrokerRequest brok
112121
ServerInstance serverInstance = routingEntry.getKey();
113122
// TODO: support optional segments for GrpcQueryServer.
114123
List<String> segments = routingEntry.getValue().getSegments();
115-
String serverHost = serverInstance.getHostname();
116-
int port = serverInstance.getGrpcPort();
117124
// TODO: enable throttling on per host bases.
118-
Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port,
119-
new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace)
120-
.setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build());
121-
responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC),
122-
streamingResponse);
125+
try {
126+
Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverInstance,
127+
new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace)
128+
.setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build());
129+
responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC),
130+
streamingResponse);
131+
} catch (Exception e) {
132+
LOGGER.warn("Failed to send request {} to server: {}", requestId, serverInstance.getInstanceId(), e);
133+
_failureDetector.markServerUnhealthy(serverInstance.getInstanceId());
134+
}
123135
}
124136
}
125137

126138
public static class PinotStreamingQueryClient {
127139
private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>();
140+
private final Map<String, String> _instanceIdToHostnamePortMap = new ConcurrentHashMap<>();
128141
private final GrpcConfig _config;
129142

130143
public PinotStreamingQueryClient(GrpcConfig config) {
131144
_config = config;
132145
}
133146

134-
public Iterator<Server.ServerResponse> submit(String host, int port, Server.ServerRequest serverRequest) {
135-
GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
147+
public Iterator<Server.ServerResponse> submit(ServerInstance serverInstance, Server.ServerRequest serverRequest) {
148+
GrpcQueryClient client = getOrCreateGrpcQueryClient(serverInstance);
136149
return client.submit(serverRequest);
137150
}
138151

139-
private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) {
140-
String key = String.format("%s_%d", host, port);
141-
return _grpcQueryClientMap.computeIfAbsent(key, k -> new GrpcQueryClient(host, port, _config));
152+
private GrpcQueryClient getOrCreateGrpcQueryClient(ServerInstance serverInstance) {
153+
String hostnamePort = String.format("%s_%d", serverInstance.getHostname(), serverInstance.getGrpcPort());
154+
_instanceIdToHostnamePortMap.put(serverInstance.getInstanceId(), hostnamePort);
155+
return _grpcQueryClientMap.computeIfAbsent(hostnamePort,
156+
k -> new GrpcQueryClient(serverInstance.getHostname(), serverInstance.getGrpcPort(), _config));
142157
}
143158

144159
public void shutdown() {
@@ -147,4 +162,26 @@ public void shutdown() {
147162
}
148163
}
149164
}
165+
166+
/**
167+
* Check if a server that was previously detected as unhealthy is now healthy.
168+
*/
169+
private boolean retryUnhealthyServer(String instanceId) {
170+
LOGGER.info("Checking gRPC connection to unhealthy server: {}", instanceId);
171+
ServerInstance serverInstance = _routingManager.getEnabledServerInstanceMap().get(instanceId);
172+
if (serverInstance == null) {
173+
LOGGER.info("Failed to find enabled server: {} in routing manager, skipping the retry", instanceId);
174+
return false;
175+
}
176+
177+
String hostnamePort = _streamingQueryClient._instanceIdToHostnamePortMap.get(instanceId);
178+
GrpcQueryClient client = _streamingQueryClient._grpcQueryClientMap.get(hostnamePort);
179+
180+
if (client == null) {
181+
LOGGER.warn("No GrpcQueryClient found for server with instanceId: {}", instanceId);
182+
return false;
183+
}
184+
185+
return client.getChannel().getState(true) == ConnectivityState.READY;
186+
}
150187
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

+27-4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.pinot.common.config.provider.TableCache;
5252
import org.apache.pinot.common.exception.QueryException;
5353
import org.apache.pinot.common.exception.QueryInfoException;
54+
import org.apache.pinot.common.failuredetector.FailureDetector;
5455
import org.apache.pinot.common.metrics.BrokerMeter;
5556
import org.apache.pinot.common.metrics.BrokerQueryPhase;
5657
import org.apache.pinot.common.response.BrokerResponse;
@@ -67,6 +68,7 @@
6768
import org.apache.pinot.common.utils.tls.TlsUtils;
6869
import org.apache.pinot.core.auth.Actions;
6970
import org.apache.pinot.core.auth.TargetType;
71+
import org.apache.pinot.core.transport.ServerInstance;
7072
import org.apache.pinot.query.QueryEnvironment;
7173
import org.apache.pinot.query.mailbox.MailboxService;
7274
import org.apache.pinot.query.planner.explain.AskingServerStageExplainer;
@@ -91,6 +93,10 @@
9193
import org.slf4j.LoggerFactory;
9294

9395

96+
/**
97+
* This class serves as the broker entry-point for handling incoming multi-stage query requests and dispatching them
98+
* to servers.
99+
*/
94100
public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
95101
private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);
96102

@@ -104,17 +110,20 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
104110

105111
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
106112
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
107-
MultiStageQueryThrottler queryThrottler) {
113+
MultiStageQueryThrottler queryThrottler, FailureDetector failureDetector) {
108114
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
109115
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
110116
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
111-
_workerManager = new WorkerManager(hostname, port, _routingManager);
117+
_workerManager = new WorkerManager(_brokerId, hostname, port, _routingManager);
112118
TlsConfig tlsConfig = config.getProperty(
113119
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
114120
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config,
115121
CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
116-
_queryDispatcher = new QueryDispatcher(
117-
new MailboxService(hostname, port, config, tlsConfig), tlsConfig, this.isQueryCancellationEnabled());
122+
123+
failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
124+
_queryDispatcher =
125+
new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), tlsConfig, failureDetector,
126+
this.isQueryCancellationEnabled());
118127
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
119128
+ "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port,
120129
_brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
@@ -538,4 +547,18 @@ private static String toSizeLimitedString(Set<String> setOfStrings, int limit) {
538547
return setOfStrings.stream().limit(limit)
539548
.collect(Collectors.joining(", ", "[", setOfStrings.size() > limit ? "...]" : "]"));
540549
}
550+
551+
/**
552+
* Check if a server that was previously detected as unhealthy is now healthy.
553+
*/
554+
public boolean retryUnhealthyServer(String instanceId) {
555+
LOGGER.info("Checking gRPC connection to unhealthy server: {}", instanceId);
556+
ServerInstance serverInstance = _routingManager.getEnabledServerInstanceMap().get(instanceId);
557+
if (serverInstance == null) {
558+
LOGGER.info("Failed to find enabled server: {} in routing manager, skipping the retry", instanceId);
559+
return false;
560+
}
561+
562+
return _queryDispatcher.checkConnectivityToInstance(instanceId);
563+
}
541564
}

0 commit comments

Comments
 (0)