@@ -33,12 +33,18 @@ static uv_once_t once = UV_ONCE_INIT;
 static uv_cond_t cond;
 static uv_mutex_t mutex;
 static unsigned int idle_threads;
+static unsigned int slow_io_work_running;
 static unsigned int nthreads;
 static uv_thread_t* threads;
 static uv_thread_t default_threads[4];
 static QUEUE exit_message;
 static QUEUE wq;
+static QUEUE run_slow_work_message;
+static QUEUE slow_io_pending_wq;
 
+static unsigned int slow_work_thread_threshold(void) {
+  return (nthreads + 1) / 2;
+}
 
 static void uv__cancelled(struct uv__work* w) {
   abort();
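Note on the new helper: with the pool's default of 4 threads, slow_work_thread_threshold() evaluates to (4 + 1) / 2 = 2, so at most half of the pool (rounded up) may be busy with slow I/O at any time; even a single-thread pool gets (1 + 1) / 2 = 1 slot, so slow work is never blocked outright. A hypothetical caller-side sketch using this mechanism follows the last hunk below.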
@@ -51,38 +57,73 @@ static void uv__cancelled(struct uv__work* w) {
 static void worker(void* arg) {
   struct uv__work* w;
   QUEUE* q;
+  int is_slow_work;
 
   uv_sem_post((uv_sem_t*) arg);
   arg = NULL;
 
   for (;;) {
     uv_mutex_lock(&mutex);
 
-    while (QUEUE_EMPTY(&wq)) {
+  wait_for_work:
+    /* Keep waiting while either no work is present or only slow I/O
+       and we're at the threshold for that. */
+    while (QUEUE_EMPTY(&wq) ||
+           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
+            QUEUE_NEXT(&run_slow_work_message) == &wq &&
+            slow_io_work_running >= slow_work_thread_threshold())) {
       idle_threads += 1;
       uv_cond_wait(&cond, &mutex);
       idle_threads -= 1;
     }
 
     q = QUEUE_HEAD(&wq);
-
-    if (q == &exit_message)
+    if (q == &exit_message) {
       uv_cond_signal(&cond);
-    else {
+      uv_mutex_unlock(&mutex);
+      break;
+    }
+
+    QUEUE_REMOVE(q);
+    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */
+
+    is_slow_work = 0;
+    if (q == &run_slow_work_message) {
+      /* If we're at the slow I/O threshold, re-schedule until after all
+         other work in the queue is done. */
+      if (slow_io_work_running >= slow_work_thread_threshold()) {
+        QUEUE_INSERT_TAIL(&wq, q);
+        goto wait_for_work;
+      }
+
+      /* If we encountered a request to run slow I/O work but there is none
+         to run, that means it's cancelled => Start over. */
+      if (QUEUE_EMPTY(&slow_io_pending_wq))
+        goto wait_for_work;
+
+      is_slow_work = 1;
+      slow_io_work_running++;
+
+      q = QUEUE_HEAD(&slow_io_pending_wq);
       QUEUE_REMOVE(q);
-      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
-                         executing. */
+      QUEUE_INIT(q);
+
+      /* If there is more slow I/O work, schedule it to be run as well. */
+      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
+        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
+        if (idle_threads > 0)
+          uv_cond_signal(&cond);
+      }
     }
 
     uv_mutex_unlock(&mutex);
 
-    if (q == &exit_message)
-      break;
-
     w = QUEUE_DATA(q, struct uv__work, wq);
     w->work(w);
 
     uv_mutex_lock(&w->loop->wq_mutex);
+    if (is_slow_work)
+      slow_io_work_running--;
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
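In the reworked worker loop above, a thread that dequeues run_slow_work_message first checks the threshold: if too many slow requests are already running, it re-queues the marker and goes back to waiting; otherwise it pops the actual request from slow_io_pending_wq, increments slow_io_work_running, and re-posts the marker (waking an idle thread) if more slow work is still pending. The counter is decremented again once the work callback has returned.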
@@ -92,8 +133,20 @@ static void worker(void* arg) {
 }
 
 
-static void post(QUEUE* q) {
+static void post(QUEUE* q, enum uv__work_kind kind) {
   uv_mutex_lock(&mutex);
+  if (kind == UV__WORK_SLOW_IO) {
+    /* Insert into a separate queue. */
+    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
+    if (!QUEUE_EMPTY(&run_slow_work_message)) {
+      /* Running slow I/O tasks is already scheduled => Nothing to do here.
+         The worker that runs said other task will schedule this one as well. */
+      uv_mutex_unlock(&mutex);
+      return;
+    }
+    q = &run_slow_work_message;
+  }
+
   QUEUE_INSERT_TAIL(&wq, q);
   if (idle_threads > 0)
     uv_cond_signal(&cond);
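On the submission side, post() now routes UV__WORK_SLOW_IO requests into slow_io_pending_wq and places the shared run_slow_work_message marker onto wq only when it is not already queued, so any number of pending slow requests is represented by a single entry in the main work queue.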
@@ -108,7 +161,7 @@ UV_DESTRUCTOR(static void cleanup(void)) {
   if (nthreads == 0)
     return;
 
-  post(&exit_message);
+  post(&exit_message, UV__WORK_CPU);
 
   for (i = 0; i < nthreads; i++)
     if (uv_thread_join(threads + i))
@@ -156,6 +209,8 @@ static void init_threads(void) {
     abort();
 
   QUEUE_INIT(&wq);
+  QUEUE_INIT(&slow_io_pending_wq);
+  QUEUE_INIT(&run_slow_work_message);
 
   if (uv_sem_init(&sem, 0))
     abort();
@@ -194,13 +249,14 @@ static void init_once(void) {
 
 void uv__work_submit(uv_loop_t* loop,
                      struct uv__work* w,
+                     enum uv__work_kind kind,
                      void (*work)(struct uv__work* w),
                      void (*done)(struct uv__work* w, int status)) {
   uv_once(&once, init_once);
   w->loop = loop;
   w->work = work;
   w->done = done;
-  post(&w->wq);
+  post(&w->wq, kind);
 }
 
 
@@ -284,7 +340,11 @@ int uv_queue_work(uv_loop_t* loop,
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
-  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
+  uv__work_submit(loop,
+                  &req->work_req,
+                  UV__WORK_CPU,
+                  uv__queue_work,
+                  uv__queue_done);
   return 0;
 }
 
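For illustration, an internal caller that knows its work item will block on slow I/O would pass UV__WORK_SLOW_IO instead of UV__WORK_CPU. The sketch below is hypothetical: submit_slow_io, my_slow_work, and my_slow_done are made-up names, and it assumes the private libuv declarations of struct uv__work, enum uv__work_kind, and uv__work_submit(), which are not part of this diff.

/* Hypothetical caller sketch; assumes libuv's internal declarations
 * (struct uv__work, enum uv__work_kind, uv__work_submit). */
static void my_slow_work(struct uv__work* w) {
  /* Runs on a threadpool thread; expected to block on disk or similar I/O. */
}

static void my_slow_done(struct uv__work* w, int status) {
  /* Runs on the event-loop thread after my_slow_work() has finished;
     status is UV_ECANCELED if the request was cancelled. */
}

static void submit_slow_io(uv_loop_t* loop, struct uv__work* w) {
  /* post() files this request under slow_io_pending_wq; at most
     (nthreads + 1) / 2 such requests execute concurrently. */
  uv__work_submit(loop, w, UV__WORK_SLOW_IO, my_slow_work, my_slow_done);
}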