[C++][Compute][Acero] Poor aggregate performance when there is a large number of batches on the build side #45847

Open
uchenily opened this issue Mar 18, 2025 · 21 comments

uchenily commented Mar 18, 2025

Describe the bug, including details regarding any error messages, version, and platform.

I am running a performance test on a plan as described below. This execution plan is fairly straightforward, involving only two source nodes, one hash join node, and one aggregation node.

         aggr
           +
         join
     +-----+------+
  source_0     source_1
  (probe)      (build)

However, I noticed that the performance is far below my expectations. So, I made the following table for further analysis.

In this table, the horizontal axis represents the number of batches generated on the build side, while the vertical axis denotes the number of batches produced on the probe side, with each batch size being 1<<15.

| time (in seconds) | build 1 | build 2 | build 4 | build 16 | build 32 | build 64 |
| --- | --- | --- | --- | --- | --- | --- |
| probe 1 | 0.03 | 0.04 | 0.06 | 0.03 | 7.6 | 269.7 |
| probe 2 | 0.04 | 0.04 | 0.06 | 7.7 | 59.1 | 176.0 |
| probe 4 | 0.05 | 0.06 | 0.06 | 7.7 | 56.1 | 229.9 |
| probe 16 | 0.11 | 0.12 | 0.12 | 3.2 | 32.2 | 145.8 |
| probe 32 | 0.19 | 0.19 | 0.20 | 3.1 | 32.2 | 107.5 |
| probe 64 | 0.36 | 0.36 | 0.36 | 3.4 | 44.9 | 134.9 |
| probe 256 | 1.33 | 1.33 | 1.34 | 5.5 | 30.7 | 197.3 |
| probe 1024 | 5.2 | 5.3 | 5.2 | 5.3 | 17.7 | 50.9 |

More information:

  1. I ran these tests on an Intel x86_64 machine with about 100 cores.

  2. I noticed that in scenarios where the execution time exceeds 10 seconds, CPU utilization is very low; most of the time only one CPU core is in use.

  3. I found that the arrow/compute/row/grouper.cc:ConsumeImpl method (the map phase) is quite time-consuming.

  4. Generating the data also takes a noticeable amount of time, but in the tests above it does not account for a significant portion of the total.

  5. The data distribution is also likely to affect the execution time, but I have not verified this.

  6. In some scenarios, the execution time is longer even with a smaller amount of data, which seems unreasonable.

  7. I tested both version 19.0.1 and the main branch, and no significant differences were observed (the table above is based on v19.0.1).

  8. If we remove the aggregate node, everything is fine, so I suspect the main problem lies in the hash aggregate node. If we replace hash_count in the test code below with hash_sum or another hash aggregation function, the results are almost identical.

Component(s)

C++

uchenily commented Mar 18, 2025

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/async_generator.h>
#include <arrow/visit_type_inline.h>

#include <atomic>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

namespace cp = arrow::compute;
namespace ac = arrow::acero;

static constexpr size_t MAX_BATCH_SIZE = 1 << 15;

namespace arrow_ext {
// Despite its name, this generator currently produces sequential int64 values
// starting at min_val (the random path is commented out below).
class RandomBatchGenerator {
public:
    std::shared_ptr<arrow::Schema> schema_;
    int64_t                        min_val_;
    int64_t                        max_val_;
    int64_t                        current_;

    RandomBatchGenerator(std::shared_ptr<arrow::Schema> schema, int64_t min_val, int64_t max_val)
        : schema_{schema}
        , min_val_{min_val}
        , max_val_{max_val}
        , current_{min_val} {};

    arrow::Result<std::shared_ptr<arrow::RecordBatch>> Generate(size_t num_rows) {
        num_rows_ = num_rows;
        for (std::shared_ptr<arrow::Field> field : schema_->fields()) {
            ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*field->type(), this));
        }
        auto temp = std::vector<std::shared_ptr<arrow::Array>>{};
        std::swap(arrays_, temp);
        return arrow::RecordBatch::Make(schema_, num_rows, temp);
    }

    arrow::Status Visit(const arrow::DataType &type) {
        return arrow::Status::NotImplemented("Generating data for", type.ToString());
    }

    arrow::Status Visit(const arrow::Int64Type &) {
        auto builder = arrow::Int64Builder();
        // auto d = std::uniform_int_distribution{min_val_, max_val_};
        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows_));
        for (size_t i = 0; i < num_rows_; ++i) {
            // ARROW_RETURN_NOT_OK(builder.Append(d(gen_)));
            ARROW_RETURN_NOT_OK(builder.Append(current_++));
        }
        if (current_ > max_val_) {
            current_ = min_val_;
        }
        ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
        arrays_.push_back(array);
        return arrow::Status::OK();
    }

protected:
    // std::random_device                         rd_{};
    // std::mt19937                               gen_{rd_()};
    std::vector<std::shared_ptr<arrow::Array>> arrays_;
    size_t                                     num_rows_;
};

// Wraps a RandomBatchGenerator in an AsyncGenerator that yields num_batches ExecBatches.
auto MakeRandomSourceGenerator(std::shared_ptr<arrow::Schema> schema,
                               size_t                         num_batches,
                               int64_t                        min_val,
                               int64_t                        max_val)
    -> arrow::AsyncGenerator<std::optional<cp::ExecBatch>> {
    struct State {
        State(std::shared_ptr<arrow::Schema> schema,
              size_t                         num_batches,
              int64_t                        min_val,
              int64_t                        max_val)
            : schema_{schema}
            , num_batches_{num_batches}
            , generator_{schema_, min_val, max_val} {}

        std::atomic<size_t>            index_{0};
        std::shared_ptr<arrow::Schema> schema_;
        const size_t                   num_batches_;
        RandomBatchGenerator           generator_;
    };

    auto state = std::make_shared<State>(schema, num_batches, min_val, max_val);

    return [shared = state]() -> arrow::Future<std::optional<cp::ExecBatch>> {
        if (shared->index_.load() == shared->num_batches_) {
            return arrow::AsyncGeneratorEnd<std::optional<cp::ExecBatch>>();
        }

        shared->index_.fetch_add(1);
        auto maybe_batch = shared->generator_.Generate(MAX_BATCH_SIZE);
        if (!maybe_batch.ok()) {
            return arrow::AsyncGeneratorEnd<std::optional<cp::ExecBatch>>();
        }

        std::cout << "Generating batch ... [" << shared->index_.load() << "/"
                  << shared->num_batches_ << "]\n";

        return arrow::Future<std::optional<cp::ExecBatch>>::MakeFinished(
            cp::ExecBatch{**maybe_batch});
    };
}
} // namespace arrow_ext

auto HashJoin(size_t num_probe_batches, size_t num_build_batches) -> arrow::Status {
    auto probe_schema = std::make_shared<arrow::Schema>(std::vector{
        std::make_shared<arrow::Field>("orderkey", arrow::int64()),
        std::make_shared<arrow::Field>("custkey_0", arrow::int64()),
    });
    auto build_schema = std::make_shared<arrow::Schema>(std::vector{
        std::make_shared<arrow::Field>("custkey", arrow::int64()),
    });
    auto probe_generator0
        = arrow_ext::MakeRandomSourceGenerator(probe_schema, num_probe_batches, 0, 3000000);
    auto build_generator1
        = arrow_ext::MakeRandomSourceGenerator(build_schema, num_build_batches, 0, 3000000);

    auto            probe_source_options0 = ac::SourceNodeOptions{probe_schema, probe_generator0};
    auto            build_source_options1 = ac::SourceNodeOptions{build_schema, build_generator1};
    ac::Declaration left{"source", std::move(probe_source_options0)};
    ac::Declaration right{"source", std::move(build_source_options1)};

    ac::HashJoinNodeOptions join_opts{ac::JoinType::RIGHT_OUTER,
                                      {"custkey_0"},
                                      {"custkey"},
                                      cp::literal(true)};

    ac::Declaration hashjoin{
        "hashjoin",
        {std::move(left), std::move(right)},
        join_opts
    };

    bool has_aggregate_node = true;
    if (!has_aggregate_node) {
        // return ac::DeclarationToStatus(std::move(hashjoin));
        auto table = ac::DeclarationToTable(std::move(hashjoin));
        std::cout << "table num_rows: " << (*table)->num_rows() << '\n';
        std::cout << "table (sliced): " << (*table)->Slice(0, 10)->ToString() << '\n';
        return arrow::Status::OK();
    } else {

        // hash sum
        std::vector<cp::Aggregate> aggrs;
        // std::shared_ptr<cp::FunctionOptions> options
        //     = std::make_shared<cp::ScalarAggregateOptions>();
        // aggrs.emplace_back("hash_sum", options, "y", "sum");

        std::shared_ptr<cp::FunctionOptions> options
            = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
        aggrs.emplace_back("hash_count", options, "orderkey", "count");

        auto key_fields = std::vector<arrow::FieldRef>({"custkey"});
        auto aggregate_options = ac::AggregateNodeOptions{/*aggregates=*/aggrs,
                                                          /*keys=*/key_fields};
        auto aggregate
            = ac::Declaration{"aggregate", {std::move(hashjoin)}, std::move(aggregate_options)};

        // return ac::DeclarationToStatus(std::move(aggregate));

        // auto schema = ac::DeclarationToSchema(std::move(aggregate));
        // LOG_DEBUG("schema: {}", (*schema)->ToString());

        std::cout << ac::DeclarationToString(aggregate).ValueOrDie() << '\n';
        auto table = ac::DeclarationToTable(std::move(aggregate));
        std::cout << "table num_rows: " << (*table)->num_rows() << '\n';
        std::cout << "table (sliced): " << (*table)->Slice(0, 10)->ToString() << '\n';
        std::cout << "probe-side rows: " << num_probe_batches * MAX_BATCH_SIZE << '\n';
        std::cout << "build-side rows: " << num_build_batches * MAX_BATCH_SIZE << '\n';
        return arrow::Status::OK();
    }
}

auto main(int argc, char **argv) -> int {
    if (argc != 3) {
        std::cout << "Usage: " << argv[0] << " {num_probe_batches} {num_build_batches}\n";
        return 0;
    }
    size_t num_probe_batches = std::atoi(argv[1]);
    size_t num_build_batches = std::atoi(argv[2]);
    auto   status = HashJoin(num_probe_batches, num_build_batches);
    if (!status.ok()) {
        std::cerr << "Error occurred: " << status.message() << '\n';
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

@uchenily

At this moment, I am not sure whether this is a bug in the hash aggregate or something inherent to the test code I have written. If you have any thoughts or ideas, please let me know.

@uchenily uchenily changed the title [C++][Acero] Poor aggregate performance when there is a large number of batches on the build side [C++][Compute][Acero] Poor aggregate performance when there is a large number of batches on the build side Mar 19, 2025
@uchenily

I used the following commands to generate a flamegraph, which showed that the majority of the time was spent in methods related to SwissTable.

perf record -F 99 -g -- ./test_swissjoin 512 64
perf script > out.perf
./stackcollapse-perf.pl < ../out.perf | ./flamegraph.pl > flamegraph.svg

[flamegraph image attached]

@uchenily

As a comparison, I disabled the original GrouperFastImpl and used GrouperImpl exclusively. To my surprise, GrouperImpl performed better in the scenarios above.

# cpp/src/arrow/compute/row/grouper.cc
 Result<std::unique_ptr<Grouper>> Grouper::Make(const std::vector<TypeHolder>& key_types,
                                                ExecContext* ctx) {
-  if (GrouperFastImpl::CanUse(key_types)) {
-    return GrouperFastImpl::Make(key_types, ctx);
-  }
+  // if (GrouperFastImpl::CanUse(key_types)) {
+  //   return GrouperFastImpl::Make(key_types, ctx);
+  // }
   return GrouperImpl::Make(key_types, ctx);
 }

After replacing GrouperFastImpl with GrouperImpl, I obtained the following table:

| time (in seconds) | build 1 | build 2 | build 4 | build 16 | build 32 | build 64 |
| --- | --- | --- | --- | --- | --- | --- |
| probe 1 | 0.05 | 0.07 | 0.11 | 0.41 | 0.98 | 2.27 |
| probe 2 | 0.05 | 0.07 | 0.11 | 0.40 | 1.0 | 2.33 |
| probe 4 | 0.06 | 0.07 | 0.11 | 0.40 | 1.04 | 2.38 |
| probe 16 | 0.12 | 0.13 | 0.16 | 0.42 | 1.26 | 2.55 |
| probe 32 | 0.19 | 0.20 | 0.24 | 0.48 | 0.96 | 3.0 |
| probe 64 | 0.35 | 0.37 | 0.42 | 0.76 | 1.89 | 2.33 |
| probe 256 | 1.26 | 1.32 | 1.43 | 2.40 | 3.65 | 5.48 |
| probe 1024 | 4.9 | 4.98 | 5.1 | 7.12 | 10.32 | 17.74 |

@uchenily

I find that the time spent in SwissTable processing during hash aggregate execution is highly dependent on the data distribution. I applied some transformations to the ExecBatch output of the hash join, such as using the CallFunction method or substituting randomly generated ExecBatches for the input, and the execution times varied significantly. Therefore, I believe there may be an issue in the current SwissTable implementation.
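For example, one plan-level way to apply such a transformation (purely illustrative; not necessarily the exact transformation used above) is to insert a project node between the join and the aggregation that rewrites the grouping key:

// Hypothetical sketch: perturb the grouping key between the join and the
// aggregation to observe how the aggregation reacts to a different value
// distribution. `hashjoin` is the Declaration from the reproducer above.
ac::Declaration project{
    "project",
    {std::move(hashjoin)},
    ac::ProjectNodeOptions{
        /*expressions=*/{cp::field_ref("orderkey"),
                         cp::call("multiply_checked",
                                  {cp::field_ref("custkey"), cp::literal(int64_t{2})})},
        /*names=*/{"orderkey", "custkey"}}};
// The aggregate Declaration would then take `project` instead of `hashjoin` as input.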

@zanmato1984

Hi @uchenily , thank you for the informative report!

I will take a deeper look at your case later, but for now let me share some rough thoughts.

The extremely bad performance may relate to two aspects of your test case:

  1. You are doing a right outer join, for which an extra "scan" phase (scanning the hash table to output the rows unmatched by the left table) is conducted; I believe this contributes most of the slowness. This is supported by both the flame graph (note the long bar of arrow::acero::SwissJoin::ScanTask) and the original time table (e.g. in the "build 64" column, as the probe side grows the time gets shorter rather than longer).
  2. The group-by key is the same as the build-side join key (custkey). In the scan phase, rows are output in the order of the join key (an implementation detail of the Swiss table) and passed downstream to the agg. The GrouperFastImpl in the agg then sees many rows with the same key values, which could be very Swiss table unfriendly.

(Note: this is not to say the problem lies in your test case. I mean we may have just discovered a bad outlier in our current implementation, and we may want to address it.)

To further investigate the potential problem, would you mind helping with the following two things?

  1. Regarding "most of the time only one CPU core is in use": could you capture the stacktrace during that "only one core" period? Just to check whether we have potential concurrency/locking issues.
  2. Since the presence of the hash join amplifies the bad performance of the agg, could you run an experiment with a plan of only source + agg (i.e. no hash join), and attach the time table and flame graph if possible?

Much appreciated!

uchenily commented Mar 20, 2025

@zanmato1984 Thank you for your response!

About the first aspect you mentioned above, approximately 93% of the time is spent on arrow::acero::SwissJoin::ScanTask. However, I believe this is primarily due to the execution of the callback function (i.e., output_batch_callback_). If the next node after the hash join is not an aggregation, the total execution time is very short, and there is no performance issue.

In my test code, the batch generator actually produces ordered values (not random values). For the build side, it is the sequence [0, 1, ..., N], split into batches of MAX_BATCH_SIZE (1 << 15); the build side has no duplicate values as long as N < 3000000. Additionally, even if I change the probe-side input to batches filled entirely with null values, the result remains the same. In that scenario, I printed the batches output by the hash join and found that the values in the third column are not actually ordered. (I don't know the internals of the hash join, but the output looks fine.)

ExecBatch generated by the hashjoin node:

ExecBatch
    # Rows: 32768
    0: Array[null,null,null,null,null,null,null,null,null,null,...,null,null,null,null,null,null,null,null,null,null]
    1: Array[null,null,null,null,null,null,null,null,null,null,...,null,null,null,null,null,null,null,null,null,null]
    2: Array[32774,32784,32787,32797,32800,32810,32813,32823,32836,32849,...,262035,262048,262061,262071,262074,262084,262087,262097,262100,262110]

I indeed wanted to create a plan with only source + agg to eliminate the influence of the hash join node. However, since I am not sure how to generate an integer sequence that is "unfriendly" to SwissTable, I can currently only reproduce the issue by including the hashjoin node.

uchenily commented Mar 20, 2025

stacktrace:

stacktrace.txt

To update my earlier description: CPU utilization remained at about 400% for a long period. In the final stage, as the number of remaining tasks decreased, utilization dropped to 100% before the tasks completed.

The final stage (CPU at 100%):

(gdb)
  Id   Target Id                                          Frame
* 1    Thread 0x716a342adc80 (LWP 49154) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  2    Thread 0x7169f27fc6c0 (LWP 49172) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  3    Thread 0x7169f2ffd6c0 (LWP 49171) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  4    Thread 0x7169f37fe6c0 (LWP 49170) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  5    Thread 0x7169f3fff6c0 (LWP 49169) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  6    Thread 0x716a18ff96c0 (LWP 49168) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  7    Thread 0x716a197fa6c0 (LWP 49167) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  8    Thread 0x716a19ffb6c0 (LWP 49166) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  9    Thread 0x716a1a7fc6c0 (LWP 49165) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  10   Thread 0x716a1affd6c0 (LWP 49164) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  11   Thread 0x716a1b7fe6c0 (LWP 49163) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  12   Thread 0x716a1bfff6c0 (LWP 49162) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  13   Thread 0x716a30e076c0 (LWP 49161) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  14   Thread 0x716a316086c0 (LWP 49160) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  15   Thread 0x716a31e096c0 (LWP 49159) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  16   Thread 0x716a3260a6c0 (LWP 49158) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  17   Thread 0x716a32e0b6c0 (LWP 49157) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  18   Thread 0x716a3360c6c0 (LWP 49156) "test_swissjoin" 0x0000716a33eabbe2 in ?? () from /usr/lib/libc.so.6
  19   Thread 0x716a33e0d6c0 (LWP 49155) "test_swissjoin" 0x00005db04d1e1100 in arrow::Buffer::mutable_data (this=0x716a2c0479d8) at /opt/cmake-install/include/arrow/buffer.h:251
(gdb) thread 19 apply bt
[Switching to thread 19 (Thread 0x716a33e0d6c0 (LWP 49155))]
#0  0x00005db04d1e1100 in arrow::Buffer::mutable_data (this=0x716a2c0479d8) at /opt/cmake-install/include/arrow/buffer.h:251
251       uint8_t* mutable_data() {
(gdb) bt
#0  0x00005db04d1e1100 in arrow::Buffer::mutable_data (this=0x716a2c0479d8) at /opt/cmake-install/include/arrow/buffer.h:251
#1  0x0000716a379f820d in arrow::compute::SwissTable::find_next_stamp_match (this=0x716a2c0479c0, hash=1601044284, in_slot_id=160238, out_slot_id=0x716a33e0a358, out_group_id=0x20004c1a570)
    at /root/work/github-latest-arrow/cpp/src/arrow/compute/key_map_internal.cc:409
#2  0x0000716a379f5b58 in arrow::compute::SwissTable::find (this=0x716a2c0479c0, num_keys=1024, hashes=0x716a2c047d20,
    inout_match_bitvector=0x20004724008 "\377\377\377\337\377\377\377\367\373\377\377\377\375\377\377\377~\377\377\377\377\377\377\377\337\377\377\377\377\377\377\377\377\375\377\377\177\277\377\377\377\337\377\377\377\357\367\377\377\377\373\377\377\377\375\377\377\377\277\377\377\377\337\377\377\377\377\367\377\377\377\373\377\377\377\375\377\377\377\277\377\377\377\337\377\377\377\377\367\377\377\377\373\377\377\377\377\376\377\377\177\377\377\377\377\377\377\377\377\367\377\377\377\373\377\377\377\377\376\377\377\177\377\377\377\377\377\377}", local_slots=0x200047240d8 '\a' <repeats 114 times>, "\006", '\a' <repeats 85 times>..., out_group_ids=0x20004c1a000, temp_stack=0x716a2c0476a8,
    equal_impl=..., callback_ctx=0x0) at /root/work/github-latest-arrow/cpp/src/arrow/compute/key_map_internal.cc:495
#3  0x0000716a37a1e953 in arrow::compute::(anonymous namespace)::GrouperFastImpl::ConsumeImpl (this=0x716a2c047690, batch=...) at /root/work/github-latest-arrow/cpp/src/arrow/compute/row/grouper.cc:808
#4  0x0000716a37a1dfe6 in arrow::compute::(anonymous namespace)::GrouperFastImpl::Consume (this=0x716a2c047690, batch=..., offset=0, length=32768) at /root/work/github-latest-arrow/cpp/src/arrow/compute/row/grouper.cc:736
#5  0x0000716a39985907 in arrow::acero::aggregate::GroupByNode::Consume (this=0x5db060e82af0, batch=...) at /root/work/github-latest-arrow/cpp/src/arrow/acero/groupby_aggregate_node.cc:230
#6  0x0000716a39987e0b in operator() (__closure=0x716a33e0ab70, full_batch=..., segment=...) at /root/work/github-latest-arrow/cpp/src/arrow/acero/groupby_aggregate_node.cc:382
#7  0x0000716a399894fe in arrow::acero::aggregate::HandleSegments<arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::<lambda(const arrow::compute::ExecBatch&, const arrow::compute::Segment&)> >(arrow::compute::RowSegmenter *, const arrow::compute::ExecBatch &, const std::vector<int, std::allocator<int> > &, const struct {...} &) (segmenter=0x5db060e81210, batch=..., ids=std::vector of length 0, capacity 0,
    handle_batch=...) at /root/work/github-latest-arrow/cpp/src/arrow/acero/aggregate_internal.h:139
#8  0x0000716a39988342 in arrow::acero::aggregate::GroupByNode::InputReceived (this=0x5db060e82af0, input=0x5db060e82310, batch=...) at /root/work/github-latest-arrow/cpp/src/arrow/acero/groupby_aggregate_node.cc:388
#9  0x0000716a39a7f7d5 in arrow::acero::HashJoinNode::OutputBatchCallback (this=0x5db060e82310, batch=...) at /root/work/github-latest-arrow/cpp/src/arrow/acero/hash_join_node.cc:1016
#10 0x0000716a39a7e9fb in arrow::acero::HashJoinNode::Init()::{lambda(long, arrow::compute::ExecBatch)#1}::operator()(long, arrow::compute::ExecBatch) const (__closure=0x5db060e833a8, batch=...)
    at /root/work/github-latest-arrow/cpp/src/arrow/acero/hash_join_node.cc:959
#11 0x0000716a39a93371 in std::__invoke_impl<arrow::Status, arrow::acero::HashJoinNode::Init()::{lambda(long, arrow::compute::ExecBatch)#1}&, long, arrow::compute::ExecBatch>(std::__invoke_other, arrow::acero::HashJoinNode::Init()::{lambda(long, arrow::compute::ExecBatch)#1}&, long&&, arrow::compute::ExecBatch&&) (__f=...) at /usr/include/c++/14.2.1/bits/invoke.h:61
#12 0x0000716a39a8ed05 in std::__invoke_r<arrow::Status, arrow::acero::HashJoinNode::Init()::{lambda(long, arrow::compute::ExecBatch)#1}&, long, arrow::compute::ExecBatch>(arrow::acero::HashJoinNode::Init()::{lambda(long, arrow::compute::ExecBatch)#1}&, long&&, arrow::compute::ExecBatch&&) (__fn=...) at /usr/include/c++/14.2.1/bits/invoke.h:116
#13 0x0000716a39a8958f in std::_Function_handler<arrow::Status (long, arrow::compute::ExecBatch), arrow::acero::HashJoinNode::Init()::{lambda(long, arrow::compute::ExecBatch)#1}>::_M_invoke(std::_Any_data const&, long&&, arrow::compute::ExecBatch&&) (__functor=..., __args#0=@0x716a33e0ae28: 4, __args#1=...) at /usr/include/c++/14.2.1/bits/std_function.h:291
#14 0x0000716a3a9bf78d in std::function<arrow::Status(long, arrow::compute::ExecBatch)>::operator() (this=0x5db060e833a8, __args#0=4, __args#1=...) at /usr/include/c++/14.2.1/bits/std_function.h:591
#15 0x0000716a39b0a3e3 in arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1}::operator()(arrow::compute::ExecBatch) const (__closure=0x716a33e0b070, batch=...)
    at /root/work/github-latest-arrow/cpp/src/arrow/acero/swiss_join.cc:2781
#16 0x0000716a39b12064 in arrow::acero::JoinResultMaterialize::AppendAndOutput<arrow::acero::JoinResultMaterialize::AppendBuildOnly<arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1}>(int, unsigned int const*, unsigned int const*, arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1})::{lambda(int, int, int*)#1}, arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1}>(int, arrow::acero::JoinResultMaterialize::AppendBuildOnly<arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1}>(int, unsigned int const*, unsigned int const*, arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1})::{lambda(int, int, int*)#1} const&, arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1} const&) (
    this=0x5db060e853c0, num_rows_to_append=1024, append_rows_fn=..., output_batch_fn=...) at /root/work/github-latest-arrow/cpp/src/arrow/acero/swiss_join_internal.h:680
#17 0x0000716a39b0ded1 in arrow::acero::JoinResultMaterialize::AppendBuildOnly<arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1}>(int, unsigned int const*, unsigned int const*, arrow::acero::SwissJoin::ScanTask(unsigned long, long)::{lambda(arrow::compute::ExecBatch)#1}) (this=0x5db060e853c0, num_rows_to_append=1024, key_ids=0x20000171058, payload_ids=0x20000170008, output_batch_fn=...)
    at /root/work/github-latest-arrow/cpp/src/arrow/acero/swiss_join_internal.h:712
#18 0x0000716a39b0a8f1 in arrow::acero::SwissJoin::ScanTask (this=0x5db060e83300, thread_id=4, task_id=0) at /root/work/github-latest-arrow/cpp/src/arrow/acero/swiss_join.cc:2782
#19 0x0000716a39b0841e in arrow::acero::SwissJoin::InitTaskGroups()::{lambda(unsigned long, long)#3}::operator()(unsigned long, long) const (__closure=0x5db060e889f0, thread_index=4, task_id=0)
    at /root/work/github-latest-arrow/cpp/src/arrow/acero/swiss_join.cc:2514
#20 0x0000716a39b163b9 in std::__invoke_impl<arrow::Status, arrow::acero::SwissJoin::InitTaskGroups()::{lambda(unsigned long, long)#3}&, unsigned long, long>(std::__invoke_other, arrow::acero::SwissJoin::InitTaskGroups()::{lambda(unsigned long, long)#3}&, unsigned long&&, long&&) (__f=...) at /usr/include/c++/14.2.1/bits/invoke.h:61
#21 0x0000716a39b14795 in std::__invoke_r<arrow::Status, arrow::acero::SwissJoin::InitTaskGroups()::{lambda(unsigned long, long)#3}&, unsigned long, long>(arrow::acero::SwissJoin::InitTaskGroups()::{lambda(unsigned long, long)#3}&, unsigned long&&, long&&) (__fn=...) at /usr/include/c++/14.2.1/bits/invoke.h:116
#22 0x0000716a39b1193e in std::_Function_handler<arrow::Status (unsigned long, long), arrow::acero::SwissJoin::InitTaskGroups()::{lambda(unsigned long, long)#3}>::_M_invoke(std::_Any_data const&, unsigned long&&, long&&) (
    __functor=..., __args#0=@0x716a33e0b2d8: 4, __args#1=@0x716a33e0b2d0: 0) at /usr/include/c++/14.2.1/bits/std_function.h:291
#23 0x0000716a39b1d58d in std::function<arrow::Status(unsigned long, long)>::operator() (this=0x5db060e889f0, __args#0=4, __args#1=0) at /usr/include/c++/14.2.1/bits/std_function.h:591
#24 0x0000716a39b1aec5 in arrow::acero::TaskSchedulerImpl::ExecuteTask (this=0x5db060e7fb60, thread_id=4, group_id=4, task_id=0, task_group_finished=0x716a33e0b3b7)

@zanmato1984

Thank you @uchenily for the quick feedback!

About the first aspect you mentioned above, approximately 93% of the time is spent on arrow::acero::SwissJoin::ScanTask. However, I believe this is primarily due to the execution of the callback function (i.e., output_batch_callback_). If the next node after the hash join is not an aggregation, the total execution time is very short, and there is no performance issue.

Sorry, I didn't mean that the scan aspect alone accounted for the execution time. I was suspecting that the particular order (well, not exactly order, more on that below) of the scan task's output forced the agg onto a suboptimal code path, thus causing the bad perf. As a comparison, the matching rows (output via arrow::acero::JoinProbeProcessor::OnNextBatch) don't have this particular order, so they are more Swiss table friendly.

In this scenario, I printed the batches output by the hash join and found that the values in the third column are not actually ordered.

By "order" I didn't mean strict order, sorry again. I meant rows of the same key value will be output together within the same batch by the scan task. It is these kind of rows that are Swiss table unfriendly. But after a quick math of your data distribution, the build table doesn't seem to have duplicated keys ((1<<15) * 64 == 2097152 < max_val == 3000000). So this may not be the case.

However, since I am unsure how to generate the integer sequence that is unfriendly to SwissTable, I can currently only reproduce the issue by including the hashjoin node.

So the performance of the plan with only source + agg is OK? Would you mind sharing the numbers?

the CPU utilization remained at 400% for a long period of time. In the final stage, as the number of tasks decreased, the CPU utilization dropped to 100% before the tasks were completed.

This matches what the code does, so we can rule out unknown concurrency/locking issues for now.

I will definitely try the test case myself and look deeper (when my time allows). 269.7s for 2m rows joining 32k rows is mad.

uchenily commented Mar 20, 2025

Only source + agg:

| batches | 32 | 64 | 128 | 256 | 512 | 1024 |
| --- | --- | --- | --- | --- | --- | --- |
| source + agg (s) | 0.2 | 0.38 | 0.71 | 1.38 | 2.67 | 5.26 |

Only source + hashjoin:

| time (in seconds) | build 32 | build 64 |
| --- | --- | --- |
| probe 256 | 1.27 | 1.29 |
| probe 512 | 2.48 | 2.5 |
| probe 1024 | 4.9 | 4.97 |
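
For completeness, a source + agg plan like the one timed above can be sketched as follows (this reuses MakeRandomSourceGenerator from the reproducer; it is only an illustrative sketch, not necessarily the exact code used for these numbers):

// Sketch only: a source + agg plan analogous to the reproducer, without the join.
auto SourceAgg(size_t num_batches) -> arrow::Status {
    auto schema = std::make_shared<arrow::Schema>(std::vector{
        std::make_shared<arrow::Field>("custkey", arrow::int64()),
    });
    auto generator = arrow_ext::MakeRandomSourceGenerator(schema, num_batches, 0, 3000000);
    ac::Declaration source{"source", ac::SourceNodeOptions{schema, generator}};

    // Same hash_count aggregation on the key column as in the reproducer.
    std::vector<cp::Aggregate> aggrs;
    auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
    aggrs.emplace_back("hash_count", options, "custkey", "count");
    auto aggregate = ac::Declaration{
        "aggregate",
        {std::move(source)},
        ac::AggregateNodeOptions{/*aggregates=*/aggrs,
                                 /*keys=*/{arrow::FieldRef("custkey")}}};

    ARROW_ASSIGN_OR_RAISE(auto table, ac::DeclarationToTable(std::move(aggregate)));
    std::cout << "table num_rows: " << table->num_rows() << '\n';
    return arrow::Status::OK();
}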

uchenily commented Mar 20, 2025

FYI, when I changed kNumRowsPerScanTask from its original value of 512 * 1024 to 1024, I got a significant performance improvement: the probe * build (1024 * 64) case took only 5.3 seconds (50.9 s before the change).

However, for larger-scale data (8192 * 512), the performance is still not entirely satisfactory.
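
For reference, the change is just the scan-task batch-size constant in swiss_join.cc; roughly like the following (the exact declaration and location may differ between versions, only the value was changed):

# cpp/src/arrow/acero/swiss_join.cc (approximate)
-constexpr int64_t kNumRowsPerScanTask = 512 * 1024;
+constexpr int64_t kNumRowsPerScanTask = 1024;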

@zanmato1984

I suppose that's because fewer rows per task enable more parallelism in the scan phase. Have you tried (1 * 64)?

uchenily commented Mar 20, 2025

I suppose that's because fewer rows per task enable more parallelism in the scan phase. Have you tried (1 * 64)?

Please refer to this table. (kNumRowsPerScanTask = 1024)

| time (in seconds) | build 64 |
| --- | --- |
| probe 1 | 0.5 |
| probe 4 | 2.49 |
| probe 16 | 1.0 |
| probe 64 | 0.9 |
| probe 256 | 1.6 |
| probe 1024 | 5.5 |
| probe 4096 | 20.5 |

For (4096 * 64), from the output of top I observed that most of the time only 1-2 cores were busy; most CPU resources were not being utilized.

@zanmato1984

Thank you for the new numbers. I'm just a bit surprised that (1 * 64) is so fast at 0.5 s compared with the original 269.7 s.

@uchenily

I find that the slowness is indeed due to the scan phase (ScanTask).
I tested two extreme scenarios (see the sketch below):

  1. If the probe and build keys match completely, the right outer join time is significantly reduced (num_output_rows == 0).
  2. However, if none of the probe and build keys match, the execution time becomes very long.
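
For illustration, the two extremes can be reproduced with the generators from the reproducer above by choosing overlapping vs. disjoint key ranges (a sketch of one possible setup, not necessarily the exact one used):

// Case 1 (all build keys matched): probe and build draw from the same
// sequential range, so every build-side key also appears on the probe side
// (given enough probe batches) and the scan phase has nothing to emit.
auto probe_gen = arrow_ext::MakeRandomSourceGenerator(probe_schema, num_probe_batches, 0, 3000000);
auto build_gen = arrow_ext::MakeRandomSourceGenerator(build_schema, num_build_batches, 0, 3000000);

// Case 2 (no build key matched): shift the build-side range past anything the
// probe side can produce, so every build row is unmatched and must be output
// by the right-outer-join scan phase.
auto build_gen_unmatched = arrow_ext::MakeRandomSourceGenerator(build_schema, num_build_batches, 4000000, 7000000);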

@zanmato1984

Yes, this matches what I've been suspecting.

BTW, what value of kNumRowsPerScanTask were you using for this recent experiment?

@uchenily

BTW, what value of kNumRowsPerScanTask were you using for this recent experiment?

1024 or 4 * 1024; there isn't much difference between them.

@zanmato1984

OK. That means even when the parallelism is fully utilized, the scan phase is still slow. There might really be anti-patterns for the Swiss table here, just not the one I was originally thinking of:

I meant that rows with the same key value will be output together within the same batch by the scan task; it is this kind of row grouping that is Swiss table unfriendly.

@uchenily

If a two-phase output is adopted, significant performance improvements can be achieved in scenarios where the build side involves large-scale data. However, I currently cannot offer a reasonable explanation for this. @zanmato1984, do you have any ideas?

       if (num_output_rows > 0) {
         // Materialize (and output whenever buffers get full) hash table
         // values according to the generated list of ids.
         //
+        RETURN_NOT_OK(local_states_[thread_id].materialize.Flush([&](ExecBatch batch) {
+          return output_batch_callback_(thread_id, std::move(batch));
+        }));
         Status status = local_states_[thread_id].materialize.AppendBuildOnly(
             num_output_rows, key_ids_buf.mutable_data(), payload_ids_buf.mutable_data(),
             [&](ExecBatch batch) {
               return output_batch_callback_(static_cast<int64_t>(thread_id),
                                             std::move(batch));
             });

@zanmato1984

I can't think of one off the top of my head. I'll need to try it myself, but please wait a while until I can find time for this. Thank you.
