Skip to content

Commit 89e2302

Browse files
addaleaxtargos
authored andcommitted
src: initialize file trace writer on tracing thread
Run the initialization for the file trace writer’s `uv_async_t`s on the same thread as `uv_run()` for their loop to avoid race conditions. PR-URL: #21867 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Eugene Ostroukhov <eostroukhov@google.com> Reviewed-By: Ali Ijaz Sheikh <ofrobots@google.com>
1 parent 56edd5f commit 89e2302

File tree

5 files changed

+64
-23
lines changed

5 files changed

+64
-23
lines changed

src/node.cc

+1-2
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,7 @@ static struct {
434434
tracing_file_writer_ = tracing_agent_->AddClient(
435435
ParseCommaSeparatedSet(trace_enabled_categories),
436436
std::unique_ptr<tracing::AsyncTraceWriter>(
437-
new tracing::NodeTraceWriter(trace_file_pattern,
438-
tracing_agent_->loop())),
437+
new tracing::NodeTraceWriter(trace_file_pattern)),
439438
tracing::Agent::kUseDefaultCategories);
440439
}
441440
}

src/tracing/agent.cc

+31
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,27 @@ Agent::Agent() {
5353
tracing_controller_->Initialize(nullptr);
5454

5555
CHECK_EQ(uv_loop_init(&tracing_loop_), 0);
56+
CHECK_EQ(uv_async_init(&tracing_loop_,
57+
&initialize_writer_async_,
58+
[](uv_async_t* async) {
59+
Agent* agent = ContainerOf(&Agent::initialize_writer_async_, async);
60+
agent->InitializeWritersOnThread();
61+
}), 0);
62+
}
63+
64+
void Agent::InitializeWritersOnThread() {
65+
Mutex::ScopedLock lock(initialize_writer_mutex_);
66+
while (!to_be_initialized_.empty()) {
67+
AsyncTraceWriter* head = *to_be_initialized_.begin();
68+
head->InitializeOnThread(&tracing_loop_);
69+
to_be_initialized_.erase(head);
70+
}
71+
initialize_writer_condvar_.Broadcast(lock);
5672
}
5773

5874
Agent::~Agent() {
75+
uv_close(reinterpret_cast<uv_handle_t*>(&initialize_writer_async_), nullptr);
76+
uv_run(&tracing_loop_, UV_RUN_ONCE);
5977
CheckedUvLoopClose(&tracing_loop_);
6078
}
6179

@@ -95,9 +113,18 @@ AgentWriterHandle Agent::AddClient(
95113

96114
ScopedSuspendTracing suspend(tracing_controller_, this);
97115
int id = next_writer_id_++;
116+
AsyncTraceWriter* raw = writer.get();
98117
writers_[id] = std::move(writer);
99118
categories_[id] = { use_categories->begin(), use_categories->end() };
100119

120+
{
121+
Mutex::ScopedLock lock(initialize_writer_mutex_);
122+
to_be_initialized_.insert(raw);
123+
uv_async_send(&initialize_writer_async_);
124+
while (to_be_initialized_.count(raw) > 0)
125+
initialize_writer_condvar_.Wait(lock);
126+
}
127+
101128
return AgentWriterHandle(this, id);
102129
}
103130

