Skip to content

Commit 6c80b93

Browse files
Protect usage of the MSQE compiler for the empty-schema polyfill behind a config param that is disabled by default (apache#15078)
1 parent 24e5bb3 commit 6c80b93

File tree

8 files changed

+59
-13
lines changed

8 files changed

+59
-13
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
146146
// if >= 0, then overrides default limit of 10, otherwise setting is ignored
147147
protected final int _defaultQueryLimit;
148148
protected final boolean _enableMultistageMigrationMetric;
149+
protected final boolean _useMSEToFillEmptyResponseSchema;
149150
protected ExecutorService _multistageCompileExecutor;
150151
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
151152

@@ -179,6 +180,9 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
179180
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
180181
}
181182

183+
_useMSEToFillEmptyResponseSchema = _config.getProperty(Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA,
184+
Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
185+
182186
LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, "
183187
+ "default query limit {}, query log max length: {}, query log max rate: {}, query cancellation "
184188
+ "enabled: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimit,
@@ -869,7 +873,9 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
869873
// server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned
870874
// this is an attempt to return more faithful information based on other sources
871875
if (brokerResponse.getNumRowsResultSet() == 0) {
872-
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
876+
boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema(
877+
pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema);
878+
ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache, schema, database, query);
873879
}
874880

875881
// Set total query processing time
@@ -964,7 +970,9 @@ private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, R
964970

965971
// Send empty response since we don't need to evaluate either offline or realtime request.
966972
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
967-
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
973+
boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema(
974+
pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema);
975+
ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache, schema, database, query);
968976
brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
969977
_queryLogger.log(
970978
new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

+5
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,11 @@ public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
362362
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
363363
}
364364

365+
public static Boolean isUseMSEToFillEmptySchema(Map<String, String> queryOptions, boolean defaultValue) {
366+
String useMSEToFillEmptySchema = queryOptions.get(QueryOptionKey.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
367+
return useMSEToFillEmptySchema != null ? Boolean.parseBoolean(useMSEToFillEmptySchema) : defaultValue;
368+
}
369+
365370
@Nullable
366371
private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
367372
if (optionValue == null) {

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ protected void overrideControllerConf(Map<String, Object> properties) {
8585
500);
8686
}
8787

88+
@Override
89+
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
90+
super.overrideBrokerConf(brokerConf);
91+
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true);
92+
}
93+
8894
@Override
8995
protected void overrideServerConf(PinotConfiguration serverConf) {
9096
try {

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.pinot.spi.config.table.TableConfig;
3737
import org.apache.pinot.spi.config.table.TableType;
3838
import org.apache.pinot.spi.data.Schema;
39+
import org.apache.pinot.spi.env.PinotConfiguration;
3940
import org.apache.pinot.spi.utils.CommonConstants;
4041
import org.apache.pinot.spi.utils.InstanceTypeUtils;
4142
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -82,6 +83,12 @@ protected List<FieldConfig> getFieldConfigs() {
8283
CompressionCodec.MV_ENTRY_DICT, null));
8384
}
8485

86+
@Override
87+
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
88+
super.overrideBrokerConf(brokerConf);
89+
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true);
90+
}
91+
8592
@BeforeClass
8693
public void setUp()
8794
throws Exception {

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionNewSegmentMetadataCreationFailureTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public void testSegmentAssignment()
7070
segmentZKMetadata -> segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING)
7171
.count() == NUM_REALTIME_SEGMENTS_ZK_METADATA_WITH_FAILURE;
7272
}, 1000, 100000, "Some segments are still IN_PROGRESS");
73-
7473
runValidationAndVerify();
7574
}
7675
}

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
3030
import org.apache.pinot.spi.data.FieldSpec;
3131
import org.apache.pinot.spi.data.Schema;
32+
import org.apache.pinot.spi.env.PinotConfiguration;
3233
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
34+
import org.apache.pinot.spi.utils.CommonConstants;
3335
import org.apache.pinot.spi.utils.JsonUtils;
3436
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
3537
import org.apache.pinot.util.TestUtils;
@@ -46,6 +48,12 @@
4648
public class SegmentGenerationMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
4749
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class);
4850

51+
@Override
52+
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
53+
super.overrideBrokerConf(brokerConf);
54+
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true);
55+
}
56+
4957
@BeforeClass
5058
public void setUp()
5159
throws Exception {

pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -62,27 +62,32 @@ public static boolean canCompileWithMultiStageEngine(String query, String databa
6262
* 2. Data schema has all columns set to default type (STRING) (when all segments pruned on server).
6363
*
6464
* Priority is:
65-
* - Types from multi-stage engine validation for the given query.
65+
* - Types from multi-stage engine validation for the given query (if allowed).
6666
* - Types from schema for the given table (only applicable to selection fields).
6767
* - Types from single-stage engine response (no action).
6868
*
6969
* Multi-stage engine schema will be available only if query compiles.
7070
*/
71-
public static void fillEmptyResponseSchema(BrokerResponse response, TableCache tableCache, Schema schema,
72-
String database, String query) {
71+
public static void fillEmptyResponseSchema(boolean useMSE, BrokerResponse response, TableCache tableCache,
72+
Schema schema, String database, String query) {
7373
Preconditions.checkState(response.getNumRowsResultSet() == 0, "Cannot fill schema for non-empty response");
7474

7575
DataSchema dataSchema = response.getResultTable() != null ? response.getResultTable().getDataSchema() : null;
7676

7777
List<RelDataTypeField> dataTypeFields = null;
78-
try {
79-
QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
80-
RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
81-
if (node != null && node.validatedRowType != null) {
82-
dataTypeFields = node.validatedRowType.getFieldList();
78+
// Turn on (with pinot.broker.use.mse.to.fill.empty.response.schema=true or query option
79+
// useMSEToFillEmptyResponseSchema=true) only for clusters where no queries with huge IN clauses are expected
80+
// (see https://github.com/apache/pinot/issues/15064)
81+
if (useMSE) {
82+
try {
83+
QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
84+
RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
85+
if (node != null && node.validatedRowType != null) {
86+
dataTypeFields = node.validatedRowType.getFieldList();
87+
}
88+
} catch (Exception ignored) {
89+
// Ignored
8390
}
84-
} catch (Exception ignored) {
85-
// Ignored
8691
}
8792

8893
if (dataSchema == null && dataTypeFields == null) {

pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java

+8
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,10 @@ public static class QueryOptionKey {
584584

585585
// Custom Query ID provided by the client
586586
public static final String CLIENT_QUERY_ID = "clientQueryId";
587+
588+
// Use MSE compiler when trying to fill a response with no schema metadata
589+
// (overrides the "pinot.broker.use.mse.to.fill.empty.response.schema" broker conf)
590+
public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = "useMSEToFillEmptyResponseSchema";
587591
}
588592

589593
public static class QueryOptionValue {
@@ -704,6 +708,10 @@ public enum Type {
704708
}
705709

706710
public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.broker.storage.factory";
711+
712+
public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA =
713+
"pinot.broker.use.mse.to.fill.empty.response.schema";
714+
public static final boolean DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = false;
707715
}
708716

709717
public static class Server {

0 commit comments

Comments
 (0)