[C++][Acero] Data race in aggregate node #45788

Closed
zanmato1984 opened this issue Mar 14, 2025 · 1 comment

Describe the bug, including details regarding any error messages, version, and platform.

I found this by accident while debugging another issue. When running with TSAN, the following warning is reported:

$92│ Started PID#47900 - '/Users/zanmato/dev/arrow/cpp/out/build/ninja-debug-tsan/debug/arrow-acero-hash-join-node-test'
$92│ 
$92│ Running main() from /Users/zanmato/dev/arrow/cpp/out/build/ninja-debug-tsan/_deps/googletest-src/googletest/src/gtest_main.cc
$92│ Note: Google Test filter = HashJoin.DISABLED_BuildSideOver4GBFixedLength
$92│ [==========] Running 1 test from 1 test suite.
$92│ [----------] Global test environment set-up.
$92│ [----------] 1 test from HashJoin
$92│ [ RUN      ] HashJoin.DISABLED_BuildSideOver4GBFixedLength @ ./src/arrow/acero/hash_join_node_test.cc:3282
$92│ ⬇ std::cerr:
$92│ ==================
$92│ WARNING: ThreadSanitizer: data race (pid=47900)
$92│   Write of size 8 at 0x00010a0050a8 by thread T4:
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #0 std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>::__base_destruct_at_end[abi:ue170006](arrow::Datum*) vector:945 (libarrow_acero.1900.0.0.dylib:arm64+0x4288)
$92│     #1 std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>::__clear[abi:ue170006]() vector:938 (libarrow_acero.1900.0.0.dylib:arm64+0x3fac)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #2 std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>::clear[abi:ue170006]() vector:723 (libarrow_acero.1900.0.0.dylib:arm64+0x16d7c)
$92│     #3 arrow::acero::aggregate::ExtractSegmenterValues(std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>*, arrow::compute::ExecBatch const&, std::__1::vector<int, std::__1::allocator<int>> const&) aggregate_internal.cc:186 (libarrow_acero.1900.0.0.dylib:arm64+0x82464)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #4 arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::$_0::operator()(arrow::compute::ExecBatch const&, arrow::compute::Segment const&) const scalar_aggregate_node.cc:243 (libarrow_acero.1900.0.0.dylib:arm64+0x553e4)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #5 arrow::Status arrow::acero::aggregate::HandleSegments<arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::$_0>(arrow::compute::RowSegmenter*, arrow::compute::ExecBatch const&, std::__1::vector<int, std::__1::allocator<int>> const&, arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::$_0 const&) aggregate_internal.h:139 (libarrow_acero.1900.0.0.dylib:arm64+0x217f8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #6 arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) scalar_aggregate_node.cc:252 (libarrow_acero.1900.0.0.dylib:arm64+0x21094)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #7 arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()::operator()() const source_node.cc:158 (libarrow_acero.1900.0.0.dylib:arm64+0x37c9a8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #8 decltype(std::declval<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&>()()) std::__1::__invoke[abi:ue170006]<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&>(arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&) invoke.h:340 (libarrow_acero.1900.0.0.dylib:arm64+0x37c5d8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #9 arrow::Status std::__1::__invoke_void_return_wrapper<arrow::Status, false>::__call[abi:ue170006]<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&>(arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&) invoke.h:407 (libarrow_acero.1900.0.0.dylib:arm64+0x37c52c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #10 std::__1::__function::__alloc_func<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'(), std::__1::allocator<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()>, arrow::Status ()>::operator()[abi:ue170006]() function.h:193 (libarrow_acero.1900.0.0.dylib:arm64+0x37c4c8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #11 std::__1::__function::__func<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'(), std::__1::allocator<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()>, arrow::Status ()>::operator()() function.h:364 (libarrow_acero.1900.0.0.dylib:arm64+0x37a280)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #12 std::__1::__function::__value_func<arrow::Status ()>::operator()[abi:ue170006]() const function.h:518 (libarrow_acero.1900.0.0.dylib:arm64+0x17d38)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #13 std::__1::function<arrow::Status ()>::operator()() const function.h:1169 (libarrow_acero.1900.0.0.dylib:arm64+0xe0dc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #14 std::__1::enable_if<!std::is_void<arrow::Status>::value && !is_future<arrow::Status>::value && (!arrow::Future<arrow::internal::Empty>::is_empty || std::is_same<arrow::Status, arrow::Status>::value), void>::type arrow::detail::ContinueFuture::operator()<std::__1::function<arrow::Status ()>&, arrow::Status, arrow::Future<arrow::internal::Empty>>(arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>&) const future.h:150 (libarrow_acero.1900.0.0.dylib:arm64+0x32cedc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #15 decltype(std::declval<arrow::detail::ContinueFuture&>()(std::declval<arrow::Future<arrow::internal::Empty>&>(), std::declval<std::__1::function<arrow::Status ()>&>())) std::__1::__invoke[abi:ue170006]<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>&>(arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>&) invoke.h:340 (libarrow_acero.1900.0.0.dylib:arm64+0x32cd84)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #16 std::__1::__bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>, __is_valid_bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>>::value>::type std::__1::__apply_functor[abi:ue170006]<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, 0ul, 1ul, std::__1::tuple<>>(arrow::detail::ContinueFuture&, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>&, std::__1::__tuple_indices<0ul, 1ul>, std::__1::tuple<>&&) bind.h:260 (libarrow_acero.1900.0.0.dylib:arm64+0x32ccf8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #17 std::__1::__bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>, __is_valid_bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>>::value>::type std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>>::operator()[abi:ue170006]<>() bind.h:292 (libarrow_acero.1900.0.0.dylib:arm64+0x32cc58)
$92│     #18 arrow::internal::FnOnce<void ()>::FnImpl<std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>>>::invoke() functional.h:152 (libarrow_acero.1900.0.0.dylib:arm64+0x32cb80)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #19 arrow::internal::FnOnce<void ()>::operator()() && functional.h:140 (libarrow.1900.0.0.dylib:arm64+0x35c1c70)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #20 arrow::internal::WorkerLoop(std::__1::shared_ptr<arrow::internal::ThreadPool::State>, std::__1::__list_iterator<std::__1::thread, void*>) thread_pool.cc:478 (libarrow.1900.0.0.dylib:arm64+0x35da59c)
$92│     #21 arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6::operator()() const thread_pool.cc:643 (libarrow.1900.0.0.dylib:arm64+0x35da25c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #22 decltype(std::declval<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>()()) std::__1::__invoke[abi:ue170006]<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>(arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6&&) invoke.h:340 (libarrow.1900.0.0.dylib:arm64+0x35da150)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #23 void std::__1::__thread_execute[abi:ue170006]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>&, std::__1::__tuple_indices<>) thread.h:227 (libarrow.1900.0.0.dylib:arm64+0x35da0fc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #24 void* std::__1::__thread_proxy[abi:ue170006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>>(void*) thread.h:238 (libarrow.1900.0.0.dylib:arm64+0x35d9a98)
$92│ 
$92│   Previous write of size 8 at 0x00010a0050a8 by thread T8:
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #0 std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>::__base_destruct_at_end[abi:ue170006](arrow::Datum*) vector:945 (libarrow_acero.1900.0.0.dylib:arm64+0x4288)
$92│     #1 std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>::__clear[abi:ue170006]() vector:938 (libarrow_acero.1900.0.0.dylib:arm64+0x3fac)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #2 std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>::clear[abi:ue170006]() vector:723 (libarrow_acero.1900.0.0.dylib:arm64+0x16d7c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #3 arrow::acero::aggregate::ExtractSegmenterValues(std::__1::vector<arrow::Datum, std::__1::allocator<arrow::Datum>>*, arrow::compute::ExecBatch const&, std::__1::vector<int, std::__1::allocator<int>> const&) aggregate_internal.cc:186 (libarrow_acero.1900.0.0.dylib:arm64+0x82464)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #4 arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::$_0::operator()(arrow::compute::ExecBatch const&, arrow::compute::Segment const&) const scalar_aggregate_node.cc:243 (libarrow_acero.1900.0.0.dylib:arm64+0x553e4)
$92│     #5 arrow::Status arrow::acero::aggregate::HandleSegments<arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::$_0>(arrow::compute::RowSegmenter*, arrow::compute::ExecBatch const&, std::__1::vector<int, std::__1::allocator<int>> const&, arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::$_0 const&) aggregate_internal.h:139 (libarrow_acero.1900.0.0.dylib:arm64+0x217f8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #6 arrow::acero::aggregate::ScalarAggregateNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) scalar_aggregate_node.cc:252 (libarrow_acero.1900.0.0.dylib:arm64+0x21094)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #7 arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()::operator()() const source_node.cc:158 (libarrow_acero.1900.0.0.dylib:arm64+0x37c9a8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #8 decltype(std::declval<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&>()()) std::__1::__invoke[abi:ue170006]<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&>(arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&) invoke.h:340 (libarrow_acero.1900.0.0.dylib:arm64+0x37c5d8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #9 arrow::Status std::__1::__invoke_void_return_wrapper<arrow::Status, false>::__call[abi:ue170006]<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&>(arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()&) invoke.h:407 (libarrow_acero.1900.0.0.dylib:arm64+0x37c52c)
$92│     #10 std::__1::__function::__alloc_func<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'(), std::__1::allocator<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()>, arrow::Status ()>::operator()[abi:ue170006]() function.h:193 (libarrow_acero.1900.0.0.dylib:arm64+0x37c4c8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #11 std::__1::__function::__func<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'(), std::__1::allocator<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()>, arrow::Status ()>::operator()() function.h:364 (libarrow_acero.1900.0.0.dylib:arm64+0x37a280)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #12 std::__1::__function::__value_func<arrow::Status ()>::operator()[abi:ue170006]() const function.h:518 (libarrow_acero.1900.0.0.dylib:arm64+0x17d38)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #13 std::__1::function<arrow::Status ()>::operator()() const function.h:1169 (libarrow_acero.1900.0.0.dylib:arm64+0xe0dc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #14 std::__1::enable_if<!std::is_void<arrow::Status>::value && !is_future<arrow::Status>::value && (!arrow::Future<arrow::internal::Empty>::is_empty || std::is_same<arrow::Status, arrow::Status>::value), void>::type arrow::detail::ContinueFuture::operator()<std::__1::function<arrow::Status ()>&, arrow::Status, arrow::Future<arrow::internal::Empty>>(arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>&) const future.h:150 (libarrow_acero.1900.0.0.dylib:arm64+0x32cedc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #15 decltype(std::declval<arrow::detail::ContinueFuture&>()(std::declval<arrow::Future<arrow::internal::Empty>&>(), std::declval<std::__1::function<arrow::Status ()>&>())) std::__1::__invoke[abi:ue170006]<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>&>(arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>&) invoke.h:340 (libarrow_acero.1900.0.0.dylib:arm64+0x32cd84)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #16 std::__1::__bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>, __is_valid_bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>>::value>::type std::__1::__apply_functor[abi:ue170006]<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, 0ul, 1ul, std::__1::tuple<>>(arrow::detail::ContinueFuture&, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>&, std::__1::__tuple_indices<0ul, 1ul>, std::__1::tuple<>&&) bind.h:260 (libarrow_acero.1900.0.0.dylib:arm64+0x32ccf8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #17 std::__1::__bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>, __is_valid_bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>>, std::__1::tuple<>>::value>::type std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>>::operator()[abi:ue170006]<>() bind.h:292 (libarrow_acero.1900.0.0.dylib:arm64+0x32cc58)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #18 arrow::internal::FnOnce<void ()>::FnImpl<std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()>>>::invoke() functional.h:152 (libarrow_acero.1900.0.0.dylib:arm64+0x32cb80)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #19 arrow::internal::FnOnce<void ()>::operator()() && functional.h:140 (libarrow.1900.0.0.dylib:arm64+0x35c1c70)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #20 arrow::internal::WorkerLoop(std::__1::shared_ptr<arrow::internal::ThreadPool::State>, std::__1::__list_iterator<std::__1::thread, void*>) thread_pool.cc:478 (libarrow.1900.0.0.dylib:arm64+0x35da59c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #21 arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6::operator()() const thread_pool.cc:643 (libarrow.1900.0.0.dylib:arm64+0x35da25c)
$92│     #22 decltype(std::declval<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>()()) std::__1::__invoke[abi:ue170006]<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>(arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6&&) invoke.h:340 (libarrow.1900.0.0.dylib:arm64+0x35da150)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #23 void std::__1::__thread_execute[abi:ue170006]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>&, std::__1::__tuple_indices<>) thread.h:227 (libarrow.1900.0.0.dylib:arm64+0x35da0fc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #24 void* std::__1::__thread_proxy[abi:ue170006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6>>(void*) thread.h:238 (libarrow.1900.0.0.dylib:arm64+0x35d9a98)
$92│ 
$92│   Location is heap block of size 320 at 0x00010a005000 allocated by main thread:
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #0 operator new(unsigned long) <null>:116051588 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x84420)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #1 arrow::acero::aggregate::ScalarAggregateNode* arrow::acero::ExecPlan::EmplaceNode<arrow::acero::aggregate::ScalarAggregateNode, arrow::acero::ExecPlan*&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, std::__1::shared_ptr<arrow::Schema>, std::__1::unique_ptr<arrow::compute::RowSegmenter, std::__1::default_delete<arrow::compute::RowSegmenter>>, std::__1::vector<int, std::__1::allocator<int>>, std::__1::vector<std::__1::vector<int, std::__1::allocator<int>>, std::__1::allocator<std::__1::vector<int, std::__1::allocator<int>>>>, std::__1::vector<arrow::compute::Aggregate, std::__1::allocator<arrow::compute::Aggregate>>, std::__1::vector<arrow::compute::ScalarAggregateKernel const*, std::__1::allocator<arrow::compute::ScalarAggregateKernel const*>>, std::__1::vector<std::__1::vector<arrow::TypeHolder, std::__1::allocator<arrow::TypeHolder>>, std::__1::allocator<std::__1::vector<arrow::TypeHolder, std::__1::allocator<arrow::TypeHolder>>>>, std::__1::vector<std::__1::vector<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>, std::__1::allocator<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>>>, std::__1::allocator<std::__1::vector<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>, std::__1::allocator<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>>>>>>(arrow::acero::ExecPlan*&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>&&, std::__1::shared_ptr<arrow::Schema>&&, std::__1::unique_ptr<arrow::compute::RowSegmenter, std::__1::default_delete<arrow::compute::RowSegmenter>>&&, std::__1::vector<int, std::__1::allocator<int>>&&, std::__1::vector<std::__1::vector<int, std::__1::allocator<int>>, std::__1::allocator<std::__1::vector<int, std::__1::allocator<int>>>>&&, 
std::__1::vector<arrow::compute::Aggregate, std::__1::allocator<arrow::compute::Aggregate>>&&, std::__1::vector<arrow::compute::ScalarAggregateKernel const*, std::__1::allocator<arrow::compute::ScalarAggregateKernel const*>>&&, std::__1::vector<std::__1::vector<arrow::TypeHolder, std::__1::allocator<arrow::TypeHolder>>, std::__1::allocator<std::__1::vector<arrow::TypeHolder, std::__1::allocator<arrow::TypeHolder>>>>&&, std::__1::vector<std::__1::vector<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>, std::__1::allocator<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>>>, std::__1::allocator<std::__1::vector<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>, std::__1::allocator<std::__1::unique_ptr<arrow::compute::KernelState, std::__1::default_delete<arrow::compute::KernelState>>>>>>&&) exec_plan.h:88 (libarrow_acero.1900.0.0.dylib:arm64+0x1ff28)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #2 arrow::acero::aggregate::ScalarAggregateNode::Make(arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&) scalar_aggregate_node.cc:198 (libarrow_acero.1900.0.0.dylib:arm64+0x1f868)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #3 arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0::operator()(arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&) const aggregate_internal.cc:253 (libarrow_acero.1900.0.0.dylib:arm64+0x8c6cc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #4 decltype(std::declval<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0&>()(std::declval<arrow::acero::ExecPlan*>(), std::declval<std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>>(), std::declval<arrow::acero::ExecNodeOptions const&>())) std::__1::__invoke[abi:ue170006]<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0&, arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&>(arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0&, arrow::acero::ExecPlan*&&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>&&, arrow::acero::ExecNodeOptions const&) invoke.h:340 (libarrow_acero.1900.0.0.dylib:arm64+0x8c5d4)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #5 arrow::Result<arrow::acero::ExecNode*> std::__1::__invoke_void_return_wrapper<arrow::Result<arrow::acero::ExecNode*>, false>::__call[abi:ue170006]<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0&, arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&>(arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0&, arrow::acero::ExecPlan*&&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>&&, arrow::acero::ExecNodeOptions const&) invoke.h:407 (libarrow_acero.1900.0.0.dylib:arm64+0x8c4c8)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #6 std::__1::__function::__alloc_func<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0, std::__1::allocator<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0>, arrow::Result<arrow::acero::ExecNode*> (arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&)>::operator()[abi:ue170006](arrow::acero::ExecPlan*&&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>&&, arrow::acero::ExecNodeOptions const&) function.h:193 (libarrow_acero.1900.0.0.dylib:arm64+0x8c434)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #7 std::__1::__function::__func<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0, std::__1::allocator<arrow::acero::internal::RegisterAggregateNode(arrow::acero::ExecFactoryRegistry*)::$_0>, arrow::Result<arrow::acero::ExecNode*> (arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&)>::operator()(arrow::acero::ExecPlan*&&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>&&, arrow::acero::ExecNodeOptions const&) function.h:364 (libarrow_acero.1900.0.0.dylib:arm64+0x8a2fc)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #8 std::__1::__function::__value_func<arrow::Result<arrow::acero::ExecNode*> (arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&)>::operator()[abi:ue170006](arrow::acero::ExecPlan*&&, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>&&, arrow::acero::ExecNodeOptions const&) const function.h:518 (libarrow_acero.1900.0.0.dylib:arm64+0x1a0930)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #9 std::__1::function<arrow::Result<arrow::acero::ExecNode*> (arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&)>::operator()(arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&) const function.h:1169 (libarrow_acero.1900.0.0.dylib:arm64+0x1a0428)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #10 arrow::acero::MakeExecNode(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, arrow::acero::ExecPlan*, std::__1::vector<arrow::acero::ExecNode*, std::__1::allocator<arrow::acero::ExecNode*>>, arrow::acero::ExecNodeOptions const&, arrow::acero::ExecFactoryRegistry*) exec_plan.h:381 (libarrow_acero.1900.0.0.dylib:arm64+0x179a54)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #11 arrow::acero::Declaration::AddToPlan(arrow::acero::ExecPlan*, arrow::acero::ExecFactoryRegistry*) const exec_plan.cc:585 (libarrow_acero.1900.0.0.dylib:arm64+0x179258)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #12 arrow::acero::Declaration::AddToPlan(arrow::acero::ExecPlan*, arrow::acero::ExecFactoryRegistry*) const exec_plan.cc:581 (libarrow_acero.1900.0.0.dylib:arm64+0x1790d0)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #13 arrow::acero::(anonymous namespace)::DeclarationToExecBatchesImpl(arrow::acero::Declaration, arrow::acero::QueryOptions, arrow::internal::Executor*) exec_plan.cc:686 (libarrow_acero.1900.0.0.dylib:arm64+0x17e3c8)
$92│     #14 arrow::acero::DeclarationToExecBatches(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::$_8::operator()(arrow::internal::Executor*) const exec_plan.cc:881 (libarrow_acero.1900.0.0.dylib:arm64+0x1e6d70)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #15 arrow::internal::FnOnce<arrow::Future<arrow::acero::BatchesWithCommonSchema> (arrow::internal::Executor*)>::FnImpl<arrow::acero::DeclarationToExecBatches(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::$_8>::invoke(arrow::internal::Executor*&&) functional.h:152 (libarrow_acero.1900.0.0.dylib:arm64+0x1e6b64)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #16 arrow::internal::FnOnce<arrow::Future<arrow::acero::BatchesWithCommonSchema> (arrow::internal::Executor*)>::operator()(arrow::internal::Executor*) && functional.h:140 (libarrow_acero.1900.0.0.dylib:arm64+0x1e567c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #17 arrow::Future<arrow::acero::BatchesWithCommonSchema>::SyncType arrow::internal::RunSynchronously<arrow::Future<arrow::acero::BatchesWithCommonSchema>, arrow::acero::BatchesWithCommonSchema>(arrow::internal::FnOnce<arrow::Future<arrow::acero::BatchesWithCommonSchema> (arrow::internal::Executor*)>, bool) thread_pool.h:587 (libarrow_acero.1900.0.0.dylib:arm64+0x17f2f4)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #18 arrow::acero::DeclarationToExecBatches(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) exec_plan.cc:878 (libarrow_acero.1900.0.0.dylib:arm64+0x17f214)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #19 arrow::acero::(anonymous namespace)::AssertRowCountEq(arrow::acero::Declaration, long long) hash_join_node_test.cc:3268 (arrow-acero-hash-join-node-test:arm64+0x100136d1c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #20 arrow::acero::HashJoin_DISABLED_BuildSideOver4GBFixedLength_Test::TestBody() hash_join_node_test.cc:3351 (arrow-acero-hash-join-node-test:arm64+0x100135f3c)
$92│ 
$92│ ⬆ std::cerr
$92│ ⬇ std::cerr:
$92│     #21 arrow::acero::HashJoin_DISABLED_BuildSideOver4GBFixedLength_Test::TestBody() hash_join_node_test.cc:3282 (arrow-acero-hash-join-node-test:arm64+0x10013412c)

The same warning was also observed when I ran other existing unit tests.

Though I'm not sure why our CI's TSAN doesn't detect this, the race is pretty straightforward.

The member

// Holds the value of segment keys of the most recent input batch
std::vector<Datum> segmenter_values_;
is used to cache segment key values for every batch in every thread. It is shared by the whole node and written without synchronization, so a write-write race occurs whenever more than one thread processes batches concurrently.
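The shape of the problem can be sketched without Arrow. The following is a minimal illustration, not the Acero code: `Batch` and `SharedCacheNode` are hypothetical names, and a `std::vector<int>` stands in for the cached segment key values.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for an input batch: one value per segment key.
struct Batch {
  std::vector<int> key_values;
};

// Hypothetical node that keeps one scratch cache for the whole node.
class SharedCacheNode {
 public:
  // Clears the shared cache and refills it from the current batch, then
  // returns the sum of the cached values. This is only safe when a single
  // thread calls it: two concurrent callers both write cached_values_
  // without synchronization, which is a write-write data race.
  long long InputReceived(const Batch& batch) {
    cached_values_.clear();
    cached_values_.insert(cached_values_.end(), batch.key_values.begin(),
                          batch.key_values.end());
    // Holds single-threaded; another thread clearing the cache in between
    // could break it.
    assert(cached_values_.size() == batch.key_values.size());
    long long sum = 0;
    for (int v : cached_values_) sum += v;
    return sum;
  }

 private:
  std::vector<int> cached_values_;  // shared by every caller of InputReceived
};
```

Called from one thread, `InputReceived` behaves as expected. Called from several worker threads on the same node, the `clear()` in one thread can interleave with the refill in another, which matches the two `vector::clear` stacks in the report above.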

Component(s)

C++

pitrou pushed a commit that referenced this issue Mar 19, 2025
### Rationale for this change

Data race described in #45788.

### What changes are included in this PR?

Put the racing member `segmenter_values` in thread local state.
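As a rough illustration of the thread-local-state idea (a standalone sketch, not the code in this PR), each worker can be given its own scratch slot so that no two threads write the same vector. The names `Batch`, `PerThreadNode`, `ThreadLocalState` and `RunConcurrently` are hypothetical, and the sketch assumes every worker is handed a stable index below the number of threads.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for an input batch: one value per segment key.
struct Batch {
  std::vector<int> key_values;
};

// Hypothetical node that keeps one scratch cache per worker thread.
class PerThreadNode {
 public:
  explicit PerThreadNode(std::size_t num_threads) : states_(num_threads) {}

  // Each caller passes its own index and touches only states_[thread_index],
  // so concurrent calls with distinct indices do not race.
  void InputReceived(std::size_t thread_index, const Batch& batch) {
    assert(thread_index < states_.size());
    ThreadLocalState& state = states_[thread_index];
    state.cached_values.assign(batch.key_values.begin(),
                               batch.key_values.end());
    for (int v : state.cached_values) state.partial_sum += v;
  }

  // Combines the per-thread results. Call only after all workers are joined.
  long long Total() const {
    long long total = 0;
    for (const ThreadLocalState& state : states_) total += state.partial_sum;
    return total;
  }

 private:
  struct ThreadLocalState {
    std::vector<int> cached_values;  // scratch cache owned by one thread
    long long partial_sum = 0;
  };
  std::vector<ThreadLocalState> states_;  // sized once, never resized
};

// Feeds `batch` to the node `batches_per_thread` times from each worker.
long long RunConcurrently(std::size_t num_threads, int batches_per_thread,
                          const Batch& batch) {
  PerThreadNode node(num_threads);
  std::vector<std::thread> workers;
  workers.reserve(num_threads);
  for (std::size_t t = 0; t < num_threads; ++t) {
    workers.emplace_back([&node, &batch, t, batches_per_thread] {
      for (int i = 0; i < batches_per_thread; ++i) node.InputReceived(t, batch);
    });
  }
  for (std::thread& worker : workers) worker.join();
  return node.Total();
}
```

The per-thread vector is sized once up front and never resized while workers run, so each thread's slot stays at a fixed address. A mutex around a single shared cache would also remove the race, at the cost of serializing every batch.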

### Are these changes tested?

Yes. UT added.

### Are there any user-facing changes?

None.

* GitHub Issue: #45788

Authored-by: Rossi Sun <zanmato1984@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
@pitrou pitrou added this to the 20.0.0 milestone Mar 19, 2025

pitrou commented Mar 19, 2025

Issue resolved by pull request 45789
#45789

@pitrou pitrou closed this as completed Mar 19, 2025