// DistributedFIFOQueue.chpl
use CyclicDist;
use LocalAtomicObject;
use Queue;
use SyncQueue;
use Random;
use ReplicatedDist;
/*
A distributed FIFO queue.

The queue uses a pair of global counters to cycle through and load-balance across
all registered locales, each of which holds a per-locale queue. Each per-locale
queue is itself FIFO, so any extra items enqueued after cycling through all
locales are dequeued in the same order: a FIFO of FIFOs is still a FIFO. For a
useful diagram...
Imagine there are N locales, where we denote a head at position Y as H_Y and a
tail at position Z as T_Z.

The queue is empty:

  H_0, T_0 -- Note that the positions just need to be equal...

When the head and tail read as equal, a dequeue that observed them in that state
will fail, but an enqueue can still advance the tail.

Enqueue:

  H_0, T_1

Here the locale at position 0 mod N receives the enqueued item.

Dequeue:

  H_1, T_1

Two things can happen here: the enqueue that claimed T_0 has not yet finished,
in which case the dequeuer spins until it has (i.e., by repeatedly attempting to
dequeue if the per-locale queue is non-blocking), or it has finished, in which
case a simple dequeue occurs.

Notice as well that while both an enqueue and a dequeue are in progress, other
tasks can advance the head and tail to obtain their next index. Overlapping
operations are discussed below.
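
As a concrete trace, with N = 2 locales: three enqueues advance the tail from
T_0 to T_3 and place their items on locales 0, 1, and 0 (tail values 0, 1, 2
mod 2); three subsequent dequeues advance the head from H_0 to H_3 and drain
those locales in the same 0, 1, 0 order.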

The position can be obtained and updated in a non-blocking fashion using the
available atomic primitives such as Compare-And-Swap, which is the only global
synchronization actually needed. Since the only global synchronization is
non-blocking, lock-free, and wait-free, this ensures scalability. To reduce
communication as well, we obtain and atomically update the index on the locale
owning the queue.

The head and tail of the global counter denote a kind of 'promise': that a task
will, on enqueue, add a new item, and on dequeue, remove an existing item. Given
that the global counter is linearizable, we know the index obtained from it is
correct at the time it was retrieved, and enforcing that all tasks uphold their
promise ensures it always remains correct.

As each per-locale queue is updated on the locale it belongs to, the majority of
the computation is performed on the owning locale, which keeps the workload well
balanced. In the case of overlapping operations on a local queue, a non-blocking
queue is optimal, as it allows enqueues to be fully non-blocking and lock-free,
the counter itself being non-blocking, lock-free, and wait-free.

While enqueues can be non-blocking (depending on the queue implementation),
dequeues cannot be: it is possible for an enqueue to 'promise' but not finish
before a dequeue on that locale occurs. Such a dequeue must spin, since the
index assigned to it is correct and *one* of the enqueuers for that index has
not completed but eventually will. In this regard, dequeue is only *mostly*
non-blocking.

In terms of correctness for the per-locale queues, we know each task holds a
'promise' to perform some operation, but beyond that it is not guaranteed that
tasks perform their operations in the order they received their indices. That
is, if a task t1 obtained its index before t2, and both t1 and t2 hash to the
same index (read: locale), it is possible for t2 to complete its operation
before t1; the order of completion is non-deterministic. However, overlapping
concurrent operations are non-deterministic by nature, and FIFO ordering is in
fact preserved at the per-locale level, so the operation remains valid and the
reordering is transparent from the outside. Considering that PGAS semantics
ensure a remote operation finishes before the initiating task continues, the
operations of a single sequential task complete in order; the only way for
operations to overlap is through overlapping tasks. Compare with an ordinary
synchronized queue: even if t1 begins its operation before t2, it is still
possible for t2 to obtain the lock first in cases of unexpected delays. The
non-deterministic nature of the per-locale queues is therefore nothing special,
and if anything increases overall concurrency.
*/
class WaitListNode {
  // Our served queue index
  var idx : int = -1;

  // While wait is true, we spin;
  // if wait becomes false but completed is false, we are the new combiner thread;
  // if wait becomes false and completed is true, we have been served and may exit.
  var wait : atomic bool;
  var completed : bool;

  // Next in the waitlist
  var next : WaitListNode;
}
record WaitList {
  var headWaitList : LocalAtomicObject(WaitListNode);
  var tailWaitList : LocalAtomicObject(WaitListNode);

  proc WaitList() {
    headWaitList.write(new WaitListNode());
    tailWaitList.write(new WaitListNode());
  }
}
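// Protocol sketch for the wait list above: a requesting task swaps a fresh
// node into headWaitList, links it behind the node it received back, and spins
// on that node's 'wait' flag. The one task whose flag clears while 'completed'
// is still false becomes the combiner and serves up to here.maxTaskPar pending
// requests against the global counter (see getNextHeadIndex below).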
var privatizedQueues$ : sync bool;
var privatizedSpace = { 0 .. 0 };
var privatizedDomain = privatizedSpace dmapped ReplicatedDist();
var privatizedWaitList : [privatizedDomain] WaitList;
var privatizedQueues : [privatizedDomain] Queue(int);
config param privatizeFIFO = true;
// TODO: Expand...
proc createPrivatizedQueue() {
  forall loc in Locales {
    on loc {
      privatizedQueues[0] = new SyncQueue(int);
      privatizedWaitList[0] = new WaitList();
    }
  }
}
class DistributedFIFOQueue : Queue {
  // TODO: Let user specify their own backing queue...

  // Two monotonically increasing counters used in deciding which locale to
  // choose from...
  var globalHead : atomic uint;
  var globalTail : atomic uint;

  // Per-locale data...
  var perLocaleSpace = { 0 .. 0 };
  var perLocaleDomain = perLocaleSpace dmapped ReplicatedDist();
  var localQueues : [perLocaleDomain] Queue(eltType);
  var localWaitList : [perLocaleDomain] WaitList;

  // TODO: Custom Locales
  proc DistributedFIFOQueue(type eltType) {
    forall loc in Locales {
      on loc {
        localQueues[0] = new SyncQueue(eltType);
      }
    }

    if privatizeFIFO {
      createPrivatizedQueue();
    }
  }

  proc getNextHeadIndex() : int {
    // We want to ensure we do not serve more than the number of tasks that can
    // potentially run on a node at once. This way we don't end up serving
    // repeated requesters and getting starved, and (presumably) it is large
    // enough to serve most use-cases and scenarios.
    var requestsServed = 0;

    // Create our dummy node...
    var nextNode = new WaitListNode();
    nextNode.wait.write(true);
    nextNode.completed = false;

    // Register our dummy node...
    var currNode = (if privatizeFIFO then privatizedWaitList[0] else localWaitList[0]).headWaitList.exchange(nextNode);
    currNode.next = nextNode;

    // Spin until we are alerted...
    currNode.wait.waitFor(false);

    // If our operation is marked complete, we may safely reclaim our node, as
    // it is no longer touched by the combiner thread. We have officially been
    // served...
    if currNode.completed {
      var retval = currNode.idx;
      delete currNode;
      return retval;
    }

    // If we are not marked as complete, we *are* the combiner thread, so we
    // begin serving everyone's request. As the combiner, it is our sole
    // obligation to contest the global counter on behalf of the waiting tasks.
    var tmpNode = currNode;
    var tmpNodeNext : WaitListNode;
    const maxRequests = here.maxTaskPar;

    while (tmpNode.next != nil && requestsServed < maxRequests) {
      requestsServed = requestsServed + 1;

      // Note: Ensures that we do not touch the current node after it is freed
      // by the owning task...
      tmpNodeNext = tmpNode.next;

      // Contend for the global head...
      while true {
        var _tail = globalTail.read();
        var _head = globalHead.read();

        // Empty; leave idx at -1 so the requester's dequeue fails...
        if _head == _tail {
          break;
        }

        // Contest...
        if globalHead.compareExchangeStrong(_head, _head + 1) {
          tmpNode.idx = (_head % numLocales : uint) : int;
          break;
        }
      }

      // We are done with this one... Note that the atomic write to wait orders
      // the preceding write to completed, so the owning task sees it as
      // completed once wait reads as false.
      tmpNode.completed = true;
      tmpNode.wait.write(false);
      tmpNode = tmpNodeNext;
    }

    // At this point, either we are on the dummy node, in which case nothing
    // happens, or we exceeded the number of requests we may serve at once, in
    // which case we wake up the next task as the new combiner.
    tmpNode.wait.write(false);

    return currNode.idx;
  }
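
  // Claim a slot with a wait-free fetch-and-add on the global tail; this is
  // the 'promise' (described above) that an item will arrive on the locale at
  // position idx, which a concurrent dequeue may spin on until it is fulfilled.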
  proc enqueue(elt : eltType) {
    var idx : int = (globalTail.fetchAdd(1) % numLocales : uint) : int;
    on Locales[idx] {
      (if privatizeFIFO then privatizedQueues[0] else localQueues[0]).enqueue(elt);
    }
  }

  proc dequeue() : (bool, eltType) {
    var (hasElem, elem) = (true, _defaultOf(eltType));
    var idx = getNextHeadIndex();

    if idx == -1 {
      return (false, _defaultOf(eltType));
    }

    // Now we get our item from the queue. Note that at the given index, it is
    // possible that an enqueuing task has not finished yet, but we know there
    // *should* be at least one item for us, so we can spin until it arrives.
    on Locales[idx] {
      var retval : (bool, eltType);
      while !retval[1] {
        retval = (if privatizeFIFO then privatizedQueues[0] else localQueues[0]).dequeue();
        if !retval[1] {
          writeln(here, ": Spinning... HasElem: ", hasElem, "; head: ", globalHead.peek(), ", tail: ", globalTail.peek());
          chpl_task_yield();
        }
      }
      (hasElem, elem) = retval;
    }

    return (hasElem, elem);
  }
}
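
// Proof-of-correctness driver: enqueue 1..nElems from randomly chosen locales,
// then dequeue from randomly chosen locales and verify strict FIFO order.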
proc main() {
  var nElems = 100000;
  writeln("Starting MPMCQueue Proof of Correctness Test ~ nElems: ", nElems);

  var queue = new DistributedFIFOQueue(int);
  var randStr = Random.makeRandomStream(int);

  for i in 1 .. nElems {
    // 'mod' keeps the index non-negative even for negative random values...
    on Locales[mod(randStr.getNext(), numLocales)] do queue.enqueue(i);
  }

  for i in 1 .. nElems {
    on Locales[mod(randStr.getNext(), numLocales)] {
      var (hasElem, elem) = queue.dequeue();
      if !hasElem || elem != i {
        halt("FAILED TEST! Expected: ", i, ", Received: ", elem, "; HasElem: ", hasElem);
      }
    }
  }

  writeln("PASSED TEST!");
}