Skip to content

Commit 7239d34

Browse files
authored
Added support to pause and resume ingestion based on resource utilization (apache#15008)
1 parent 5f220b3 commit 7239d34

File tree

19 files changed

+1094
-10
lines changed

19 files changed

+1094
-10
lines changed

pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
187187
TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false),
188188

189189
// Number of reingested segments getting uploaded
190-
REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", true);
190+
REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", true),
191+
192+
// Resource utilization is within limits or not for a table
193+
RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", false);
191194

192195
private final String _gaugeName;
193196
private final String _unit;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.restlet.resources;
20+
21+
import com.fasterxml.jackson.annotation.JsonCreator;
22+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
23+
import com.fasterxml.jackson.annotation.JsonProperty;
24+
25+
26+
/**
27+
* A simple container class to hold disk usage information.
28+
*/
29+
@JsonIgnoreProperties(ignoreUnknown = true)
30+
public class DiskUsageInfo {
31+
private final String _instanceId;
32+
private final String _path;
33+
private final long _totalSpaceBytes;
34+
private final long _usedSpaceBytes;
35+
private final long _lastUpdatedTimeInEpochMs;
36+
37+
@JsonCreator
38+
public DiskUsageInfo(@JsonProperty("instanceId") String instanceId, @JsonProperty("path") String path,
39+
@JsonProperty("totalSpaceBytes") long totalSpaceBytes,
40+
@JsonProperty("usedSpaceBytes") long usedSpaceBytes,
41+
@JsonProperty("lastUpdatedTimeInEpochMs") long lastUpdatedTimeInEpochMs) {
42+
_instanceId = instanceId;
43+
_path = path;
44+
_totalSpaceBytes = totalSpaceBytes;
45+
_usedSpaceBytes = usedSpaceBytes;
46+
_lastUpdatedTimeInEpochMs = lastUpdatedTimeInEpochMs;
47+
}
48+
49+
public String getInstanceId() {
50+
return _instanceId;
51+
}
52+
53+
public String getPath() {
54+
return _path;
55+
}
56+
57+
public long getTotalSpaceBytes() {
58+
return _totalSpaceBytes;
59+
}
60+
61+
public long getUsedSpaceBytes() {
62+
return _usedSpaceBytes;
63+
}
64+
65+
public long getLastUpdatedTimeInEpochMs() {
66+
return _lastUpdatedTimeInEpochMs;
67+
}
68+
69+
public String toString() {
70+
return "DiskUsageInfo{" + "_instanceId='" + _instanceId + '\'' + ", _path='" + _path + '\'' + ", _totalSpaceBytes="
71+
+ _totalSpaceBytes + ", _usedSpaceBytes=" + _usedSpaceBytes + ", _lastUpdatedTimeInEpochMs="
72+
+ _lastUpdatedTimeInEpochMs + '}';
73+
}
74+
}

pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@
113113
import org.apache.pinot.controller.tuner.TableConfigTunerRegistry;
114114
import org.apache.pinot.controller.util.TableSizeReader;
115115
import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
116+
import org.apache.pinot.controller.validation.DiskUtilizationChecker;
116117
import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
117118
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
119+
import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
120+
import org.apache.pinot.controller.validation.ResourceUtilizationManager;
118121
import org.apache.pinot.controller.validation.StorageQuotaChecker;
119122
import org.apache.pinot.core.periodictask.PeriodicTask;
120123
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -204,6 +207,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
204207
protected ExecutorService _tenantRebalanceExecutorService;
205208
protected TableSizeReader _tableSizeReader;
206209
protected StorageQuotaChecker _storageQuotaChecker;
210+
protected DiskUtilizationChecker _diskUtilizationChecker;
211+
protected ResourceUtilizationManager _resourceUtilizationManager;
207212

208213
@Override
209214
public void init(PinotConfiguration pinotConfiguration)
@@ -510,6 +515,9 @@ private void setUpPinotController() {
510515
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager,
511516
_helixResourceManager, _config);
512517

