Skip to content

Commit

Permalink
Add two value logical op (pingcap#9560)
Browse files Browse the repository at this point in the history
ref pingcap#9146

Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and yibin committed Nov 6, 2024
1 parent 02f2f03 commit 7f686cd
Show file tree
Hide file tree
Showing 9 changed files with 494 additions and 49 deletions.
19 changes: 13 additions & 6 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,8 @@ String DAGExpressionAnalyzer::applyFunction(

String DAGExpressionAnalyzer::buildFilterColumn(
const ExpressionActionsPtr & actions,
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions)
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false)
{
String filter_column_name;
if (conditions.size() == 1)
Expand All @@ -962,19 +963,24 @@ String DAGExpressionAnalyzer::buildFilterColumn(
for (const auto & condition : conditions)
arg_names.push_back(getActions(condition, actions, true));
// connect all the conditions by logical and
filter_column_name = applyFunction("and", arg_names, actions, nullptr);
// two_value_and treats null as false inside the `two_value_and` function, so the output column
// will always be UInt8 type, which can save the merge step in FilterDescription
// it should only be used when the output is only used as a filter column
String fun_name = null_as_false ? "two_value_and" : "and";
filter_column_name = applyFunction(fun_name, arg_names, actions, nullptr);
}
return filter_column_name;
}

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGExpressionAnalyzer::buildPushDownFilter(
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions)
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false)
{
assert(!conditions.empty());

ExpressionActionsChain chain;
initChain(chain);
String filter_column_name = appendWhere(chain, conditions);
String filter_column_name = appendWhere(chain, conditions, null_as_false);
ExpressionActionsPtr before_where = chain.getLastActions();
chain.addStep();

Expand All @@ -993,11 +999,12 @@ std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGExpressionAnal

String DAGExpressionAnalyzer::appendWhere(
ExpressionActionsChain & chain,
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions)
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false)
{
auto & last_step = initAndGetLastStep(chain);

String filter_column_name = buildFilterColumn(last_step.actions, conditions);
String filter_column_name = buildFilterColumn(last_step.actions, conditions, null_as_false);

last_step.required_output.push_back(filter_column_name);
return filter_column_name;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable

String appendWhere(
ExpressionActionsChain & chain,
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions);
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

GroupingSets buildExpandGroupingColumns(const tipb::Expand & expand, const ExpressionActionsPtr & actions);

Expand Down Expand Up @@ -144,10 +145,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions);
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions);
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ String DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions(
function_json_unquote)
{
bool valid_check
= !(isScalarFunctionExpr(input_expr) && input_expr.sig() == tipb::ScalarFuncSig::CastJsonAsString);
= !isScalarFunctionExpr(input_expr) || input_expr.sig() != tipb::ScalarFuncSig::CastJsonAsString;
function_json_unquote->setNeedValidCheck(valid_check);
}
else if (auto * function_cast_json_as_string = dynamic_cast<FunctionCastJsonAsString *>(function_impl);
Expand Down Expand Up @@ -555,6 +555,7 @@ DAGExpressionAnalyzerHelper::FunctionBuilderMap DAGExpressionAnalyzerHelper::fun
{"cast_json_as_string", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"json_unquote", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"and", DAGExpressionAnalyzerHelper::buildLogicalFunction},
{"two_value_and", DAGExpressionAnalyzerHelper::buildLogicalFunction},
{"or", DAGExpressionAnalyzerHelper::buildLogicalFunction},
{"xor", DAGExpressionAnalyzerHelper::buildLogicalFunction},
{"not", DAGExpressionAnalyzerHelper::buildLogicalFunction},
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ void executePushedDownFilter(
DAGPipeline & pipeline)
{
auto [before_where, filter_column_name, project_after_where]
= analyzer.buildPushDownFilter(filter_conditions.conditions);
= analyzer.buildPushDownFilter(filter_conditions.conditions, true);

for (auto & stream : pipeline.streams)
{
Expand All @@ -464,7 +464,7 @@ void executePushedDownFilter(
LoggerPtr log)
{
auto [before_where, filter_column_name, project_after_where]
= analyzer.buildPushDownFilter(filter_conditions.conditions);
= analyzer.buildPushDownFilter(filter_conditions.conditions, true);

auto input_header = group_builder.getCurrentHeader();
for (size_t i = 0; i < group_builder.concurrency(); ++i)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ PhysicalPlanNodePtr PhysicalFilter::build(
DAGExpressionAnalyzer analyzer{child->getSchema(), context};
ExpressionActionsPtr before_filter_actions = PhysicalPlanHelper::newActions(child->getSampleBlock());

String filter_column_name = analyzer.buildFilterColumn(before_filter_actions, selection.conditions());
String filter_column_name = analyzer.buildFilterColumn(before_filter_actions, selection.conditions(), true);

auto physical_filter = std::make_shared<PhysicalFilter>(
executor_id,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Functions/FunctionsLogical.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace DB
void registerFunctionsLogical(FunctionFactory & factory)
{
factory.registerFunction<FunctionAnd>();
factory.registerFunction<FunctionTwoValueAnd>();
factory.registerFunction<FunctionOr>();
factory.registerFunction<FunctionXor>();
factory.registerFunction<FunctionNot>();
Expand Down
Loading

0 comments on commit 7f686cd

Please sign in to comment.