diff --git a/src/common/ring.hpp b/src/common/ring.hpp index bf3e8c2..2ac67c7 100644 --- a/src/common/ring.hpp +++ b/src/common/ring.hpp @@ -5,6 +5,7 @@ #include +#include "ring_generic_allocator.hpp" #include "log.hpp" static_assert(sizeof(std::atomic) == sizeof(uint32_t), ""); @@ -113,6 +114,26 @@ static inline uint32_t sqk_align32pow2(uint32_t x) { return x + 1; } +inline void sqk_wait_until_equal_32( + volatile uint32_t* addr, + uint32_t expected, + std::memory_order memorder +) { + assert( + memorder == std::memory_order_acquire + || memorder == std::memory_order_relaxed + ); + + while (std::atomic_load_explicit( + (volatile std::atomic*)addr, + memorder + ) + != expected) + sqk_pause(); +} + +namespace sqk::common { + struct MemZone { uint8_t* addr_; ssize_t size_; @@ -152,6 +173,8 @@ union RingHtsPosPlain { } pos; }; +static_assert(sizeof(RingHtsPosPlain) == sizeof(RingHtsPos), ""); + union RingHeadTail { struct { volatile uint32_t head_; @@ -180,7 +203,7 @@ template< RingSyncType cons_sync_type = RingSyncType::SQK_RING_SYNC_ST, bool transactional_prod = 0, bool transactional_cons = 0, - typename Allocator = std::allocator> + typename Allocator = Allocator> requires(sizeof(T) % 4 == 0) && std::is_trivially_copyable_v struct Ring { private: @@ -318,10 +341,10 @@ struct Ring { do { n = num; /* - * * wait for tail to be equal to head, - * * make sure that we read prod head/tail *before* - * * reading cons tail. - * */ + * wait for tail to be equal to head, + * make sure that we read prod head/tail *before* + * reading cons tail. + */ this->hts_head_wait(this->prod_.ht, op); /* * The subtraction is done between two unsigned 32bits value @@ -496,19 +519,20 @@ struct Ring { uint32_t single, bool enqueue ) { - if (enqueue) - sqk_smp_wmb(); - else - sqk_smp_rmb(); - /* - * If there are other enqueues/dequeues in progress that preceded us, - * we need to wait for them to complete - */ + SQK_SET_USED(enqueue); + if (!single) - while (unlikely(this->prod_.tail_ != old_val)) - sqk_pause(); + sqk_wait_until_equal_32( + &ht.tail_, + old_val, + std::memory_order_relaxed + ); - ht.tail_ = new_val; + std::atomic_store_explicit( + reinterpret_cast*>(&ht.tail_), + new_val, + std::memory_order_release + ); } template @@ -622,12 +646,14 @@ struct Ring { sqk_smp_rmb(); success = 1; } else { - success = std::atomic_compare_exchange_weak( + success = std::atomic_compare_exchange_strong_explicit( reinterpret_cast*>( &this->cons_.head_ ), &old_head, - new_head + new_head, + std::memory_order_relaxed, + std::memory_order_relaxed ); } } while (unlikely(success == 0)); @@ -804,7 +830,7 @@ struct Ring { return 0; } this->enqueue_elements(prod_head, &entry, n); - this->update_tail(this->prod_, prod_head, prod_next, 0, 1); + this->update_tail(this->prod_, prod_head, prod_next, prod_sync_type == RingSyncType::SQK_RING_SYNC_ST, 1); return n; } else if constexpr (prod_sync_type == RingSyncType::SQK_RING_SYNC_MT_HTS) { @@ -833,7 +859,7 @@ struct Ring { return 0; } this->dequeue_elements(cons_head, &entry, n); - this->update_tail(this->cons_, cons_head, cons_next, 1, 0); + this->update_tail(this->cons_, cons_head, cons_next, cons_sync_type == RingSyncType::SQK_RING_SYNC_ST, 0); return n; } else if constexpr (cons_sync_type == RingSyncType::SQK_RING_SYNC_MT_HTS) { @@ -855,4 +881,23 @@ struct Ring { } }; +template +using MpscRing = Ring; + +template +struct RingGuard { + RingType* ring_; + + RingGuard(uint32_t cnt = 1024) : ring_(RingType::of(cnt)) {} + + ~RingGuard() { + RingType::free(ring_); + } + + RingType* operator->() { + return ring_; + } +}; + +} // namespace sqk::common #endif // !SQK_COMMON_HPP_ diff --git a/src/common/ring_generic_allocator.hpp b/src/common/ring_generic_allocator.hpp new file mode 100644 index 0000000..bd0b954 --- /dev/null +++ b/src/common/ring_generic_allocator.hpp @@ -0,0 +1,24 @@ +#ifndef SQK_COMMON_RING_GENERIC_ALLOCATOR_HPP_ +#define SQK_COMMON_RING_GENERIC_ALLOCATOR_HPP_ + +#ifdef __linux__ +#include +#endif + +#include +namespace sqk::common { + +template +struct Allocator : std::allocator { +#ifdef __linux__ + [[gnu::always_inline]] + constexpr T* allocate(size_t n) { + T* allocated = std::allocator::allocate(n); + madvise(allocated, sizeof(T) * n, MADV_POPULATE_WRITE); + return allocated; + } +#endif +}; +} + +#endif // !SQK_COMMON_RING_GENERIC_ALLOCATOR_HPP_ diff --git a/src/core/core.hpp b/src/core/core.hpp index 163b0d4..95628c4 100644 --- a/src/core/core.hpp +++ b/src/core/core.hpp @@ -26,31 +26,34 @@ struct Task: std::coroutine_handle> { using promise_type = Promise; }; +using sqk::common::MpscRing; +using sqk::common::RingGuard; + struct SQKScheduler { alignas(SQK_CACHE_LINESIZE) bool stopped {}; - std::list> queue_; + RingGuard>> queue_; public: template int enqueue(Task handle) { - queue_.push_back(handle); + queue_->enqueue(handle); return 0; } template int enqueue(T handle) { - queue_.push_back(handle); + queue_->enqueue(handle); return 0; } void stop() { stopped = 1; } + int run() { + std::coroutine_handle<> handle; for (;;) { - if (likely(!queue_.empty())) { - auto handle = queue_.front(); - queue_.pop_front(); + if (likely(queue_->dequeue(handle))) { handle.resume(); if (unlikely(stopped)) { return 0; diff --git a/src/io/blob/CMakeLists.txt b/src/io/blob/CMakeLists.txt index d89adbe..ece2435 100644 --- a/src/io/blob/CMakeLists.txt +++ b/src/io/blob/CMakeLists.txt @@ -14,4 +14,3 @@ target_include_directories(${PROJECT_NAME} ${FABRIC_INCLUDEDIR} ${CMAKE_CURRENT_SOURCE_DIR} ) - diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 7defcd2..446684a 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -1,5 +1,16 @@ project(core-test) +include(FetchContent) + +FetchContent_Declare( + nanobench + GIT_REPOSITORY https://github.com/martinus/nanobench.git + GIT_TAG v4.3.11 + GIT_SHALLOW TRUE) +FetchContent_MakeAvailable(nanobench) + +add_compile_options("-g") + add_executable(${PROJECT_NAME} core_util.cc ) diff --git a/src/tests/common/CMakeLists.txt b/src/tests/common/CMakeLists.txt index 4acfaa6..c1ca00a 100644 --- a/src/tests/common/CMakeLists.txt +++ b/src/tests/common/CMakeLists.txt @@ -3,9 +3,13 @@ project(common-test) add_executable(ring-test ring_test.cc ) -foreach(X IN ITEMS ring-test) +add_executable(ring-bench + ring_bench.cc +) + +foreach(X IN ITEMS ring-test ring-bench) target_include_directories(${X} PRIVATE ${SPDLOG_SOURCE_DIR}/include) - target_link_libraries(${X} common) + target_link_libraries(${X} PRIVATE common nanobench) if (INSTALL_SQKIO) install(TARGETS ${X} RUNTIME DESTINATION ${SQKIO_INSTALL_BINDIR}) diff --git a/src/tests/common/ring_bench.cc b/src/tests/common/ring_bench.cc new file mode 100644 index 0000000..7dbecf4 --- /dev/null +++ b/src/tests/common/ring_bench.cc @@ -0,0 +1,130 @@ +#include + +#include +#include + +#include "ring.hpp" + +using namespace sqk::common; + +int main(int argc, char* argv[]) { + constexpr uint64_t iterations = 1UL * 1000 * 1000 * 10; + { + RingGuard> guard(9216); + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("mpsc_ring enqueue", [&] { + guard->enqueue(1); + int i; + guard->dequeue(i); + }); + } + { + RingGuard> + guard(9216); + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("spsc_ring enqueue", [&] { + guard->enqueue(1); + int i; + guard->dequeue(i); + }); + } + { + RingGuard> + guard(9216); + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("hts mpsc_ring enqueue", [&] { + guard->enqueue(1); + int i; + guard->dequeue(i); + }); + } + { + std::deque deq; + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("deque enqueue", [&] { + deq.push_back(1); + int i = deq.front(); + deq.pop_front(); + }); + } + { + std::list deq; + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("list enqueue", [&] { + deq.push_back(1); + int i = deq.front(); + deq.pop_front(); + }); + } + { + RingGuard> guard(9216); + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("mpsc_ring enqueue", [&] { + guard->enqueue(1); + uint64_t i; + guard->dequeue(i); + }); + } + { + RingGuard> + guard(9216); + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("spsc_ring enqueue", [&] { + guard->enqueue(1); + uint64_t i; + guard->dequeue(i); + }); + } + { + RingGuard> + guard(9216); + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("hts mpsc_ring enqueue", [&] { + guard->enqueue(1); + uint64_t i; + guard->dequeue(i); + }); + } + { + std::deque deq; + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("deque enqueue", [&] { + deq.push_back(1); + uint64_t i = deq.front(); + deq.pop_front(); + }); + } + { + std::list deq; + ankerl::nanobench::Bench() + .minEpochIterations(iterations) + .run("list enqueue", [&] { + deq.push_back(1); + uint64_t i = deq.front(); + deq.pop_front(); + }); + } + + return 0; +} diff --git a/src/tests/common/ring_test.cc b/src/tests/common/ring_test.cc index 9f57ca7..ce9972e 100644 --- a/src/tests/common/ring_test.cc +++ b/src/tests/common/ring_test.cc @@ -1,5 +1,7 @@ #include "ring.hpp" +using namespace sqk::common; + int main(int argc, char* argv[]) { S_LOGGER_SETUP; { diff --git a/src/tests/io/blob/CMakeLists.txt b/src/tests/io/blob/CMakeLists.txt index ab7a89f..ca13691 100644 --- a/src/tests/io/blob/CMakeLists.txt +++ b/src/tests/io/blob/CMakeLists.txt @@ -13,5 +13,3 @@ foreach(X IN ITEMS blob-test) endif() endforeach() - - diff --git a/src/tests/io/blob/blob_test.cc b/src/tests/io/blob/blob_test.cc index 5560121..77ac276 100644 --- a/src/tests/io/blob/blob_test.cc +++ b/src/tests/io/blob/blob_test.cc @@ -34,6 +34,7 @@ const char* dpdk_cli_override_opts = "--no-telemetry"; int main(int argc, char* argv[]) { + S_LOGGER_SETUP; sqk::scheduler = new sqk::SQKScheduler; BlobEnv env; BlobOptions opts;