518+
_diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _config);
519+
_resourceUtilizationManager = new ResourceUtilizationManager(_config, _diskUtilizationChecker);
520+
513521
// Setting up periodic tasks
514522
List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
515523
LOGGER.info("Init controller periodic tasks scheduler");
@@ -561,6 +569,8 @@ protected void configure() {
561569
bind(_tenantRebalancer).to(TenantRebalancer.class);
562570
bind(_tableSizeReader).to(TableSizeReader.class);
563571
bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
572+
bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
573+
bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
564574
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
565575
String loggerRootDir = _config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
566576
if (loggerRootDir != null) {
@@ -862,7 +872,8 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
862872
_taskManagerStatusCache = getTaskManagerStatusCache();
863873
_taskManager =
864874
new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
865-
_controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager);
875+
_controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager,
876+
_resourceUtilizationManager);
866877
periodicTasks.add(_taskManager);
867878
_retentionManager =
868879
new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
@@ -873,7 +884,8 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
873884
periodicTasks.add(_offlineSegmentIntervalChecker);
874885
_realtimeSegmentValidationManager =
875886
new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
876-
_pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker);
887+
_pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker,
888+
_resourceUtilizationManager);
877889
periodicTasks.add(_realtimeSegmentValidationManager);
878890
_brokerResourceValidationManager =
879891
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
@@ -902,6 +914,9 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
902914
PeriodicTask responseStoreCleaner = new ResponseStoreCleaner(_config, _helixResourceManager, _leadControllerManager,
903915
_controllerMetrics, _executorService, _connectionManager);
904916
periodicTasks.add(responseStoreCleaner);
917+
PeriodicTask resourceUtilizationChecker = new ResourceUtilizationChecker(_config, _connectionManager,
918+
_controllerMetrics, _diskUtilizationChecker, _executorService, _helixResourceManager);
919+
periodicTasks.add(resourceUtilizationChecker);
905920

906921
return periodicTasks;
907922
}

pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java

