@@ -4,6 +4,7 @@ import sdc.intrinsics;
 
 import d.gc.emap;
 import d.gc.hooks;
+import d.gc.mtqueue;
 import d.gc.range;
 import d.gc.slab;
 import d.gc.spec;
@@ -14,18 +15,28 @@ private:
 	import d.sync.mutex;
 	Mutex mutex;
 
-	uint activeThreads;
-	uint cursor;
-	WorkItem[] worklist;
-
 	ubyte _gcCycle;
 	AddressRange _managedAddressSpace;
 
-	enum MaxRefill = 4;
+	// We do not want false sharing between the mutex and
+	// the work queue, so we add padding here.
+	Padding!4 _pad0;
+
+	alias WorkQueue = ConcurentQueue!WorkItem;
+	WorkQueue workQueue;
+
+	import d.sync.atomic;
+	shared Atomic!uint totalThreads;
+	shared Atomic!uint activeThreads;
+
+	// We do not want false sharing with whatever comes after
+	// this struct in memory, so we add padding here.
+	Padding!1 _pad1;
 
 public:
 	this(uint threadCount, ubyte gcCycle, AddressRange managedAddressSpace) {
-		activeThreads = threadCount;
+		totalThreads.store(threadCount);
+		activeThreads.store(threadCount);
 
 		this._gcCycle = gcCycle;
 		this._managedAddressSpace = managedAddressSpace;
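The `Padding` fields keep the mutex-protected state, the concurrent queue, and whatever neighbors this struct in memory on separate cache lines, so writes to one do not keep invalidating the others. `Padding` itself is not defined in this diff; a minimal sketch, assuming the parameter counts cache lines and a 64-byte line size (both assumptions):

```d
// Hypothetical sketch only; the real definition presumably lives
// in one of the imported d.gc modules and may differ.
enum CacheLine = 64;

struct Padding(size_t N) {
	ubyte[N * CacheLine] _pad;
}
```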
@@ -50,8 +61,17 @@ public:
 	}
 
 	void mark() shared {
+		enum QueueSize = 4096;
+		auto size = QueueSize * WorkItem.sizeof;
+
+		import d.gc.tcache;
+		auto ptr = cast(WorkItem*) threadCache.alloc(size, false, false);
+		scope(exit) threadCache.free(ptr);
+
+		(cast(Scanner*) &this).workQueue = WorkQueue(ptr[0 .. QueueSize]);
+
 		import core.stdc.pthread;
-		auto threadCount = activeThreads - 1;
+		auto threadCount = totalThreads.load() - 1;
 		auto threadsPtr =
 			cast(pthread_t*) alloca(pthread_t.sizeof * threadCount);
 		auto threads = threadsPtr[0 .. threadCount];
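`mark` now provisions the queue's backing store up front, so the marking hot path itself never allocates; the cast through `Scanner*` is safe at this point because no worker thread has been spawned yet. The queue type comes from the new `d.gc.mtqueue` module, which this diff does not show. The shape implied by the call sites, with inferred (not authoritative) names and signatures:

```d
// Interface sketch only (declarations, bodies omitted); inferred
// from the call sites in this file, not from d.gc.mtqueue itself.
struct ConcurentQueue(T) {
	private T[] buffer; // fixed-size ring adopted from the caller.

	this(T[] buffer) {
		this.buffer = buffer;
	}

	// Push items; whatever does not fit spills into overflow.
	void insert(T[] items, ref Overflow!T overflow) shared;

	// Pop up to items.length entries; returns how many were taken.
	size_t pop(T[] items, ref Overflow!T overflow) shared;
}
```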
@@ -82,108 +102,78 @@ public:
 		// Now send this thread marking!
 		worker.runMark();
 
-		// We now done, we can free the worklist.
-		import d.gc.tcache;
-		threadCache.free(cast(void*) worklist.ptr);
-
 		foreach (tid; threads) {
 			void* ret;
 			pthread_join(tid, &ret);
 		}
 	}
 
-	void addToWorkList(WorkItem[] items) shared {
-		mutex.lock();
-		scope(exit) mutex.unlock();
+	void addToWorkQueue(WorkItem[] items,
+	                    ref Overflow!WorkItem overflow) shared {
+		workQueue.insert(items, overflow);
 
-		(cast(Scanner*) &this).addToWorkListImpl(items);
+		// If some threads are starved, notify them.
+		// This is inherently racy, but it always eventually
+		// picks up starved threads, so this is good enough.
+		if (activeThreads.load() < totalThreads.load()) {
+			// FIXME: Add a notify feature to the mutex.
+			mutex.lock();
+			mutex.unlock();
+		}
+	}
+
+	size_t popFromWorkQueue(WorkItem[] items,
+	                        ref Overflow!WorkItem overflow) shared {
+		return workQueue.pop(items, overflow);
 	}
 
 private:
-	uint waitForWork(ref WorkItem[MaxRefill] refill) shared {
+	size_t waitForWork(ref Worker worker, WorkItem[] items) shared {
 		mutex.lock();
 		scope(exit) mutex.unlock();
 
-		activeThreads--;
-
-		/**
-		 * We wait for work to be present in the worklist.
-		 * If there is, then we pick it up and start marking.
-		 *
-		 * Alternatively, if there is no work to do, and the number
-		 * of active thread is 0, then we know no more work is coming
-		 * and we should stop.
-		 */
-		static hasWork(Scanner* w) {
-			return w.cursor != 0 || w.activeThreads == 0;
-		}
-
-		auto w = (cast(Scanner*) &this);
-		mutex.waitFor(w.hasWork);
-
-		if (w.cursor == 0) {
-			return 0;
-		}
-
-		activeThreads++;
-
-		uint count = 1;
-		uint top = w.cursor;
+		activeThreads.fetchSub(1);
 
-		refill[0] = w.worklist[top - count];
-		auto length = refill[0].length;
+		static struct Waiter {
+			Worker* worker;
+			size_t count;
 
-		foreach (i; 1 .. min(top, MaxRefill)) {
-			auto next = w.worklist[top - count - 1];
+			WorkItem[] items;
 
-			auto nl = length + next.length;
-			if (nl > WorkItem.WorkUnit / 2) {
-				break;
+			this(Worker* worker, WorkItem[] items) {
+				this.worker = worker;
+				this.items = items;
 			}
 
-			count++;
-			length = nl;
-			refill[i] = next;
-		}
-
-		w.cursor = top - count;
-		return count;
-	}
-
-	void ensureWorklistCapacity(size_t count) {
-		assert(mutex.isHeld(), "mutex not held!");
-		assert(count < uint.max, "Cannot reserve this much capacity!");
+			bool hasWork() {
+				// There is no more work to be done.
+				if (worker.scanner.activeThreads.load() == 0) {
+					return true;
+				}
 
-		if (likely(count <= worklist.length)) {
-			return;
+				count = worker.popFromWorkQueue(items);
+				return count > 0;
+			}
 		}
 
-		enum MinWorklistSize = 4 * PageSize;
+		/**
+		 * The fact that all the waiters use a different delegate
+		 * as condition (because the waiter itself is different)
+		 * will cause a loop where they wake each other up as they
+		 * fail their condition.
+		 *
+		 * This is somewhat wasteful, but will do for now.
+		 * FIXME: Actually put multiple threads to sleep if
+		 * multiple threads are starved.
+		 */
+		auto waiter = Waiter(&worker, items);
+		mutex.waitFor(waiter.hasWork);
 
-		auto size = count * WorkItem.sizeof;
-		if (size < MinWorklistSize) {
-			size = MinWorklistSize;
-		} else {
-			import d.gc.sizeclass;
-			size = getAllocSize(size);
+		if (waiter.count > 0) {
+			activeThreads.fetchAdd(1);
 		}
 
-		import d.gc.tcache;
-		auto ptr = threadCache.realloc(worklist.ptr, size, false);
-		worklist = (cast(WorkItem*) ptr)[0 .. size / WorkItem.sizeof];
-	}
-
-	void addToWorkListImpl(WorkItem[] items) {
-		assert(mutex.isHeld(), "mutex not held!");
-		assert(0 < items.length && items.length < uint.max,
-		       "Invalid item count!");
-
-		auto capacity = cursor + items.length;
-		ensureWorklistCapacity(capacity);
-
-		foreach (item; items) {
-			worklist[cursor++] = item;
-		}
+		return waiter.count;
 	}
 }
 
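The bare `lock()`/`unlock()` pair in `addToWorkQueue` and the per-waiter `hasWork` delegate suggest that `d.sync.mutex`'s `waitFor` re-evaluates waiter predicates whenever the mutex cycles, so taking and releasing it acts as a broadcast. A standalone model of that protocol in plain D, using `core.sync` rather than SDC's mutex (whose internals are not in this diff), with the broadcast made explicit:

```d
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;

class WorkSignal {
	private Mutex m;
	private Condition c;

	this() {
		m = new Mutex();
		c = new Condition(m);
	}

	// What waitFor(pred) does in spirit: sleep until pred() holds,
	// re-checking it on every wake-up.
	void waitFor(bool delegate() pred) {
		m.lock();
		scope(exit) m.unlock();
		while (!pred()) {
			c.wait();
		}
	}

	// What the empty lock()/unlock() in addToWorkQueue achieves:
	// force every sleeping waiter to re-run its predicate. Here the
	// broadcast is explicit; SDC's mutex appears to fold it into
	// the unlock path.
	void pulse() {
		m.lock();
		scope(exit) m.unlock();
		c.notifyAll();
	}
}
```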
@@ -218,6 +208,8 @@ private:
 
 	LastDenseSlabCache ldsCache;
 
+	Overflow!WorkItem overflow;
+
 public:
 	this(shared(Scanner)* scanner) {
 		this.scanner = scanner;
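Each `Worker` now owns a private `Overflow` buffer: when the bounded shared queue fills up, items spill into it locally instead of blocking the producer, and `popFromWorkQueue` can drain it without touching shared state. The type is presumably defined alongside the queue in `d.gc.mtqueue`; a minimal sketch consistent with the call sites (names assumed, not authoritative):

```d
// Sketch only; the real type may differ.
struct Overflow(T) {
	private T[] spill; // items the bounded shared queue rejected.

	void clear() {
		// The real GC presumably returns this memory to the
		// allocator; dropping the reference stands in for that.
		spill = null;
	}
}
```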
@@ -229,7 +221,20 @@ public:
 		this.gcCycle = scanner.gcCycle;
 	}
 
+	size_t waitForWork(WorkItem[] items) {
+		auto count = popFromWorkQueue(items);
+		if (count > 0) {
+			return count;
+		}
+
+		// We are starving, defer to the scanner.
+		return scanner.waitForWork(this, items);
+	}
+
 	void runMark() {
+		// Make sure we clean up after ourselves when done.
+		scope(exit) overflow.clear();
+
 		/**
 		 * Scan the stack and TLS.
 		 *
@@ -239,17 +244,18 @@ public:
 		 * restart new ones quickly and cheaply.
 		 *
 		 * Because we start and stop threads during the mark phase, we are
-		 * at risk of missing pointers allocated for thread management resources
-		 * and corrupting the internal of the standard C library.
+		 * at risk of missing pointers allocated for thread management
+		 * resources and corrupting the internals of the standard C library.
 		 *
 		 * This is NOT good! So we scan here to make sure we don't miss anything.
 		 */
 		import d.gc.thread;
 		threadScan(scan);
 
-		WorkItem[Scanner.MaxRefill] refill;
+		enum MaxRefill = 4;
+		WorkItem[MaxRefill] refill;
 		while (true) {
-			auto count = scanner.waitForWork(refill);
+			auto count = waitForWork(refill[0 .. MaxRefill]);
 			if (count == 0) {
 				// We are done, there are no more work items.
 				return;
@@ -277,19 +283,19 @@ public:
 		} else {
 			// Either a TLS range, or the stack of a suspended
 			// thread. Can be scanned at any time.
-			addToWorkList(range);
+			addToWorkQueue(range);
 		}
 	}
 
-	void addToWorkList(WorkItem[] items) {
-		scanner.addToWorkList(items);
+	void addToWorkQueue(WorkItem[] items) {
+		scanner.addToWorkQueue(items, overflow);
 	}
 
-	void addToWorkList(WorkItem item) {
-		addToWorkList((&item)[0 .. 1]);
+	void addToWorkQueue(WorkItem item) {
+		addToWorkQueue((&item)[0 .. 1]);
 	}
 
-	void addToWorkList(const(void*)[] range) {
+	void addToWorkQueue(const(void*)[] range) {
 		// In order to expose some parallelism, we split the range
 		// into smaller chunks to be distributed.
 		while (range.length > 0) {
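This splitting loop (its body continues in the next hunk) feeds `WorkItem.extractFromRange`, whose implementation is outside this diff. The underlying idea in isolation, with `WorkUnit` as a stand-in constant rather than the real `WorkItem.WorkUnit`:

```d
enum WorkUnit = 1024; // stand-in chunk size, not the real value.

// Peel a bounded chunk off the front of a range so the remainder
// can be handed to other workers in parallel.
const(void*)[] extractChunk(ref const(void*)[] range) {
	auto n = range.length < WorkUnit ? range.length : WorkUnit;
	auto chunk = range[0 .. n];
	range = range[n .. $];
	return chunk;
}
```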
@@ -305,10 +311,14 @@ public:
 				u = WorkItem.extractFromRange(range);
 			}
 
-			addToWorkList(units[0 .. count]);
+			addToWorkQueue(units[0 .. count]);
 		}
 	}
 
+	size_t popFromWorkQueue(WorkItem[] items) {
+		return scanner.popFromWorkQueue(items, overflow);
+	}
+
 	void scan(const(void*)[] range) {
 		while (range.length > 0) {
 			scan(WorkItem.extractFromRange(range));
@@ -332,7 +342,7 @@ public:
 		}
 	}
 
-	// Depth first doesn't really need a worklist,
+	// Depth first doesn't really need a work list,
 	// but this makes sharing code easier.
 	enum WorkListCapacity = DepthFirst ? 1 : 16;
 
@@ -383,7 +393,7 @@ public:
 				continue;
 			}
 
-			addToWorkList(worklist[0 .. WorkListCapacity]);
+			addToWorkQueue(worklist[0 .. WorkListCapacity]);
 
 			cursor = 1;
 			worklist[0] = i;
@@ -429,7 +439,7 @@ public:
 				worklist[cursor++] = WorkItem.extractFromRange(range);
 			}
 
-			addToWorkList(range);
+			addToWorkQueue(range);
 			continue;
 
 
@@ -448,7 +458,7 @@ public:
 		if (DepthFirst && cursor == 0) {
 			worklist[cursor++] = i;
 		} else {
-			addToWorkList(i);
+			addToWorkQueue(i);
 		}
 	}
 