-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Memory stats #2162
feat: Memory stats #2162
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
+4 −0 | util/fibers/simple_channel.h | |
+3 −5 | util/tls/tls_engine.cc | |
+9 −3 | util/tls/tls_engine.h | |
+19 −18 | util/tls/tls_socket.cc | |
+3 −4 | util/tls/tls_socket.h |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// Copyright 2023, DragonflyDB authors. All rights reserved. | ||
// See LICENSE for licensing terms. | ||
// | ||
|
||
#pragma once | ||
|
||
#include "core/fibers.h" | ||
|
||
namespace dfly { | ||
|
||
// SimpleQueue-like interface, but also keeps track over the size of Ts it owns. | ||
// It has a slightly less efficient TryPush() API as it forces construction of Ts even if they are | ||
// not pushed. | ||
// T must have a .size() method, which should return the heap-allocated size of T, excluding | ||
// anything included in sizeof(T). We could generalize this in the future. | ||
template <typename T, typename Queue = folly::ProducerConsumerQueue<T>> class SizeTrackingChannel { | ||
public: | ||
SizeTrackingChannel(size_t n, unsigned num_producers = 1) : queue_(n, num_producers) { | ||
} | ||
|
||
// Here and below, we must accept a T instead of building it from variadic args, as we need to | ||
// know its size in case it is added. | ||
void Push(T t) noexcept { | ||
size_ += t.size(); | ||
queue_.Push(std::move(t)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also about (2), it will now be the case that because we have two separate memory tracking systems, info memory and memory stats will return different results 🤷🏻♂️🤷🏻♂️ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re/ 1: good comment, will fix, thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I just meant that if you don't unify the tracking approaches eventually, they'll not only have duplicated code, but will also diverge in values 🙂 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason not to modify There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, there is no reason not to modify it PS: It does decrease the number, pulled_bytes grows, so it shouldn't reach zero at the very end 🤔 size_t total_bytes = pushed_bytes.load(memory_order_relaxed) + serializer_bytes.load(memory_order_relaxed) - pulled_bytes; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, I missed that. |
||
|
||
bool TryPush(T t) noexcept { | ||
const size_t tmp_size = t.size(); | ||
if (queue_.TryPush(std::move(t))) { | ||
size_ += tmp_size; | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
bool Pop(T& dest) { | ||
if (queue_.Pop(dest)) { | ||
size_ -= dest.size(); | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
void StartClosing() { | ||
queue_.StartClosing(); | ||
} | ||
|
||
bool TryPop(T& dest) { | ||
if (queue_.TryPop(dest)) { | ||
size_ -= dest.size(); | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
bool IsClosing() const { | ||
return queue_.IsClosing(); | ||
} | ||
|
||
size_t GetSize() const { | ||
return queue_.Capacity() * sizeof(T) + size_; | ||
} | ||
|
||
private: | ||
SimpleChannel<T, Queue> queue_; | ||
size_t size_ = 0; | ||
}; | ||
|
||
} // namespace dfly |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,13 @@ | |
#include <absl/strings/str_cat.h> | ||
#include <mimalloc.h> | ||
|
||
#include "base/io_buf.h" | ||
#include "facade/dragonfly_connection.h" | ||
#include "facade/error.h" | ||
#include "server/engine_shard_set.h" | ||
#include "server/server_family.h" | ||
#include "server/server_state.h" | ||
#include "server/snapshot.h" | ||
|
||
using namespace std; | ||
using namespace facade; | ||
|
@@ -75,7 +79,7 @@ size_t MemoryUsage(PrimeIterator it) { | |
|
||
} // namespace | ||
|
||
MemoryCmd::MemoryCmd(ServerFamily* owner, ConnectionContext* cntx) : cntx_(cntx) { | ||
MemoryCmd::MemoryCmd(ServerFamily* owner, ConnectionContext* cntx) : cntx_(cntx), owner_(owner) { | ||
} | ||
|
||
void MemoryCmd::Run(CmdArgList args) { | ||
|
@@ -84,6 +88,8 @@ void MemoryCmd::Run(CmdArgList args) { | |
if (sub_cmd == "HELP") { | ||
string_view help_arr[] = { | ||
"MEMORY <subcommand> [<arg> ...]. Subcommands are:", | ||
"STATS", | ||
" Shows breakdown of memory.", | ||
"MALLOC-STATS [BACKING] [thread-id]", | ||
" Show malloc stats for a heap residing in specified thread-id. 0 by default.", | ||
" If BACKING is specified, show stats for the backing heap.", | ||
|
@@ -95,6 +101,10 @@ void MemoryCmd::Run(CmdArgList args) { | |
return (*cntx_)->SendSimpleStrArr(help_arr); | ||
}; | ||
|
||
if (sub_cmd == "STATS") { | ||
return Stats(); | ||
} | ||
|
||
if (sub_cmd == "USAGE" && args.size() > 1) { | ||
string_view key = ArgS(args, 1); | ||
return Usage(key); | ||
|
@@ -143,6 +153,110 @@ void MemoryCmd::Run(CmdArgList args) { | |
return (*cntx_)->SendError(err, kSyntaxErrType); | ||
} | ||
|
||
namespace { | ||
|
||
struct ConnectionMemoryUsage { | ||
size_t connection_count = 0; | ||
size_t pipelined_bytes = 0; | ||
base::IoBuf::MemoryUsage connections_memory; | ||
|
||
size_t replication_connection_count = 0; | ||
base::IoBuf::MemoryUsage replication_memory; | ||
}; | ||
|
||
ConnectionMemoryUsage GetConnectionMemoryUsage(ServerFamily* server) { | ||
Mutex mu; | ||
ConnectionMemoryUsage mem ABSL_GUARDED_BY(mu); | ||
|
||
for (auto* listener : server->GetListeners()) { | ||
listener->TraverseConnections([&](unsigned thread_index, util::Connection* conn) { | ||
auto* dfly_conn = static_cast<facade::Connection*>(conn); | ||
auto* cntx = static_cast<ConnectionContext*>(dfly_conn->cntx()); | ||
lock_guard lock(mu); | ||
|
||
if (cntx->replication_flow == nullptr) { | ||
mem.connection_count++; | ||
mem.connections_memory += dfly_conn->GetMemoryUsage(); | ||
} else { | ||
mem.replication_connection_count++; | ||
mem.replication_memory += dfly_conn->GetMemoryUsage(); | ||
} | ||
|
||
if (cntx != nullptr) { | ||
mem.pipelined_bytes += cntx->conn_state.exec_info.body.capacity() * sizeof(StoredCmd); | ||
for (const auto& pipeline : cntx->conn_state.exec_info.body) { | ||
mem.pipelined_bytes += pipeline.UsedHeapMemory(); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
return mem; | ||
} | ||
|
||
void PushMemoryUsageStats(const base::IoBuf::MemoryUsage& mem, string_view prefix, size_t total, | ||
vector<string>* stats) { | ||
stats->push_back(absl::StrCat(prefix, ".total_bytes")); | ||
stats->push_back(absl::StrCat(total)); | ||
stats->push_back(absl::StrCat(prefix, ".consumed_bytes")); | ||
stats->push_back(absl::StrCat(mem.consumed)); | ||
stats->push_back(absl::StrCat(prefix, ".pending_input_bytes")); | ||
stats->push_back(absl::StrCat(mem.input_length)); | ||
stats->push_back(absl::StrCat(prefix, ".pending_output_bytes")); | ||
stats->push_back(absl::StrCat(mem.append_length)); | ||
} | ||
|
||
} // namespace | ||
|
||
void MemoryCmd::Stats() { | ||
vector<string> stats; | ||
stats.reserve(50); | ||
auto server_metrics = owner_->GetMetrics(); | ||
|
||
// RSS | ||
stats.push_back("rss_bytes"); | ||
stats.push_back(absl::StrCat(rss_mem_current.load())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i prefer that we pass memory_order_relaxed |
||
stats.push_back("rss_peak_bytes"); | ||
stats.push_back(absl::StrCat(rss_mem_peak.load())); | ||
|
||
// Used by DbShards and DashTable | ||
stats.push_back("data_bytes"); | ||
stats.push_back(absl::StrCat(used_mem_current.load())); | ||
stats.push_back("data_peak_bytes"); | ||
stats.push_back(absl::StrCat(used_mem_peak.load())); | ||
|
||
ConnectionMemoryUsage connection_memory = GetConnectionMemoryUsage(owner_); | ||
|
||
// Connection stats, excluding replication connections | ||
stats.push_back("connections.count"); | ||
stats.push_back(absl::StrCat(connection_memory.connection_count)); | ||
PushMemoryUsageStats( | ||
connection_memory.connections_memory, "connections", | ||
connection_memory.connections_memory.GetTotalSize() + connection_memory.pipelined_bytes, | ||
&stats); | ||
stats.push_back("connections.pipeline_bytes"); | ||
stats.push_back(absl::StrCat(connection_memory.pipelined_bytes)); | ||
|
||
// Replication connection stats | ||
stats.push_back("replication.connections_count"); | ||
stats.push_back(absl::StrCat(connection_memory.replication_connection_count)); | ||
PushMemoryUsageStats(connection_memory.replication_memory, "replication", | ||
connection_memory.replication_memory.GetTotalSize(), &stats); | ||
|
||
Mutex mu; | ||
size_t serialization_memory ABSL_GUARDED_BY(mu); | ||
shard_set->pool()->AwaitFiberOnAll([&](auto*) { | ||
lock_guard lock(mu); | ||
serialization_memory += SliceSnapshot::GetThreadLocalMemoryUsage(); | ||
}); | ||
|
||
// Serialization stats, including both replication-related serialization and saving to RDB files. | ||
stats.push_back("serialization"); | ||
stats.push_back(absl::StrCat(serialization_memory)); | ||
|
||
return (*cntx_)->SendSimpleStrArr(stats); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice that you used a formatted output. nit: you can specify this as MAP to get even better representation for resp3 enabled clients. |
||
} | ||
|
||
void MemoryCmd::Usage(std::string_view key) { | ||
ShardId sid = Shard(key, shard_set->size()); | ||
ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this]() -> ssize_t { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i used core fibers because I migrated from boost fibers. let's just reference the correct include file directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would mean using
util::fb2::SimpleChannel
instead ofdfly
, I thought that was the purpose of that file?