+38
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,14 @@ private static long getRandomInitialDelayInSeconds() {
300300
private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
301301
"controller.realtime.segment.metadata.commit.numLocks";
302302
private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check";
303+
private static final String DISK_UTILIZATION_THRESHOLD = "controller.disk.utilization.threshold"; // 0 < threshold < 1
304+
private static final String DISK_UTILIZATION_CHECK_TIMEOUT_MS = "controller.disk.utilization.check.timeoutMs";
305+
private static final String DISK_UTILIZATION_PATH = "controller.disk.utilization.path";
306+
private static final String ENABLE_RESOURCE_UTILIZATION_CHECK = "controller.enable.resource.utilization.check";
307+
private static final String RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY =
308+
"controller.resource.utilization.checker.initial.delay";
309+
private static final String RESOURCE_UTILIZATION_CHECKER_FREQUENCY =
310+
"controller.resource.utilization.checker.frequency";
303311
private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
304312
public static final String DIM_TABLE_MAX_SIZE = "controller.dimTable.maxSize";
305313

@@ -323,6 +331,12 @@ private static long getRandomInitialDelayInSeconds() {
323331
private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1;
324332
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
325333
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
334+
private static final double DEFAULT_DISK_UTILIZATION_THRESHOLD = 0.95;
335+
private static final int DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS = 30_000;
336+
private static final String DEFAULT_DISK_UTILIZATION_PATH = "/home/pinot/data";
337+
private static final boolean DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK = false;
338+
private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY = 300L; // 5 minutes
339+
private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY = 300L; // 5 minutes
326340
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
327341
// Disallow any high level consumer (HLC) table
328342
private static final boolean DEFAULT_ALLOW_HLC_TABLES = false;
@@ -987,6 +1001,30 @@ public boolean getEnableStorageQuotaCheck() {
9871001
return getProperty(ENABLE_STORAGE_QUOTA_CHECK, DEFAULT_ENABLE_STORAGE_QUOTA_CHECK);
9881002
}
9891003

1004+
public String getDiskUtilizationPath() {
1005+
return getProperty(DISK_UTILIZATION_PATH, DEFAULT_DISK_UTILIZATION_PATH);
1006+
}
1007+
1008+
public double getDiskUtilizationThreshold() {
1009+
return getProperty(DISK_UTILIZATION_THRESHOLD, DEFAULT_DISK_UTILIZATION_THRESHOLD);
1010+
}
1011+
1012+
public int getDiskUtilizationCheckTimeoutMs() {
1013+
return getProperty(DISK_UTILIZATION_CHECK_TIMEOUT_MS, DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS);
1014+
}
1015+
1016+
public long getResourceUtilizationCheckerInitialDelay() {
1017+
return getProperty(RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY, DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY);
1018+
}
1019+
1020+
public long getResourceUtilizationCheckerFrequency() {
1021+
return getProperty(RESOURCE_UTILIZATION_CHECKER_FREQUENCY, DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY);
1022+
}
1023+
1024+
public boolean isResourceUtilizationCheckEnabled() {
1025+
return getProperty(ENABLE_RESOURCE_UTILIZATION_CHECK, DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK);
1026+
}
1027+
9901028
public boolean getEnableBatchMessageMode() {
9911029
return getProperty(ENABLE_BATCH_MESSAGE_MODE, DEFAULT_ENABLE_BATCH_MESSAGE_MODE);
9921030
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
5656
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
5757
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
58+
import org.apache.pinot.controller.validation.ResourceUtilizationManager;
5859
import org.apache.pinot.core.minion.PinotTaskConfig;
5960
import org.apache.pinot.spi.config.table.TableConfig;
6061
import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -100,6 +101,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
100101
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
101102
private final ClusterInfoAccessor _clusterInfoAccessor;
102103
private final TaskGeneratorRegistry _taskGeneratorRegistry;
104+
private final ResourceUtilizationManager _resourceUtilizationManager;
103105

104106
// For cron-based scheduling
105107
private final Scheduler _scheduler;
@@ -120,11 +122,12 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
120122
PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
121123
ControllerConf controllerConf, ControllerMetrics controllerMetrics,
122124
TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor,
123-
PoolingHttpClientConnectionManager connectionManager) {
125+
PoolingHttpClientConnectionManager connectionManager, ResourceUtilizationManager resourceUtilizationManager) {
124126
super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
125127
controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
126128
controllerMetrics);
127129
_helixTaskResourceManager = helixTaskResourceManager;
130+
_resourceUtilizationManager = resourceUtilizationManager;
128131
_taskManagerStatusCache = taskManagerStatusCache;
129132
_clusterInfoAccessor =
130133
new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
@@ -205,6 +208,16 @@ public Map<String, String> createTask(String taskType, String tableName, @Nullab
205208
for (String tableNameWithType : tableNameWithTypes) {
206209
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
207210
LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType);
211+
try {
212+
if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType)) {
213+
LOGGER.warn("Resource utilization is above threshold, skipping task creation for table: {}", tableName);
214+
_controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
215+
continue;
216+
}
217+
_controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 0L);
218+
} catch (Exception e) {
219+
LOGGER.warn("Caught exception while checking resource utilization for table: {}", tableName, e);
220+
}
208221
List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs);
209222
if (pinotTaskConfigs.isEmpty()) {
210223
LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
@@ -695,6 +708,16 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
695708
for (TableConfig tableConfig : enabledTableConfigs) {
696709
String tableName = tableConfig.getTableName();
697710
try {
711+
if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)) {
712+
String message = String.format("Skipping tasks generation as resource utilization is not within limits for "
713+
+ "table: %s. Disk utilization for one or more servers hosting this table has exceeded the threshold. "
714+
+ "Tasks won't be generated until the issue is mitigated.", tableName);
715+
LOGGER.warn(message);
716+
response.addSchedulingError(message);
717+
_controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
718+
continue;
719+
}
720+
_controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 0L);
698721
String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask
699722
: taskGenerator.getMinionInstanceTag(tableConfig);
700723
List<PinotTaskConfig> presentTaskConfig =

0 commit comments

Comments
 (0)