Skip to content

Commit

Permalink
add ring bench
Browse files Browse the repository at this point in the history
Signed-off-by: wineway <wangyuweihx@gmail.com>
  • Loading branch information
wineway committed Aug 19, 2024
1 parent ffa366f commit 2d62941
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 31 deletions.
85 changes: 65 additions & 20 deletions src/common/ring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <cstdint>

#include "ring_generic_allocator.hpp"
#include "log.hpp"

static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t), "");
Expand Down Expand Up @@ -113,6 +114,26 @@ static inline uint32_t sqk_align32pow2(uint32_t x) {
return x + 1;
}

inline void sqk_wait_until_equal_32(
volatile uint32_t* addr,
uint32_t expected,
std::memory_order memorder
) {
assert(
memorder == std::memory_order_acquire
|| memorder == std::memory_order_relaxed
);

while (std::atomic_load_explicit(
(volatile std::atomic<uint32_t>*)addr,
memorder
)
!= expected)
sqk_pause();
}

namespace sqk::common {

struct MemZone {
uint8_t* addr_;
ssize_t size_;
Expand Down Expand Up @@ -152,6 +173,8 @@ union RingHtsPosPlain {
} pos;
};

static_assert(sizeof(RingHtsPosPlain) == sizeof(RingHtsPos), "");

union RingHeadTail {
struct {
volatile uint32_t head_;
Expand Down Expand Up @@ -180,7 +203,7 @@ template<
RingSyncType cons_sync_type = RingSyncType::SQK_RING_SYNC_ST,
bool transactional_prod = 0,
bool transactional_cons = 0,
typename Allocator = std::allocator<uint8_t>>
typename Allocator = Allocator<uint8_t>>
requires(sizeof(T) % 4 == 0) && std::is_trivially_copyable_v<T>
struct Ring {
private:
Expand Down Expand Up @@ -318,10 +341,10 @@ struct Ring {
do {
n = num;
/*
* * wait for tail to be equal to head,
* * make sure that we read prod head/tail *before*
* * reading cons tail.
* */
* wait for tail to be equal to head,
* make sure that we read prod head/tail *before*
* reading cons tail.
*/
this->hts_head_wait(this->prod_.ht, op);
/*
* The subtraction is done between two unsigned 32bits value
Expand Down Expand Up @@ -496,19 +519,20 @@ struct Ring {
uint32_t single,
bool enqueue
) {
if (enqueue)
sqk_smp_wmb();
else
sqk_smp_rmb();
/*
* If there are other enqueues/dequeues in progress that preceded us,
* we need to wait for them to complete
*/
SQK_SET_USED(enqueue);

if (!single)
while (unlikely(this->prod_.tail_ != old_val))
sqk_pause();
sqk_wait_until_equal_32(
&ht.tail_,
old_val,
std::memory_order_relaxed
);

ht.tail_ = new_val;
std::atomic_store_explicit(
reinterpret_cast<volatile std::atomic<uint32_t>*>(&ht.tail_),
new_val,
std::memory_order_release
);
}

template<RingSyncType sync_type = cons_sync_type>
Expand Down Expand Up @@ -622,12 +646,14 @@ struct Ring {
sqk_smp_rmb();
success = 1;
} else {
success = std::atomic_compare_exchange_weak(
success = std::atomic_compare_exchange_strong_explicit(
reinterpret_cast<volatile std::atomic<uint32_t>*>(
&this->cons_.head_
),
&old_head,
new_head
new_head,
std::memory_order_relaxed,
std::memory_order_relaxed
);
}
} while (unlikely(success == 0));
Expand Down Expand Up @@ -804,7 +830,7 @@ struct Ring {
return 0;
}
this->enqueue_elements(prod_head, &entry, n);
this->update_tail(this->prod_, prod_head, prod_next, 0, 1);
this->update_tail(this->prod_, prod_head, prod_next, prod_sync_type == RingSyncType::SQK_RING_SYNC_ST, 1);
return n;
} else if constexpr (prod_sync_type
== RingSyncType::SQK_RING_SYNC_MT_HTS) {
Expand Down Expand Up @@ -833,7 +859,7 @@ struct Ring {
return 0;
}
this->dequeue_elements(cons_head, &entry, n);
this->update_tail(this->cons_, cons_head, cons_next, 1, 0);
this->update_tail(this->cons_, cons_head, cons_next, cons_sync_type == RingSyncType::SQK_RING_SYNC_ST, 0);
return n;
} else if constexpr (cons_sync_type
== RingSyncType::SQK_RING_SYNC_MT_HTS) {
Expand All @@ -855,4 +881,23 @@ struct Ring {
}
};

template<typename T>
using MpscRing = Ring<T, RingSyncType::SQK_RING_SYNC_MT, RingSyncType::SQK_RING_SYNC_ST>;

template<typename RingType>
struct RingGuard {
RingType* ring_;

RingGuard(uint32_t cnt = 1024) : ring_(RingType::of(cnt)) {}

~RingGuard() {
RingType::free(ring_);
}

RingType* operator->() {
return ring_;
}
};

} // namespace sqk::common
#endif // !SQK_COMMON_HPP_
24 changes: 24 additions & 0 deletions src/common/ring_generic_allocator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#ifndef SQK_COMMON_RING_GENERIC_ALLOCATOR_HPP_
#define SQK_COMMON_RING_GENERIC_ALLOCATOR_HPP_

#ifdef __linux__
#include <sys/mman.h>
#endif

#include <memory>
namespace sqk::common {

template <typename T>
struct Allocator : std::allocator<T> {
#ifdef __linux__
[[gnu::always_inline]]
constexpr T* allocate(size_t n) {
T* allocated = std::allocator<T>::allocate(n);
madvise(allocated, sizeof(T) * n, MADV_POPULATE_WRITE);
return allocated;
}
#endif
};
}

#endif // !SQK_COMMON_RING_GENERIC_ALLOCATOR_HPP_
15 changes: 9 additions & 6 deletions src/core/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,34 @@ struct Task: std::coroutine_handle<Promise<T>> {
using promise_type = Promise<T>;
};

using sqk::common::MpscRing;
using sqk::common::RingGuard;

struct SQKScheduler {
alignas(SQK_CACHE_LINESIZE) bool stopped {};
std::list<std::coroutine_handle<>> queue_;
RingGuard<MpscRing<std::coroutine_handle<>>> queue_;

public:
template<typename T>
int enqueue(Task<T> handle) {
queue_.push_back(handle);
queue_->enqueue(handle);
return 0;
}

template<typename T>
int enqueue(T handle) {
queue_.push_back(handle);
queue_->enqueue(handle);
return 0;
}

void stop() {
stopped = 1;
}

int run() {
std::coroutine_handle<> handle;
for (;;) {
if (likely(!queue_.empty())) {
auto handle = queue_.front();
queue_.pop_front();
if (likely(queue_->dequeue(handle))) {
handle.resume();
if (unlikely(stopped)) {
return 0;
Expand Down
1 change: 0 additions & 1 deletion src/io/blob/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ target_include_directories(${PROJECT_NAME}
${FABRIC_INCLUDEDIR}
${CMAKE_CURRENT_SOURCE_DIR}
)

11 changes: 11 additions & 0 deletions src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
project(core-test)

include(FetchContent)

FetchContent_Declare(
nanobench
GIT_REPOSITORY https://github.com/martinus/nanobench.git
GIT_TAG v4.3.11
GIT_SHALLOW TRUE)
FetchContent_MakeAvailable(nanobench)

add_compile_options("-g")

add_executable(${PROJECT_NAME}
core_util.cc
)
Expand Down
8 changes: 6 additions & 2 deletions src/tests/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ project(common-test)
add_executable(ring-test
ring_test.cc
)
foreach(X IN ITEMS ring-test)
add_executable(ring-bench
ring_bench.cc
)

foreach(X IN ITEMS ring-test ring-bench)
target_include_directories(${X} PRIVATE ${SPDLOG_SOURCE_DIR}/include)
target_link_libraries(${X} common)
target_link_libraries(${X} PRIVATE common nanobench)
if (INSTALL_SQKIO)
install(TARGETS ${X}
RUNTIME DESTINATION ${SQKIO_INSTALL_BINDIR})
Expand Down
130 changes: 130 additions & 0 deletions src/tests/common/ring_bench.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include <nanobench.h>

#include <deque>
#include <list>

#include "ring.hpp"

using namespace sqk::common;

int main(int argc, char* argv[]) {
constexpr uint64_t iterations = 1UL * 1000 * 1000 * 10;
{
RingGuard<MpscRing<int>> guard(9216);
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("mpsc_ring enqueue", [&] {
guard->enqueue(1);
int i;
guard->dequeue(i);
});
}
{
RingGuard<Ring<
int,
RingSyncType::SQK_RING_SYNC_ST,
RingSyncType::SQK_RING_SYNC_ST>>
guard(9216);
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("spsc_ring enqueue", [&] {
guard->enqueue(1);
int i;
guard->dequeue(i);
});
}
{
RingGuard<Ring<
int,
RingSyncType::SQK_RING_SYNC_MT_HTS,
RingSyncType::SQK_RING_SYNC_ST>>
guard(9216);
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("hts mpsc_ring enqueue", [&] {
guard->enqueue(1);
int i;
guard->dequeue(i);
});
}
{
std::deque<int> deq;
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("deque enqueue", [&] {
deq.push_back(1);
int i = deq.front();
deq.pop_front();
});
}
{
std::list<int> deq;
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("list enqueue", [&] {
deq.push_back(1);
int i = deq.front();
deq.pop_front();
});
}
{
RingGuard<MpscRing<uint64_t>> guard(9216);
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("mpsc_ring enqueue", [&] {
guard->enqueue(1);
uint64_t i;
guard->dequeue(i);
});
}
{
RingGuard<Ring<
uint64_t,
RingSyncType::SQK_RING_SYNC_ST,
RingSyncType::SQK_RING_SYNC_ST>>
guard(9216);
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("spsc_ring enqueue", [&] {
guard->enqueue(1);
uint64_t i;
guard->dequeue(i);
});
}
{
RingGuard<Ring<
uint64_t,
RingSyncType::SQK_RING_SYNC_MT_HTS,
RingSyncType::SQK_RING_SYNC_ST>>
guard(9216);
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("hts mpsc_ring enqueue", [&] {
guard->enqueue(1);
uint64_t i;
guard->dequeue(i);
});
}
{
std::deque<uint64_t> deq;
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("deque enqueue", [&] {
deq.push_back(1);
uint64_t i = deq.front();
deq.pop_front();
});
}
{
std::list<uint64_t> deq;
ankerl::nanobench::Bench()
.minEpochIterations(iterations)
.run("list enqueue", [&] {
deq.push_back(1);
uint64_t i = deq.front();
deq.pop_front();
});
}

return 0;
}
Loading

0 comments on commit 2d62941

Please sign in to comment.