@@ -120,6 +147,10 @@ void Agent::StopTracing() {
120147

121148
void Agent::Disconnect(int client) {
122149
if (client == kDefaultHandleId) return;
150+
{
151+
Mutex::ScopedLock lock(initialize_writer_mutex_);
152+
to_be_initialized_.erase(writers_[client].get());
153+
}
123154
ScopedSuspendTracing suspend(tracing_controller_, this);
124155
writers_.erase(client);
125156
categories_.erase(client);

src/tracing/agent.h

+10-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "uv.h"
66
#include "v8.h"
77
#include "util.h"
8+
#include "node_mutex.h"
89

910
#include <set>
1011
#include <string>
@@ -23,6 +24,7 @@ class AsyncTraceWriter {
2324
virtual ~AsyncTraceWriter() {}
2425
virtual void AppendTraceEvent(TraceObject* trace_event) = 0;
2526
virtual void Flush(bool blocking) = 0;
27+
virtual void InitializeOnThread(uv_loop_t* loop) {}
2628
};
2729

2830
class TracingController : public v8::platform::tracing::TracingController {
@@ -92,13 +94,11 @@ class Agent {
9294

9395
TraceConfig* CreateTraceConfig() const;
9496

95-
// TODO(addaleax): This design is broken and inherently thread-unsafe.
96-
inline uv_loop_t* loop() { return &tracing_loop_; }
97-
9897
private:
9998
friend class AgentWriterHandle;
10099

101100
static void ThreadCb(void* arg);
101+
void InitializeWritersOnThread();
102102

103103
void Start();
104104
void StopTracing();
@@ -120,6 +120,13 @@ class Agent {
120120
std::unordered_map<int, std::multiset<std::string>> categories_;
121121
std::unordered_map<int, std::unique_ptr<AsyncTraceWriter>> writers_;
122122
TracingController* tracing_controller_ = nullptr;
123+
124+
// Variables related to initializing per-event-loop properties of individual
125+
// writers, such as libuv handles.
126+
Mutex initialize_writer_mutex_;
127+
ConditionVariable initialize_writer_condvar_;
128+
uv_async_t initialize_writer_async_;
129+
std::set<AsyncTraceWriter*> to_be_initialized_;
123130
};
124131

125132
void AgentWriterHandle::reset() {

src/tracing/node_trace_writer.cc

+19-13
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,25 @@
33
#include <string.h>
44
#include <fcntl.h>
55

6-
#include "util.h"
6+
#include "util-inl.h"
77

88
namespace node {
99
namespace tracing {
1010

11-
NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern,
12-
uv_loop_t* tracing_loop)
13-
: tracing_loop_(tracing_loop), log_file_pattern_(log_file_pattern) {
11+
NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern)
12+
: log_file_pattern_(log_file_pattern) {}
13+
14+
void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
15+
CHECK_NULL(tracing_loop_);
16+
tracing_loop_ = loop;
17+
1418
flush_signal_.data = this;
15-
int err = uv_async_init(tracing_loop_, &flush_signal_, FlushSignalCb);
19+
int err = uv_async_init(tracing_loop_, &flush_signal_,
20+
[](uv_async_t* signal) {
21+
NodeTraceWriter* trace_writer =
22+
ContainerOf(&NodeTraceWriter::flush_signal_, signal);
23+
trace_writer->FlushPrivate();
24+
});
1625
CHECK_EQ(err, 0);
1726

1827
exit_signal_.data = this;
@@ -126,11 +135,6 @@ void NodeTraceWriter::FlushPrivate() {
126135
WriteToFile(std::move(str), highest_request_id);
127136
}
128137

129-
void NodeTraceWriter::FlushSignalCb(uv_async_t* signal) {
130-
NodeTraceWriter* trace_writer = static_cast<NodeTraceWriter*>(signal->data);
131-
trace_writer->FlushPrivate();
132-
}
133-
134138
void NodeTraceWriter::Flush(bool blocking) {
135139
Mutex::ScopedLock scoped_lock(request_mutex_);
136140
if (!json_trace_writer_) {
@@ -170,7 +174,7 @@ void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
170174
}
171175

172176
void NodeTraceWriter::WriteCb(uv_fs_t* req) {
173-
WriteRequest* write_req = reinterpret_cast<WriteRequest*>(req);
177+
WriteRequest* write_req = ContainerOf(&WriteRequest::req, req);
174178
CHECK_GE(write_req->req.result, 0);
175179

176180
NodeTraceWriter* writer = write_req->writer;
@@ -187,13 +191,15 @@ void NodeTraceWriter::WriteCb(uv_fs_t* req) {
187191

188192
// static
189193
void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) {
190-
NodeTraceWriter* trace_writer = static_cast<NodeTraceWriter*>(signal->data);
194+
NodeTraceWriter* trace_writer =
195+
ContainerOf(&NodeTraceWriter::exit_signal_, signal);
191196
uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_),
192197
nullptr);
193198
uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_),
194199
[](uv_handle_t* signal) {
195200
NodeTraceWriter* trace_writer =
196-
static_cast<NodeTraceWriter*>(signal->data);
201+
ContainerOf(&NodeTraceWriter::exit_signal_,
202+
reinterpret_cast<uv_async_t*>(signal));
197203
Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_);
198204
trace_writer->exited_ = true;
199205
trace_writer->exit_cond_.Signal(scoped_lock);

src/tracing/node_trace_writer.h

+3-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include <sstream>
55
#include <queue>
66

7-
#include "node_mutex.h"
87
#include "libplatform/v8-tracing.h"
98
#include "tracing/agent.h"
109
#include "uv.h"
@@ -17,10 +16,10 @@ using v8::platform::tracing::TraceWriter;
1716

1817
class NodeTraceWriter : public AsyncTraceWriter {
1918
public:
20-
explicit NodeTraceWriter(const std::string& log_file_pattern,
21-
uv_loop_t* tracing_loop);
19+
explicit NodeTraceWriter(const std::string& log_file_pattern);
2220
~NodeTraceWriter();
2321

22+
void InitializeOnThread(uv_loop_t* loop) override;
2423
void AppendTraceEvent(TraceObject* trace_event) override;
2524
void Flush(bool blocking) override;
2625

@@ -38,11 +37,10 @@ class NodeTraceWriter : public AsyncTraceWriter {
3837
void OpenNewFileForStreaming();
3938
void WriteToFile(std::string&& str, int highest_request_id);
4039
void WriteSuffix();
41-
static void FlushSignalCb(uv_async_t* signal);
4240
void FlushPrivate();
4341
static void ExitSignalCb(uv_async_t* signal);
4442

45-
uv_loop_t* tracing_loop_;
43+
uv_loop_t* tracing_loop_ = nullptr;
4644
// Triggers callback to initiate writing the contents of stream_ to disk.
4745
uv_async_t flush_signal_;
4846
// Triggers callback to close async objects, ending the tracing thread.

0 commit comments

Comments
 (0)