-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBasicThreadPool.txt
226 lines (160 loc) · 6.63 KB
/
BasicThreadPool.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/************************************************************************
MODULE: BasicThreadPool
SUMMARY:
Some simple thread pooling.
You create a thread pool by constructing a BasicThreadPool object.
For example:
long nthreads = 4;
BasicThreadPool pool(nthreads);
creates a thread pool of 4 threads. These threads will exist
until the destructor for pool is called.
The simplest way to use a thread pools is as follows.
Suppose you have a task that consists of N subtasks,
indexed 0..N-1. Then you can write:
pool.exec_range(N,
[&](long first, long last) {
for (long i = first; i < last; i++) {
... code to process subtask i ...
}
}
);
The second argument to exec_range is a C++11 "lambda".
The "[&]" indicates that all local variables in the calling
context are captured by reference, so the lambda body can
reference all visible local variables directly. C++11 provides
other methods for capturing local variables.
As a more concrete example, we could parallelize the following
calculation:
void mul(ZZ *x, const ZZ *a, const ZZ *b, long n)
{
for (long i = 0; i < n; i++)
mul(x[i], a[i], b[i]);
}
as follows:
void mul(ZZ *x, const ZZ *a, const ZZ *b, long n,
BasicThreadPool *pool)
{
pool->exec_range(n,
[&](long first, long last) {
for (long i = first; i < last; i++)
mul(x[i], a[i], b[i]);
} );
}
As another example, we could parallelize the following
calculation:
void mul(ZZ_p *x, const ZZ_p *a, const ZZ_p *b, long n)
{
for (long i = 0; i < n; i++)
mul(x[i], a[i], b[i]);
}
as follows:
void mul(ZZ_p *x, const ZZ_p *a, const ZZ_p *b, long n,
BasicThreadPool *pool)
{
ZZ_pContext context;
context.save();
pool->exec_range(n,
[&](long first, long last) {
context.restore();
for (long i = first; i < last; i++)
mul(x[i], a[i], b[i]);
} );
}
This illustrates a simple and efficient means for ensuring that
all threads are working with the same ZZ_p modulus.
====================================================================
A lower-level interface is also provided.
One can write:
pool.exec_index(n,
[&](long index) {
... code to process index i ...
}
);
This will activate n threads with indices 0..n-1, and execute the given code on
each index. The parameter n must be in the range 0..nthreads, otherwise an
error is raised.
This lower-level interface is useful in some cases, especially when memory is
managed in some special way. For convenience, a method is provided to break
subtasks up into smaller, almost-equal-sized groups of subtasks:
Vec<long> pvec;
long n = pool.SplitProblems(N, pvec);
can be used for this. N is the number of subtasks, indexed 0..N-1. This
method will compute n as needed by exec_index, and the range of subtasks to be
processed by a given index in the range 0..n-1 is pvec[index]..pvec[index+1]-1
Thus, the logic of exec_range example can be written using the lower-level
exec_index interface as follows:
Vec<long> pvec;
long n = pool.SplitProblems(N, pvec);
pool.exec_index(n,
[&](long index) {
long first = pvec[index];
long last = pvec[index+1];
for (long i = first; i < last; i++) {
... code to process subtask i ...
}
}
);
However, with this approach, memory or other resources can be assigned to each
index = 0..n-1, and managed externally.
====================================================================
NOTES:
When one activates a thread pool with nthreads threads, the *current* thread
(the one activating the pool) will also participate in the computation. This
means that the thread pool only contains nthreads-1 other threads.
If, during an activation, any thread throws an exception, it will be caught and
rethrown in the activating thread when all the threads complete. If more than
one thread throws an exception, the first one that is caught is the one that is
rethrown.
Methods are also provided for adding, deleting, and moving threads in and among
thread pools.
If NTL_THREADS=off, the corresponding header file may be included,
by the BasicThreadPool class is not defined.
THREAD BOOSTING:
While users are free to use a thread pool as they wish, NTL can be enabled so
that it *internally* uses a thread pool to speed up certain computations. This
is currently a work in progress. To use this feature, NTL should be configured
with NTL_THREAD_BOOST=on. The user can then initialize the (thread local)
variable NTLThreadPool, either directly or via the convenience function
SetNumThreads (see below).
NTL applications may use the NTLThreadPool themselves: the logic is designed so
that if this pool is already activated when entering a thread-boosted routine,
the thread-boosting is temporarily disabled. This means that an application
can seamlessly use higer-level parallization when possible (which is usually
more effective) or rely on NTL's internal parallelization at a lower leve.
***************************************************************************/
class BasicThreadPool {
private:
BasicThreadPool(const BasicThreadPool&); // disabled
void operator=(const BasicThreadPool&); // disabled
public:
BasicThreadPool(long nthreads);
// creates a pool with nthreads threads, including the current thread
// (so nthreads-1 other threads get created)
template<class Fct>
void exec_index(long cnt, Fct fct);
// activate by index (see example usage above)
template<class Fct>
void exec_range(long sz, Fct fct);
// activate by range (see example usage above)
long SplitProblems(long nproblems, Vec<long>& pvec) const;
// splits nproblems problems among (at most) nthreads threads.
// returns the actual number of threads nt to be used, and
// initializes pvec to have length nt+1, so that for t = 0..nt-1,
// thread t processes subproblems pvec[t]..pvec[t+1]-1
long NumThreads() const;
// return number of threads (including current thread)
bool active() const;
// indicates an activation is in process
void add(long n = 1);
// add n threads to the pool
void remove(long n = 1);
// remove n threads from the pool
void move(BasicThreadPool& other, long n = 1)
// move n threads from other pool to this pool
};
// THREAD BOOSTING FEATURES:
extern thread_local BasicThreadPool *NTLThreadPool;
// pool used internally by NTL with NTL_THREAD_BOOST=on
void SetNumThreads(long n);
// convenience routine to set NTLThreadPool (created using new)
// If NTL_THREADS=off, then this is still defined, but does nothing