/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.
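
The following minimal sketch illustrates both execution styles described
above (`fib` and the argument values are illustrative only):
---
import std.parallelism;

int fib(int n) { return n < 2 ? n : fib(n - 1) + fib(n - 2); }

void main()
{
    // Run one Task in its own, dedicated thread.
    auto t1 = task!fib(30);
    t1.executeInNewThread();

    // Submit another Task to the global TaskPool's queue.
    auto t2 = task!fib(31);
    taskPool.put(t2);

    // Block until each result is available.
    immutable a = t1.yieldForce;
    immutable b = t2.yieldForce;
}
---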

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue` which must not be a legit value of
the constant. Upon the first call the situation is detected and the global is
initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. return the same value upon repeated calls.
For that reason, no special precautions are taken so `initializer` may be called
more than one time leading to benign races on the cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
T = The type of the pseudo-constant (may be qualified)
outOfBandValue = A value that cannot be valid, it is used for initialization
initializer = The function performing initialization; must be `nothrow`

Returns:
The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

/* Atomics code. These forward to core.atomic, but are written like this
for two reasons:

1. They used to actually contain ASM code and I don't want to have to change
   to directly calling core.atomic in a zillion different places.

2. core.atomic has some misc. issues that make my use cases difficult
   without wrapping it. If I didn't wrap it, casts would be required
   basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc. ---------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}
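
/*
A quick sketch of how the two templates above cooperate (the call below is
illustrative): in taskPool.reduce!(min, max)(range), each work unit folds its
elements into a Tuple!(T, T) seed via reduceAdjoin!(min, max), applying min to
field 0 and max to field 1. When the work units finish, their per-unit tuples
are merged pairwise, field by field, via reduceFinish!(min, max).
*/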

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work. A `Task` may be
executed in parallel with any other `Task`. Using this struct directly
allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any. These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref. If `fun` returns by ref, the reference will point
to the returned reference of `fun`. Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs: Changes to `ref` and `out` arguments are not propagated to the
call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`. This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception;  // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)  // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism. An explicit priority may be given
    to the `Task`. If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`. This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias. This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. `fun` must be @safe or @trusted.

2. `F` must not have any unshared aliasing as defined by
   $(REF hasUnsharedAliasing, std,traits). This means it
   may not be an unshared delegate or a non-shared class or struct
   with overloaded `opCall`. This also precludes accepting template
   alias parameters.

3. `Args` must not have unshared aliasing.

4. `fun` must not return by reference.

5. The return type must not have unshared aliasing unless `fun` is
   `pure` or the `Task` is executed via `executeInNewThread` instead
   of using a `TaskPool`.
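
A minimal sketch of the `@safe` usage (the function and values below are
illustrative only):
---
static int triple(int x) @safe pure nothrow { return x * 3; }

void useFromSafeCode() @safe
{
    // A @safe pure function pointer with non-aliasing arguments
    // satisfies all of the restrictions above.
    auto t = task(&triple, 14);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}
---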

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

@safe unittest
{
    static struct Oops {
        int convert() {
            *cast(int*) 0xcafebabe = 0xdeadbeef;
            return 0;
        }
        alias convert this;
    }
    static void foo(int) @safe {}

    static assert(!__traits(compiles, task(&foo, Oops.init)));
    static assert(!__traits(compiles, scopedTask(&foo, Oops.init)));
}

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1. When a `Task` that calls a delegate is being created and a closure
   cannot be allocated due to objects on the stack that have scoped
   destruction. The delegate overload of `scopedTask` takes a `scope`
   delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
   `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes: `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
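
A minimal sketch (the delegate and names below are illustrative only):
---
void caller()
{
    int sum = 0;

    // The scope delegate captures sum without allocating a closure.
    auto t = scopedTask({ foreach (i; 1 .. 101) sum += i; });
    taskPool.put(t);

    // ... do other work here ...

    t.yieldForce;  // Would also run in t's destructor if omitted.
    assert(sum == 5050);
}
---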
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS: Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         * The hardcoded number also comes from ruby's source code.
         * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(count);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads. A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution. A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1. When you want `TaskPool` instances with multiple priorities, for example
   a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
   primitive (for example a mutex), and you want to parallelize the code that
   needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
`stop` or `finish` is called, even if the main thread
has finished already. This may lead to programs that
never end. If you do not want this behaviour, you can set `isDaemon`
to true.
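
A minimal sketch of an explicitly instantiated pool (the worker count of 4
is illustrative only):
---
import std.parallelism;

void main()
{
    auto pool = new TaskPool(4);
    scope(exit) pool.finish(true);  // Block until queued work completes.

    auto squares = pool.amap!((int x) => x * x)([1, 2, 3, 4]);
    assert(squares == [1, 4, 9, 16]);
}
---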
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if (!deleteItem(toExecute))
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate. This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }
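
    /*
    A quick worked example of the heuristic above (numbers illustrative):
    with size == 7 worker threads and rangeLen == 1000, eightSize is
    4 * 8 == 32 and the returned work unit size is 1000 / 32 + 1 == 32,
    i.e. roughly four work units per thread, counting the main thread.
    */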

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by `TaskPool`
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread. A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be. For very expensive loop bodies,
    `workUnitSize` should be 1. An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach: 619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped. In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown. The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // Default work unit size is such that we would use 4x as many
            // slots as are in this thread pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

    ///
    template amap(functions...)
    {
        /**
        Eager parallel map. The eagerness of this function means it has less
        overhead than the lazily evaluated `TaskPool.map` and should be
        preferred where the memory requirements of eagerness are acceptable.
        `functions` are the functions to be evaluated, passed as template
        alias parameters in a style similar to
        $(REF map, std,algorithm,iteration).
        The first argument must be a random access range. For performance
        reasons, amap will assume the range elements have not yet been
        initialized. Elements will be overwritten without calling a destructor
        nor doing an assignment. As such, the range must not contain meaningful
        data$(DDOC_COMMENT not a section): either un-initialized objects, or
        objects in their `.init` state.

        ---
        auto numbers = iota(100_000_000.0);

        // Find the square roots of numbers.
        //
        // Timings on an Athlon 64 X2 dual core machine:
        //
        // Parallel eager map: 0.802 s
        // Equivalent serial implementation: 1.768 s
        auto squareRoots = taskPool.amap!sqrt(numbers);
        ---

        Immediately after the range argument, an optional work unit size argument
        may be provided. Work units as used by `amap` are identical to those
        defined for parallel foreach. If no work unit size is provided, the
        default work unit size is used.

        ---
        // Same thing, but make work unit size 100.
        auto squareRoots = taskPool.amap!sqrt(numbers, 100);
        ---

        An output range for returning the results may be provided as the last
        argument. If one is not provided, an array of the proper type will be
        allocated on the garbage collected heap. If one is provided, it must be a
        random access range with assignable elements, must have reference
        semantics with respect to assignment to its elements, and must have the
        same length as the input range. Writing to adjacent elements from
        different threads must be safe.

        ---
        // Same thing, but explicitly allocate an array
        // to return the results in. The element type
        // of the array may be either the exact type
        // returned by functions or an implicit conversion
        // target.
        auto squareRoots = new float[numbers.length];
        taskPool.amap!sqrt(numbers, squareRoots);

        // Multiple functions, explicit output range, and
        // explicit work unit size.
        auto results = new Tuple!(float, real)[numbers.length];
        taskPool.amap!(sqrt, log)(numbers, 100, results);
        ---

        Note:

        A memory barrier is guaranteed to be executed after all results are written
        but before returning so that results produced by all threads are visible
        in the calling thread.

        Tips:

        To perform the mapping operation in place, provide the same range for the
        input and output range.

        To parallelize the copying of a range with expensive to evaluate elements
        to an array, pass an identity function (a function that just returns
        whatever argument is provided to it) to `amap`.

        $(B Exception Handling):

        When at least one exception is thrown from inside the map functions,
        the submission of additional `Task` objects is terminated as soon as
        possible, in a non-deterministic manner. All currently executing or
        enqueued work units are allowed to complete. Then, all exceptions that
        were thrown from any work unit are chained using `Throwable.next` and
        rethrown. The order of the exception chaining is non-deterministic.
        */
        auto amap(Args...)(Args args)
        if (isRandomAccessRange!(Args[0]))
        {
            import core.internal.lifetime : emplaceRef;

            alias fun = adjoin!(staticMap!(unaryFun, functions));

            alias range = args[0];
            immutable len = range.length;

            static if (
                Args.length > 1 &&
                randAssignable!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
                )
            {
                import std.conv : text;
                import std.exception : enforce;

                alias buf = args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                        text("Can't use a user supplied buffer that's the wrong ",
                             "size. (Expected :", len, " Got: ", buf.length));
            }
            else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
            {
                static assert(0, "Wrong buffer type.");
            }
            else
            {
                import std.array : uninitializedArray;

                auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
                alias args2 = args;
                alias Args2 = Args;
            }

            if (!len) return buf;

            static if (isIntegral!(Args2[$ - 1]))
            {
                static assert(args2.length == 2);
                auto workUnitSize = cast(size_t) args2[1];
            }
            else
            {
                static assert(args2.length == 1, Args);
                auto workUnitSize = defaultWorkUnitSize(range.length);
            }

            alias R = typeof(range);

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            // Handle as a special case:
            if (size == 0)
            {
                size_t index = 0;
                foreach (elem; range)
                {
                    emplaceRef(buf[index++], fun(elem));
                }
                return buf;
            }

            // Effectively -1: chunkIndex + 1 == 0:
            shared size_t workUnitIndex = size_t.max;
            shared bool shouldContinue = true;

            void doIt()
            {
                import std.algorithm.comparison : min;

                scope(failure)
                {
                    // If an exception is thrown, all threads should bail.
                    atomicStore(shouldContinue, false);
                }

                while (atomicLoad(shouldContinue))
                {
                    immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                    immutable start = workUnitSize * myUnitIndex;
                    if (start >= len)
                    {
                        atomicStore(shouldContinue, false);
                        break;
                    }

                    immutable end = min(len, start + workUnitSize);

                    static if (hasSlicing!R)
                    {
                        auto subrange = range[start .. end];
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(subrange.front));
                            subrange.popFront();
                        }
                    }
                    else
                    {
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(range[i]));
                        }
                    }
                }
            }

            submitAndExecute(this, &doIt);
            return buf;
        }
    }
1880 1881 ///1882 templatemap(functions...)
1883 {
1884 /**
1885 A semi-lazy parallel map that can be used for pipelining. The map
1886 functions are evaluated for the first `bufSize` elements and stored in a
1887 buffer and made available to `popFront`. Meanwhile, in the
1888 background a second buffer of the same size is filled. When the first
1889 buffer is exhausted, it is swapped with the second buffer and filled while
1890 the values from what was originally the second buffer are read. This
1891 implementation allows for elements to be written to the buffer without
1892 the need for atomic operations or synchronization for each write, and
1893 enables the mapping function to be evaluated efficiently in parallel.
1894 1895 `map` has more overhead than the simpler procedure used by `amap`
1896 but avoids the need to keep all results in memory simultaneously and works
1897 with non-random access ranges.
1898 1899 Params:
1900 1901 source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1902 to be mapped. If `source` is not random
1903 access it will be lazily buffered to an array of size `bufSize` before
1904 the map function is evaluated. (For an exception to this rule, see Notes.)
1905 1906 bufSize = The size of the buffer to store the evaluated elements.
1907 1908 workUnitSize = The number of elements to evaluate in a single
1909 `Task`. Must be less than or equal to `bufSize`, and
1910 should be a fraction of `bufSize` such that all worker threads can be
1911 used. If the default of size_t.max is used, workUnitSize will be set to
1912 the pool-wide default.
1913 1914 Returns: An input range representing the results of the map. This range
1915 has a length iff `source` has a length.
1916 1917 Notes:
1918 1919 If a range returned by `map` or `asyncBuf` is used as an input to
1920 `map`, then as an optimization the copying from the output buffer
1921 of the first range to the input buffer of the second range is elided, even
1922 though the ranges returned by `map` and `asyncBuf` are non-random
1923 access ranges. This means that the `bufSize` parameter passed to the
1924 current call to `map` will be ignored and the size of the buffer
1925 will be the buffer size of `source`.
1926 1927 Example:
1928 ---
1929 // Pipeline reading a file, converting each line
1930 // to a number, taking the logarithms of the numbers,
1931 // and performing the additions necessary to find
1932 // the sum of the logarithms.
1933 1934 auto lineRange = File("numberList.txt").byLine();
1935 auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1936 auto nums = taskPool.map!(to!double)(dupedLines);
1937 auto logs = taskPool.map!log10(nums);
1938 1939 double sum = 0;
1940 foreach (elem; logs)
1941 {
1942 sum += elem;
1943 }
1944 ---
1945 1946 $(B Exception Handling):
1947 1948 Any exceptions thrown while iterating over `source`
1949 or computing the map function are re-thrown on a call to `popFront` or,
1950 if thrown during construction, are simply allowed to propagate to the
1951 caller. In the case of exceptions thrown while computing the map function,
1952 the exceptions are chained as in `TaskPool.amap`.
1953 */1954 auto1955 map(S)(Ssource, size_tbufSize = 100, size_tworkUnitSize = size_t.max)
1956 if (isInputRange!S)
1957 {
    import std.exception : enforce;

    enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
            "Work unit size must be smaller than buffer size.");
    alias fun = adjoin!(staticMap!(unaryFun, functions));

    static final class Map
    {
        // This is a class because the task needs to be located on the
        // heap and in the non-random access case source needs to be on
        // the heap, too.
    private:
        enum bufferTrick = is(typeof(source.buf1)) &&
                           is(typeof(source.bufPos)) &&
                           is(typeof(source.doBufSwap()));

        alias E = MapType!(S, functions);
        E[] buf1, buf2;
        S source;
        TaskPool pool;
        Task!(run, E[] delegate(E[]), E[]) nextBufTask;
        size_t workUnitSize;
        size_t bufPos;
        bool lastTaskWaited;

        static if (isRandomAccessRange!S)
        {
            alias FromType = S;

            void popSource()
            {
                import std.algorithm.comparison : min;

                static if (__traits(compiles, source[0 .. source.length]))
                {
                    source = source[min(buf1.length, source.length)..source.length];
                }
                else static if (__traits(compiles, source[0..$]))
                {
                    source = source[min(buf1.length, source.length)..$];
                }
                else
                {
                    static assert(0, "S must have slicing for Map."
                        ~ " " ~ S.stringof ~ " doesn't.");
                }
            }
        }
        else static if (bufferTrick)
        {
            // Make sure we don't have the buffer recycling overload of
            // asyncBuf.
            static if (
                is(typeof(source.source)) &&
                isRoundRobin!(typeof(source.source))
            )
            {
                static assert(0, "Cannot execute a parallel map on " ~
                    "the buffer recycling overload of asyncBuf."
                );
            }

            alias FromType = typeof(source.buf1);
            FromType from;

            // Just swap our input buffer with source's output buffer.
            // No need to copy element by element.
            FromType dumpToFrom()
            {
                import std.algorithm.mutation : swap;

                assert(source.buf1.length <= from.length);
                from.length = source.buf1.length;
                swap(source.buf1, from);

                // Just in case this source has been popped before
                // being sent to map:
                from = from[source.bufPos..$];

                static if (is(typeof(source._length)))
                {
                    source._length -= (from.length - source.bufPos);
                }

                source.doBufSwap();

                return from;
            }
        }
        else
        {
            alias FromType = ElementType!S[];

            // The temporary array that data is copied to before being
            // mapped.
            FromType from;

            FromType dumpToFrom()
            {
                assert(from !is null);

                size_t i;
                for (; !source.empty && i < from.length; source.popFront())
                {
                    from[i++] = source.front;
                }

                from = from[0 .. i];
                return from;
            }
        }

        static if (hasLength!S)
        {
            size_t _length;

            public @property size_t length() const @safe pure nothrow
            {
                return _length;
            }
        }
        this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
        {
            static if (bufferTrick)
            {
                bufSize = source.buf1.length;
            }

            buf1.length = bufSize;
            buf2.length = bufSize;

            static if (!isRandomAccessRange!S)
            {
                from.length = bufSize;
            }

            this.workUnitSize = (workUnitSize == size_t.max) ?
                    pool.defaultWorkUnitSize(bufSize) : workUnitSize;
            this.source = source;
            this.pool = pool;

            static if (hasLength!S)
            {
                _length = source.length;
            }

            buf1 = fillBuf(buf1);
            submitBuf2();
        }

        // The from parameter is a dummy and ignored in the random access
        // case.
        E[] fillBuf(E[] buf)
        {
            import std.algorithm.comparison : min;

            static if (isRandomAccessRange!S)
            {
                import std.range : take;
                auto toMap = take(source, buf.length);
                scope(success) popSource();
            }
            else
            {
                auto toMap = dumpToFrom();
            }

            buf = buf[0 .. min(buf.length, toMap.length)];

            // Handle as a special case:
            if (pool.size == 0)
            {
                size_t index = 0;
                foreach (elem; toMap)
                {
                    buf[index++] = fun(elem);
                }
                return buf;
            }

            pool.amap!functions(toMap, workUnitSize, buf);

            return buf;
        }

        void submitBuf2()
        in
        {
            assert(nextBufTask.prev is null);
            assert(nextBufTask.next is null);
        }
        do
        {
            // Hack to reuse the task object.

            nextBufTask = typeof(nextBufTask).init;
            nextBufTask._args[0] = &fillBuf;
            nextBufTask._args[1] = buf2;
            pool.put(nextBufTask);
        }

        void doBufSwap()
        {
            if (lastTaskWaited)
            {
                // Then the source is empty. Signal it here.
                buf1 = null;
                buf2 = null;

                static if (!isRandomAccessRange!S)
                {
                    from = null;
                }

                return;
            }

            buf2 = buf1;
            buf1 = nextBufTask.yieldForce;
            bufPos = 0;

            if (source.empty)
            {
                lastTaskWaited = true;
            }
            else
            {
                submitBuf2();
            }
        }

    public:
        @property auto front()
        {
            return buf1[bufPos];
        }

        void popFront()
        {
            static if (hasLength!S)
            {
                _length--;
            }

            bufPos++;
            if (bufPos >= buf1.length)
            {
                doBufSwap();
            }
        }

        static if (isInfinite!S)
        {
            enum bool empty = false;
        }
        else
        {
            bool empty() const @property
            {
                // popFront() sets this when source is empty
                return buf1.length == 0;
            }
        }
    }
    return new Map(source, bufSize, workUnitSize, this);
}
}

/**
Given a `source` range that is expensive to iterate over, returns an
$(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
asynchronously buffers the contents of `source` into a buffer of
`bufSize` elements in a worker thread, while making previously buffered
elements from a second buffer, also of size `bufSize`, available via the
range interface of the returned object. The returned range has a length
iff `hasLength!S`. `asyncBuf` is useful, for example, when performing
expensive operations on the elements of ranges that represent data on a
disk or network.

Example:
---
import std.conv, std.stdio;

void main()
{
    // Fetch lines of a file in a background thread
    // while processing previously fetched lines,
    // dealing with byLine's buffer recycling by
    // eagerly duplicating every line.
    auto lines = File("foo.txt").byLine();
    auto duped = std.algorithm.map!"a.idup"(lines);

    // Fetch more lines in the background while we
    // process the lines already read into memory
    // into a matrix of doubles.
    double[][] matrix;
    auto asyncReader = taskPool.asyncBuf(duped);

    foreach (line; asyncReader)
    {
        auto ls = line.split("\t");
        matrix ~= to!(double[])(ls);
    }
}
---

$(B Exception Handling):

Any exceptions thrown while iterating over `source` are re-thrown on a
call to `popFront` or, if thrown during construction, simply
allowed to propagate to the caller.
*/
auto asyncBuf(S)(S source, size_t bufSize = 100)
if (isInputRange!S)
{
    static final class AsyncBuf
    {
        // This is a class because the task and source both need to be on
        // the heap.

        // The element type of S.
        alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.

    private:
        E[] buf1, buf2;
        S source;
        TaskPool pool;
        Task!(run, E[] delegate(E[]), E[]) nextBufTask;
        size_t bufPos;
        bool lastTaskWaited;

        static if (hasLength!S)
        {
            size_t _length;

            // Available if hasLength!S.
            public @property size_t length() const @safe pure nothrow
            {
                return _length;
            }
        }

        this(S source, size_t bufSize, TaskPool pool)
        {
            buf1.length = bufSize;
            buf2.length = bufSize;

            this.source = source;
            this.pool = pool;

            static if (hasLength!S)
            {
                _length = source.length;
            }

            buf1 = fillBuf(buf1);
            submitBuf2();
        }

        E[] fillBuf(E[] buf)
        {
            assert(buf !is null);

            size_t i;
            for (; !source.empty && i < buf.length; source.popFront())
            {
                buf[i++] = source.front;
            }

            buf = buf[0 .. i];
            return buf;
        }

        void submitBuf2()
        in
        {
            assert(nextBufTask.prev is null);
            assert(nextBufTask.next is null);
        }
        do
        {
            // Hack to reuse the task object.

            nextBufTask = typeof(nextBufTask).init;
            nextBufTask._args[0] = &fillBuf;
            nextBufTask._args[1] = buf2;
            pool.put(nextBufTask);
        }

        void doBufSwap()
        {
            if (lastTaskWaited)
            {
                // Then source is empty. Signal it here.
                buf1 = null;
                buf2 = null;
                return;
            }

            buf2 = buf1;
            buf1 = nextBufTask.yieldForce;
            bufPos = 0;

            if (source.empty)
            {
                lastTaskWaited = true;
            }
            else
            {
                submitBuf2();
            }
        }

    public:
        E front() @property
        {
            return buf1[bufPos];
        }

        void popFront()
        {
            static if (hasLength!S)
            {
                _length--;
            }

            bufPos++;
            if (bufPos >= buf1.length)
            {
                doBufSwap();
            }
        }

        static if (isInfinite!S)
        {
            enum bool empty = false;
        }
        else
        {
            ///
            bool empty() @property
            {
                // popFront() sets this when source is empty:
                return buf1.length == 0;
            }
        }
    }
    return new AsyncBuf(source, bufSize, this);
}

/**
Given a callable object `next` that writes to a user-provided buffer and
a second callable object `empty` that determines whether more data is
available to write via `next`, returns an input range that
asynchronously calls `next` with a set of size `nBuffers` of buffers
and makes the results available in the order they were obtained via the
input range interface of the returned object. Similarly to the
input range overload of `asyncBuf`, the first half of the buffers
are made available via the range interface while the second half are
filled and vice-versa.

Params:

next = A callable object that takes a single argument that must be an array
with mutable elements. When called, `next` writes data to
the array provided by the caller.

empty = A callable object that takes no arguments and returns a type
implicitly convertible to `bool`. This is used to signify
that no more data is available to be obtained by calling `next`.

initialBufSize = The initial size of each buffer. If `next` takes its
array by reference, it may resize the buffers.

nBuffers = The number of buffers to cycle through when calling `next`.

Example:
---
// Fetch lines of a file in a background
// thread while processing previously fetched
// lines, without duplicating any lines.
auto file = File("foo.txt");

void next(ref char[] buf)
{
    file.readln(buf);
}

// Fetch more lines in the background while we
// process the lines already read into memory
// into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

foreach (line; asyncReader)
{
    auto ls = line.split("\t");
    matrix ~= to!(double[])(ls);
}
---

$(B Exception Handling):

Any exceptions thrown while iterating over the returned range are
re-thrown on a call to `popFront`.

Warning:

Using the range returned by this function in a parallel foreach loop
will not work because buffers may be overwritten while the task that
processes them is in queue. This is checked for at compile time
and will result in a static assertion failure.
*/
auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if (is(typeof(C2.init()) : bool) &&
    Parameters!C1.length == 1 &&
    Parameters!C2.length == 0 &&
    isArray!(Parameters!C1[0])
) {
    auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
    return asyncBuf(roundRobin, nBuffers / 2);
}

///
template reduce(functions...)
{
    /**
    Parallel reduce on a random access range. Except as otherwise noted,
    usage is similar to $(REF _reduce, std,algorithm,iteration). There is
    also $(LREF fold) which does the same thing with a different parameter
    order.

    This function works by splitting the range to be reduced into work
    units, which are slices to be reduced in parallel. Once the results
    from all work units are computed, a final serial reduction is performed
    on these results to compute the final answer. Therefore, care must be
    taken to choose the seed value appropriately.

    Because the reduction is being performed in parallel, `functions`
    must be associative. For notational simplicity, let # be an
    infix operator representing `functions`. Then, (a # b) # c must equal
    a # (b # c). Floating point addition is not associative
    even though addition in exact arithmetic is. Summing floating
    point numbers using this function may give different results than summing
    serially. However, for many practical purposes floating point addition
    can be treated as associative.
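
    For example (a sketch only; the two sums below are both valid
    roundings of the true sum and may differ in their low-order bits):
    ---
    auto terms = std.algorithm.map!"1.0 / (a + 1)"(iota(1_000_000));
    auto parallelSum = taskPool.reduce!"a + b"(terms);
    auto serialSum = std.algorithm.reduce!"a + b"(terms);
    ---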

    Note that, since `functions` are assumed to be associative,
    additional optimizations are made to the serial portion of the reduction
    algorithm. These take advantage of the instruction level parallelism of
    modern CPUs, in addition to the thread-level parallelism that the rest
    of this module exploits. This can lead to better than linear speedups
    relative to $(REF _reduce, std,algorithm,iteration), especially for
    fine-grained benchmarks like dot products.

    An explicit seed may be provided as the first argument. If
    provided, it is used as the seed for all work units and for the final
    reduction of results from all work units. Therefore, if it is not the
    identity value for the operation being performed, results may differ
    from those generated by $(REF _reduce, std,algorithm,iteration) and
    may vary depending on how many work units are used. The next argument
    must be the range to be reduced.
    ---
    // Find the sum of squares of a range in parallel, using
    // an explicit seed.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel reduce:                     72 milliseconds
    // Using std.algorithm.reduce instead: 181 milliseconds
    auto nums = iota(10_000_000.0f);
    auto sumSquares = taskPool.reduce!"a + b"(
        0.0, std.algorithm.map!"a * a"(nums)
    );
    ---

    If no explicit seed is provided, the first element of each work unit
    is used as a seed. For the final reduction, the result from the first
    work unit is used as the seed.
    ---
    // Find the sum of a range in parallel, using the first
    // element of each work unit as the seed.
    auto sum = taskPool.reduce!"a + b"(nums);
    ---

    An explicit work unit size may be specified as the last argument.
    Specifying too small a work unit size will effectively serialize the
    reduction, as the final reduction of the result of each work unit will
    dominate computation time. If `TaskPool.size` for this instance
    is zero, this parameter is ignored and one work unit is used.
    ---
    // Use a work unit size of 100.
    auto sum2 = taskPool.reduce!"a + b"(nums, 100);

    // Work unit size of 100 and explicit seed.
    auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
    ---

    Parallel reduce supports multiple functions, like
    `std.algorithm.reduce`.
    ---
    // Find both the min and max of nums.
    auto minMax = taskPool.reduce!(min, max)(nums);
    assert(minMax[0] == reduce!min(nums));
    assert(minMax[1] == reduce!max(nums));
    ---

    $(B Exception Handling):

    After this function is finished executing, any exceptions thrown
    are chained together via `Throwable.next` and rethrown. The chaining
    order is non-deterministic.

    See_Also:

        $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
        range parameter comes first and there is no need to use
        $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
    */
    auto reduce(Args...)(Args args)
    {
        import core.exception : OutOfMemoryError;
        import core.internal.lifetime : emplaceRef;
        import std.exception : enforce;

        alias fun = reduceAdjoin!functions;
        alias finishFun = reduceFinish!functions;

        static if (isIntegral!(Args[$ - 1]))
        {
            size_t workUnitSize = cast(size_t) args[$ - 1];
            alias args2 = args[0..$ - 1];
            alias Args2 = Args[0..$ - 1];
        }
        else
        {
            alias args2 = args;
            alias Args2 = Args;
        }

        auto makeStartValue(Type)(Type e)
        {
            static if (functions.length == 1)
            {
                return e;
            }
            else
            {
                typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
                foreach (i, T; seed.Types)
                {
                    emplaceRef(seed.expand[i], e);
                }

                return seed;
            }
        }

        static if (args2.length == 2)
        {
            static assert(isInputRange!(Args2[1]));
            alias range = args2[1];
            alias seed = args2[0];
            enum explicitSeed = true;

            static if (!is(typeof(workUnitSize)))
            {
                size_t workUnitSize = defaultWorkUnitSize(range.length);
            }
        }
        else
        {
            static assert(args2.length == 1);
            alias range = args2[0];

            static if (!is(typeof(workUnitSize)))
            {
                size_t workUnitSize = defaultWorkUnitSize(range.length);
            }

            enforce(!range.empty,
                "Cannot reduce an empty range with first element as start value.");

            auto seed = makeStartValue(range.front);
            enum explicitSeed = false;
            range.popFront();
        }
2650 aliasR = typeof(range);
2651 2652 EreduceOnRange(Rrange, size_tlowerBound, size_tupperBound)
2653 {
2654 // This is for exploiting instruction level parallelism by2655 // using multiple accumulator variables within each thread,2656 // since we're assuming functions are associative anyhow.2657 2658 // This is so that loops can be unrolled automatically.2659 enumilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2660 enumnILP = ilpTuple.length;
2661 immutablesubSize = (upperBound - lowerBound) / nILP;
2662 2663 if (subSize <= 1)
2664 {
2665 // Handle as a special case.2666 staticif (explicitSeed)
2667 {
2668 Eresult = seed;
2669 }
2670 else2671 {
2672 Eresult = makeStartValue(range[lowerBound]);
2673 lowerBound++;
2674 }
2675 2676 foreach (i; lowerBound .. upperBound)
2677 {
2678 result = fun(result, range[i]);
2679 }
2680 2681 returnresult;
2682 }
2683 2684 assert(subSize > 1);
2685 E[nILP] results;
2686 size_t[nILP] offsets;
2687 2688 foreach (i; ilpTuple)
2689 {
2690 offsets[i] = lowerBound + subSize * i;
2691 2692 staticif (explicitSeed)
2693 {
2694 results[i] = seed;
2695 }
2696 else2697 {
2698 results[i] = makeStartValue(range[offsets[i]]);
2699 offsets[i]++;
2700 }
2701 }
2702 2703 immutablenLoop = subSize - (!explicitSeed);
2704 foreach (i; 0 .. nLoop)
2705 {
2706 foreach (j; ilpTuple)
2707 {
2708 results[j] = fun(results[j], range[offsets[j]]);
2709 offsets[j]++;
2710 }
2711 }
2712 2713 // Finish the remainder.2714 foreach (i; nILP * subSize + lowerBound .. upperBound)
2715 {
2716 results[$ - 1] = fun(results[$ - 1], range[i]);
2717 }
2718 2719 foreach (i; ilpTuple[1..$])
2720 {
2721 results[0] = finishFun(results[0], results[i]);
2722 }
2723 2724 returnresults[0];
2725 }

        immutable len = range.length;
        if (len == 0)
        {
            return seed;
        }

        if (this.size == 0)
        {
            return finishFun(seed, reduceOnRange(range, 0, len));
        }

        // Unlike the rest of the functions here, I can't use the Task object
        // recycling trick here because this has to work on non-commutative
        // operations. After all the tasks are done executing, fun() has to
        // be applied on the results of these to get a final result, but
        // it can't be evaluated out of order.

        if (workUnitSize > len)
        {
            workUnitSize = len;
        }

        immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
        assert(nWorkUnits * workUnitSize >= len);

        alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
        RTask[] tasks;

        // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
        // Use a fixed buffer backed by malloc().
        enum maxStack = 2_048;
        byte[maxStack] buf = void;
        immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;

        import core.stdc.stdlib : malloc, free;
        if (nBytesNeeded <= maxStack)
        {
            tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
        }
        else
        {
            auto ptr = cast(RTask*) malloc(nBytesNeeded);
            if (!ptr)
            {
                throw new OutOfMemoryError(
                    "Out of memory in std.parallelism."
                );
            }

            tasks = ptr[0 .. nWorkUnits];
        }

        scope(exit)
        {
            if (nBytesNeeded > maxStack)
            {
                free(tasks.ptr);
            }
        }

        // Hack to take the address of a nested function w/o
        // making a closure.
        static auto scopedAddress(D)(scope D del) @system
        {
            auto tmp = del;
            return tmp;
        }

        size_t curPos = 0;
        void useTask(ref RTask task)
        {
            import std.algorithm.comparison : min;
            import core.lifetime : emplace;

            // Private constructor, so can't feed its arguments directly
            // to emplace
            emplace(&task, RTask
            (
                scopedAddress(&reduceOnRange),
                range,
                curPos, // lower bound.
                cast() min(len, curPos + workUnitSize) // upper bound.
            ));

            task.pool = this;

            curPos += workUnitSize;
        }

        foreach (ref task; tasks)
        {
            useTask(task);
        }

        foreach (i; 1 .. tasks.length - 1)
        {
            tasks[i].next = tasks[i + 1].basePtr;
            tasks[i + 1].prev = tasks[i].basePtr;
        }

        if (tasks.length > 1)
        {
            queueLock();
            scope(exit) queueUnlock();

            abstractPutGroupNoSync(
                tasks[1].basePtr,
                tasks[$ - 1].basePtr
            );
        }

        if (tasks.length > 0)
        {
            try
            {
                tasks[0].job();
            }
            catch (Throwable e)
            {
                tasks[0].exception = e;
            }
            tasks[0].taskStatus = TaskStatus.done;

            // Try to execute each of these in the current thread
            foreach (ref task; tasks[1..$])
            {
                tryDeleteExecute(task.basePtr);
            }
        }

        // Now that we've tried to execute every task, they're all either
        // done or in progress. Force all of them.
        E result = seed;

        Throwable firstException;

        foreach (ref task; tasks)
        {
            try
            {
                task.yieldForce;
            }
            catch (Throwable e)
            {
                /* Chain e to front because order doesn't matter and because
                 * e is not likely to be a chain itself (so fewer traversals)
                 */
                firstException = Throwable.chainTogether(e, firstException);
                continue;
            }

            if (!firstException) result = finishFun(result, task.returnVal);
        }

        if (firstException) throw firstException;

        return result;
    }
}

///
template fold(functions...)
{
    /** Implements the homonym function (also known as `accumulate`, `compress`,
    `inject`, or `foldl`) present in various programming languages of
    functional flavor.

    `fold` is functionally equivalent to $(LREF reduce) except the range
    parameter comes first and there is no need to use $(REF_ALTTEXT
    `tuple`,tuple,std,typecons) for multiple seeds.

    There may be one or more callable entities (`functions` argument) to
    apply.

    Params:
        args = Just the range to _fold over; or the range and one seed
               per function; or the range, one seed per function, and
               the work unit size

    Returns:
        The accumulated result as a single value for single function and
        as a tuple of values for multiple functions

    See_Also:
        Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).

    Example:
    ---
    static int adder(int a, int b)
    {
        return a + b;
    }
    static int multiplier(int a, int b)
    {
        return a * b;
    }

    // Just the range
    auto x = taskPool.fold!adder([1, 2, 3, 4]);
    assert(x == 10);

    // The range and the seeds (0 and 1 below; also note multiple
    // functions in this example)
    auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
    assert(y[0] == 10);
    assert(y[1] == 24);

    // The range, the seed (0), and the work unit size (20)
    auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
    assert(z == 10);
    ---
    */
    auto fold(Args...)(Args args)
    {
        static assert(isInputRange!(Args[0]), "First argument must be an InputRange");

        alias range = args[0];

        static if (Args.length == 1)
        {
            // Just the range
            return reduce!functions(range);
        }
        else static if (Args.length == 1 + functions.length ||
                        Args.length == 1 + functions.length + 1)
        {
            static if (functions.length == 1)
            {
                alias seeds = args[1];
            }
            else
            {
                auto seeds()
                {
                    import std.typecons : tuple;
                    return tuple(args[1 .. functions.length + 1]);
                }
            }

            static if (Args.length == 1 + functions.length)
            {
                // The range and the seeds
                return reduce!functions(seeds, range);
            }
            else static if (Args.length == 1 + functions.length + 1)
            {
                // The range, the seeds, and the work unit size
                static assert(isIntegral!(Args[$ - 1]), "Work unit size must be an integral type");
                return reduce!functions(seeds, range, args[$ - 1]);
            }
        }
        else
        {
            import std.conv : text;
            static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
                ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
        }
    }
}

// This test is not included in the documentation because even though these
// examples are for the inner fold() template, with their current location,
// they would appear under the outer one. (We can't move this inside the
// outer fold() template because then dmd runs out of memory possibly due to
// recursive template instantiation, which is surprisingly not caught.)
@system unittest
{
    // Just the range
    auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
    assert(x == 10);

    // The range and the seeds (0 and 1 below; also note multiple
    // functions in this example)
    auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
    assert(y[0] == 10);
    assert(y[1] == 24);

    // The range, the seed (0), and the work unit size (20)
    auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
    assert(z == 10);
}

/**
Gets the index of the current thread relative to this `TaskPool`. Any
thread not in this pool will receive an index of 0. The worker threads in
this pool receive unique indices of 1 through `this.size`.

This function is useful for maintaining worker-local resources.

Example:
---
// Execute a loop that computes the greatest common
// divisor of every number from 0 through 999 with
// 42 in parallel. Write the results out to
// a set of files, one for each thread. This allows
// results to be written out without any synchronization.

import std.conv, std.range, std.numeric, std.stdio;

void main()
{
    auto fileHandles = new File[taskPool.size + 1];
    scope(exit) {
        foreach (ref handle; fileHandles)
        {
            handle.close();
        }
    }

    foreach (i, ref handle; fileHandles)
    {
        handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
    }

    foreach (num; parallel(iota(1_000)))
    {
        auto outHandle = fileHandles[taskPool.workerIndex];
        outHandle.writeln(num, '\t', gcd(num, 42));
    }
}
---
*/
size_t workerIndex() @property @safe const nothrow
{
    immutable rawInd = threadIndex;
    return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
        (rawInd - instanceStartIndex + 1) : 0;
}

/**
Struct for creating worker-local storage. Worker-local storage is
thread-local storage that exists only for worker threads in a given
`TaskPool` plus a single thread outside the pool. It is allocated on the
garbage collected heap in a way that avoids _false sharing, and doesn't
necessarily have global scope within any thread. It can be accessed from
any worker thread in the `TaskPool` that created it, and one thread
outside this `TaskPool`. All threads outside the pool that created a
given instance of worker-local storage share a single slot.

Since the underlying data for this struct is heap-allocated, this struct
has reference semantics when passed between functions.

The main use cases for `WorkerLocalStorage` are:

1. Performing parallel reductions with an imperative, as opposed to
functional, programming style. In this case, it's useful to treat
`WorkerLocalStorage` as local to each thread for only the parallel
portion of an algorithm.

2. Recycling temporary buffers across iterations of a parallel foreach loop.

Example:
---
// Calculate pi as in our synopsis example, but
// use an imperative instead of a functional style.
immutable n = 1_000_000_000;
immutable delta = 1.0L / n;

auto sums = taskPool.workerLocalStorage(0.0L);
foreach (i; parallel(iota(n)))
{
    immutable x = ( i - 0.5L ) * delta;
    immutable toAdd = delta / ( 1.0 + x * x );
    sums.get += toAdd;
}

// Add up the results from each worker thread.
real pi = 0;
foreach (threadResult; sums.toRange)
{
    pi += 4.0L * threadResult;
}
---
*/
static struct WorkerLocalStorage(T)
{
private:
    TaskPool pool;
    size_t size;

    size_t elemSize;
    bool* stillThreadLocal;

    static size_t roundToLine(size_t num) pure nothrow
    {
        if (num % cacheLineSize == 0)
        {
            return num;
        }
        else
        {
            return ((num / cacheLineSize) + 1) * cacheLineSize;
        }
    }

    void* data;

    void initialize(TaskPool pool)
    {
        this.pool = pool;
        size = pool.size + 1;
        stillThreadLocal = new bool;
        *stillThreadLocal = true;

        // Determines whether the GC should scan the array.
        auto blkInfo = (typeid(T).flags & 1) ?
                       cast(GC.BlkAttr) 0 :
                       GC.BlkAttr.NO_SCAN;

        immutable nElem = pool.size + 1;
        elemSize = roundToLine(T.sizeof);

        // The + 3 is to pad one full cache line worth of space on either side
        // of the data structure to make sure false sharing with completely
        // unrelated heap data is prevented, and to provide enough padding to
        // make sure that data is cache line-aligned.
        data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;

        // Cache line align data ptr.
        data = cast(void*) roundToLine(cast(size_t) data);

        foreach (i; 0 .. nElem)
        {
            this.opIndex(i) = T.init;
        }
    }

    ref opIndex(this Qualified)(size_t index)
    {
        import std.conv : text;
        assert(index < size, text(index, '\t', uint.max));
        return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
    }

    void opIndexAssign(T val, size_t index)
    {
        assert(index < size);
        *(cast(T*) (data + elemSize * index)) = val;
    }

public:
    /**
    Get the current thread's instance. Returns by ref.
    Note that calling `get` from any thread
    outside the `TaskPool` that created this instance will return the
    same reference, so an instance of worker-local storage should only be
    accessed from one thread outside the pool that created it. If this
    rule is violated, undefined behavior will result.

    If assertions are enabled and `toRange` has been called, then this
    WorkerLocalStorage instance is no longer worker-local and an assertion
    failure will result when calling this method. This is not checked
    when assertions are disabled for performance reasons.
    */
    ref get(this Qualified)() @property
    {
        assert(*stillThreadLocal,
            "Cannot call get() on this instance of WorkerLocalStorage " ~
            "because it is no longer worker-local."
        );
        return opIndex(pool.workerIndex);
    }

    /**
    Assign a value to the current thread's instance. This function has
    the same caveats as its overload.
    */
    void get(T val) @property
    {
        assert(*stillThreadLocal,
            "Cannot call get() on this instance of WorkerLocalStorage " ~
            "because it is no longer worker-local."
        );

        opIndexAssign(val, pool.workerIndex);
    }

    /**
    Returns a range view of the values for all threads, which can be used
    to further process the results of each thread after running the parallel
    part of your algorithm. Do not use this method in the parallel portion
    of your algorithm.

    Calling this function sets a flag indicating that this struct is no
    longer worker-local, and attempting to use the `get` method again
    will result in an assertion failure if assertions are enabled.
    */
    WorkerLocalStorageRange!T toRange() @property
    {
        if (*stillThreadLocal)
        {
            *stillThreadLocal = false;

            // Make absolutely sure results are visible to all threads.
            // This is probably not necessary since some other
            // synchronization primitive will be used to signal that the
            // parallel part of the algorithm is done, but the
            // performance impact should be negligible, so it's better
            // to be safe.
            ubyte barrierDummy;
            atomicSetUbyte(barrierDummy, 1);
        }

        return WorkerLocalStorageRange!T(this);
    }
}

/**
Range primitives for worker-local storage. The purpose of this is to
access results produced by each worker thread from a single thread once you
are no longer using the worker-local storage from multiple threads.
Do not use this struct in the parallel portion of your algorithm.

The proper way to instantiate this object is to call
`WorkerLocalStorage.toRange`. Once instantiated, this object behaves
as a finite random-access range with assignable, lvalue elements and
a length equal to the number of worker threads in the `TaskPool` that
created it plus 1.
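
For illustration, a minimal sketch (the workload is arbitrary):
---
auto sums = taskPool.workerLocalStorage(0);
foreach (i; parallel(iota(1_000)))
{
    sums.get += i;
}

// Combine the per-thread partial sums from a single thread.
int total = 0;
foreach (threadSum; sums.toRange)
{
    total += threadSum;
}
assert(total == 499_500);
---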
*/
static struct WorkerLocalStorageRange(T)
{
private:
    WorkerLocalStorage!T workerLocalStorage;

    size_t _length;
    size_t beginOffset;

    this(WorkerLocalStorage!T wl)
    {
        this.workerLocalStorage = wl;
        _length = wl.size;
    }

public:
    ref front(this Qualified)() @property
    {
        return this[0];
    }

    ref back(this Qualified)() @property
    {
        return this[_length - 1];
    }

    void popFront()
    {
        if (_length > 0)
        {
            beginOffset++;
            _length--;
        }
    }

    void popBack()
    {
        if (_length > 0)
        {
            _length--;
        }
    }

    typeof(this) save() @property
    {
        return this;
    }

    ref opIndex(this Qualified)(size_t index)
    {
        assert(index < _length);
        return workerLocalStorage[index + beginOffset];
    }

    void opIndexAssign(T val, size_t index)
    {
        assert(index < _length);
        workerLocalStorage[index] = val;
    }

    typeof(this) opSlice(size_t lower, size_t upper)
    {
        assert(upper <= _length);
        auto newWl = this.workerLocalStorage;
        newWl.data += lower * newWl.elemSize;
        newWl.size = upper - lower;
        return typeof(this)(newWl);
    }

    bool empty() const @property
    {
        return length == 0;
    }

    size_t length() const @property
    {
        return _length;
    }
}

/**
Creates an instance of worker-local storage, initialized with a given
value. The value is `lazy` so that you can, for example, easily
create one instance of a class for each worker. For usage example,
see the `WorkerLocalStorage` struct.
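
For instance, a short sketch (`Buffer` is a hypothetical class; because
`initialVal` is lazy, the constructor runs once per slot, so each worker
gets its own instance):
---
class Buffer { double[] data; this() { data = new double[4_096]; } }
auto buffers = taskPool.workerLocalStorage(new Buffer);
---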
*/
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
{
    WorkerLocalStorage!T ret;
    ret.initialize(this);
    foreach (i; 0 .. size + 1)
    {
        ret[i] = initialVal;
    }

    // Memory barrier to make absolutely sure that what we wrote is
    // visible to worker threads.
    ubyte barrierDummy;
    atomicSetUbyte(barrierDummy, 0);

    return ret;
}

/**
Signals to all worker threads to terminate as soon as they are finished
with their current `Task`, or immediately if they are not executing a
`Task`. `Task`s that were in queue will not be executed unless
a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
causes them to be executed.

Use only if you have waited on every `Task` and therefore know the
queue is empty, or if you speculatively executed some tasks and no longer
need the results.
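
A minimal sketch of safe usage (`someFun` stands for any function to run
in parallel):
---
auto pool = new TaskPool();
auto t = task!someFun();  // someFun: placeholder
pool.put(t);

// Once every submitted task has been forced, the queue is known
// to be empty, so the pool can be stopped safely.
t.yieldForce;
pool.stop();
---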
*/
void stop() @trusted
{
    queueLock();
    scope(exit) queueUnlock();
    atomicSetUbyte(status, PoolState.stopNow);
    notifyAll();
}

/**
Signals worker threads to terminate when the queue becomes empty.

If the blocking argument is true, wait for all worker threads to terminate
before returning. This option might be used in applications where
task results are never consumed, e.g. when `TaskPool` is employed as a
rudimentary scheduler for tasks which communicate by means other than
return values.

Warning: Calling this function with $(D blocking = true) from a worker
thread that is a member of the same `TaskPool` that
`finish` is being called on will result in a deadlock.
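
A minimal sketch of the scheduler-style usage described above (`crunch`
is a placeholder for any expensive computation):
---
static void crunch(int i) { /* expensive computation */ }

auto pool = new TaskPool();
foreach (i; 0 .. 10)
{
    pool.put(task!crunch(i));
}

// Wait for the queue to drain and for all workers to terminate.
pool.finish(true);
---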
*/
void finish(bool blocking = false) @trusted
{
    {
        queueLock();
        scope(exit) queueUnlock();
        atomicCasUbyte(status, PoolState.running, PoolState.finishing);
        notifyAll();
    }
    if (blocking)
    {
        // Use this thread as a worker until everything is finished.
        executeWorkLoop();

        foreach (t; pool)
        {
            // Maybe there should be something here to prevent a thread
            // from calling join() on itself if this function is called
            // from a worker thread in the same pool, but:
            //
            // 1.  Using an if statement to skip join() would result in
            //     finish() returning without all tasks being finished.
            //
            // 2.  If an exception were thrown, it would bubble up to the
            //     Task from which finish() was called and likely be
            //     swallowed.
            t.join();
        }
    }
}

/// Returns the number of worker threads in the pool.
@property size_t size() @safe const pure nothrow
{
    return pool.length;
}

/**
Put a `Task` object on the back of the task queue. The `Task`
object may be passed by pointer or reference.

Example:
---
import std.file;

// Create a task.
auto t = task!read("foo.txt");

// Add it to the queue to be executed.
taskPool.put(t);
---

Notes:

@trusted overloads of this function are called for `Task`s if
$(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
return type or the function the `Task` executes is `pure`.
`Task` objects that meet all other requirements specified in the
`@trusted` overloads of `task` and `scopedTask` may be created
and executed from `@safe` code via `Task.executeInNewThread` but
not via `TaskPool`.

While this function takes the address of variables that may
be on the stack, some overloads are marked as @trusted.
`Task` includes a destructor that waits for the task to complete
before destroying the stack frame it is allocated on. Therefore,
it is impossible for the stack frame to be destroyed before the task is
complete and no longer referenced by a `TaskPool`.
*/
void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (!isSafeReturn!(typeof(task)))
{
    task.pool = this;
    abstractPut(task.basePtr);
}

/// Ditto
void put(alias fun, Args...)(Task!(fun, Args)* task)
if (!isSafeReturn!(typeof(*task)))
{
    import std.exception : enforce;
    enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
    put(*task);
}

@trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (isSafeReturn!(typeof(task)))
{
    task.pool = this;
    abstractPut(task.basePtr);
}

@trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
if (isSafeReturn!(typeof(*task)))
{
    import std.exception : enforce;
    enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
    put(*task);
}

/**
These properties control whether the worker threads are daemon threads.
A daemon thread is automatically terminated when all non-daemon threads
have terminated. A non-daemon thread will prevent a program from
terminating as long as it has not terminated.

If any `TaskPool` with non-daemon threads is active, either `stop`
or `finish` must be called on it before the program can terminate.

The worker threads in the `TaskPool` instance returned by the
`taskPool` property are daemon by default. The worker threads of
manually instantiated task pools are non-daemon by default.

Note: For a size zero pool, the getter arbitrarily returns true and the
setter has no effect.
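
A short sketch:
---
// A manually created pool is non-daemon by default; making its
// workers daemon threads lets the program exit without an
// explicit call to stop() or finish().
auto pool = new TaskPool(4);
pool.isDaemon = true;
---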
*/
bool isDaemon() @property @trusted
{
    queueLock();
    scope(exit) queueUnlock();
    return (size == 0) ? true : pool[0].isDaemon;
}

/// Ditto
void isDaemon(bool newVal) @property @trusted
{
    queueLock();
    scope(exit) queueUnlock();
    foreach (thread; pool)
    {
        thread.isDaemon = newVal;
    }
}

/**
These functions allow getting and setting the OS scheduling priority of
the worker threads in this `TaskPool`. They forward to
`core.thread.Thread.priority`, so a given priority value here means the
same thing as an identical priority value in `core.thread`.

Note: For a size zero pool, the getter arbitrarily returns
`core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
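
For example:
---
// Lower the scheduling priority of this pool's workers so that
// foreground threads stay responsive.
auto pool = new TaskPool();
pool.priority = Thread.PRIORITY_MIN;
---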
*/
int priority() @property @trusted
{
    return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
                         pool[0].priority;
}

/// Ditto
void priority(int newPriority) @property @trusted
{
    if (size > 0)
    {
        foreach (t; pool)
        {
            t.priority = newPriority;
        }
    }
}
}

@system unittest
{
    import std.algorithm.iteration : sum;
    import std.range : iota;
    import std.typecons : tuple;

    enum N = 100;
    auto r = iota(1, N + 1);
    const expected = r.sum();

    // Just the range
    assert(taskPool.fold!"a + b"(r) == expected);

    // Range and seeds
    assert(taskPool.fold!"a + b"(r, 0) == expected);
    assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));

    // Range, seeds, and work unit size
    assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
    assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
}

// Issue 16705
@system unittest
{
    struct MyIota
    {
        size_t front;
        void popFront()(){ front++; }
        auto empty(){ return front >= 25; }
        auto opIndex(size_t i){ return front + i; }
        auto length(){ return 25 - front; }
    }

    auto mySum = taskPool.reduce!"a + b"(MyIota());
}

/**
Returns a lazily initialized global instantiation of `TaskPool`.
This function can safely be called concurrently from multiple non-worker
threads. The worker threads in this pool are daemon threads, meaning that it
is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
terminating the main thread.
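
A brief sketch:
---
// The global pool is created on first use; its worker threads
// are daemon threads, so no explicit shutdown is needed.
auto roots = taskPool.amap!sqrt([1.0, 4.0, 9.0]);
---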
*/
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;
    __gshared TaskPool pool;
    return initOnce!pool({
        auto p = new TaskPool(defaultPoolThreads);
        p.isDaemon = true;
        return p;
    }());
}

private shared uint _defaultPoolThreads = uint.max;

/**
These properties get and set the number of worker threads in the `TaskPool`
instance returned by `taskPool`. The default value is `totalCPUs` - 1.
Calling the setter after the first call to `taskPool` does not change the
number of worker threads in the instance returned by `taskPool`.
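
A brief sketch (this assumes `taskPool` has not yet been accessed;
later assignments do not resize the existing global pool):
---
defaultPoolThreads = 4;
assert(taskPool.size == 4);
---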
*/
@property uint defaultPoolThreads() @trusted
{
    const local = atomicLoad(_defaultPoolThreads);
    return local < uint.max ? local : totalCPUs - 1;
}

/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted
{
    atomicStore(_defaultPoolThreads, newVal);
}

/**
Convenience functions that forward to `taskPool.parallel`. The
purpose of these is to make parallel foreach less verbose and more
readable.

Example:
---
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}
---

*/
ParallelForeach!R parallel(R)(R range)
{
    return taskPool.parallel(range);
}

/// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    return taskPool.parallel(range, workUnitSize);
}

// `each` should be usable with parallel
// https://issues.dlang.org/show_bug.cgi?id=17019
@system unittest
{
    import std.algorithm.iteration : each, sum;
    import std.range : iota;

    // check behavior with parallel
    auto arr = new int[10];
    parallel(arr).each!((ref e) => e += 1);
    assert(arr.sum == 10);

    auto arrIndex = new int[10];
    parallel(arrIndex).each!((i, ref e) => e += i);
    assert(arrIndex.sum == 10.iota.sum);
}

// https://issues.dlang.org/show_bug.cgi?id=22745
@system unittest
{
    auto pool = new TaskPool(0);
    int[] empty;
    foreach (i; pool.parallel(empty)) {}
    pool.finish();
}

// Thrown when a parallel foreach loop is broken from.
class ParallelForeachError : Error
{
    this()
    {
        super("Cannot break from a parallel foreach loop using break, return, "
              ~ "labeled break/continue or goto statements.");
    }
}

/*------Structs that implement opApply for parallel foreach.------------------*/

private template randLen(R)
{
    enum randLen = isRandomAccessRange!R && hasLength!R;
}

private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : OutOfMemoryError;
    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to https://issues.dlang.org/show_bug.cgi?id=3753.
    // Therefore, allocate a fixed buffer and fall back to `malloc()` if
    // someone's using a ridiculous amount of threads.
    // Also, using a byte array instead of a PTask array as the fixed buffer
    // is to prevent d'tors from being called on uninitialized excess PTask
    // instances.
    enum nBuf = 64;
    byte[nBuf * PTask.sizeof] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
        if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
        tasks = ptr[0 .. nThreads];
    }

    scope(exit)
    {
        if (nThreads > nBuf)
        {
            free(tasks.ptr);
        }
    }

    foreach (ref t; tasks)
    {
        import core.stdc.string : memcpy;

        // This silly looking code is necessary to prevent d'tors from being
        // called on uninitialized objects.
        auto temp = scopedTask(doIt);
        memcpy(&t, &temp, PTask.sizeof);

        // This has to be done to t after copying, not temp before copying.
        // Otherwise, temp's destructor will sit here and wait for the
        // task to finish.
        t.pool = pool;
    }

    foreach (i; 1 .. tasks.length - 1)
    {
        tasks[i].next = tasks[i + 1].basePtr;
        tasks[i + 1].prev = tasks[i].basePtr;
    }

    if (tasks.length > 1)
    {
        pool.queueLock();
        scope(exit) pool.queueUnlock();

        pool.abstractPutGroupNoSync(
            tasks[1].basePtr,
            tasks[$ - 1].basePtr
        );
    }

    if (tasks.length > 0)
    {
        try
        {
            tasks[0].job();
        }
        catch (Throwable e)
        {
            tasks[0].exception = e; // nocoverage
        }
        tasks[0].taskStatus = TaskStatus.done;

        // Try to execute each of these in the current thread
        foreach (ref task; tasks[1..$])
        {
            pool.tryDeleteExecute(task.basePtr);
        }
    }

    Throwable firstException;

    foreach (i, ref task; tasks)
    {
        try
        {
            task.yieldForce;
        }
        catch (Throwable e)
        {
            /* Chain e to front because order doesn't matter and because
             * e is not likely to be a chain itself (so fewer traversals)
             */
            firstException = Throwable.chainTogether(e, firstException);
            continue;
        }
    }

    if (firstException) throw firstException;
}

void foreachErr()
{
    throw new ParallelForeachError();
}

int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
    with(p)
    {
        int res = 0;
        size_t index = 0;

        // The explicit ElementType!R in the foreach loops is necessary for
        // correct behavior when iterating over strings.
        static if (hasLvalueElements!R)
        {
            foreach (ref ElementType!R elem; range)
            {
                static if (Parameters!dg.length == 2)
                {
                    res = dg(index, elem);
                }
                else
                {
                    res = dg(elem);
                }
                if (res) break;
                index++;
            }
        }
        else
        {
            foreach (ElementType!R elem; range)
            {
                static if (Parameters!dg.length == 2)
                {
                    res = dg(index, elem);
                }
                else
                {
                    res = dg(elem);
                }
                if (res) break;
                index++;
            }
        }
        if (res) foreachErr;
        return res;
    }
}

private enum string parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    shared size_t workUnitIndex = size_t.max;  // Effectively -1: chunkIndex + 1 == 0
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};

enum string parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex;
    size_t nPopped = 0;

    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
                "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            alias Temp = ElementType!R[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};

private struct ParallelForeach(R)
{
    TaskPool pool;
    R range;
    size_t workUnitSize;
    alias E = ElementType!R;

    static if (hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }
    else
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }

    int opApply(scope NoIndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }

    int opApply(scope IndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }
}

/*
This struct takes a callable that writes its output into a user-supplied
buffer and collects that output in a set of buffers of some fixed size. It
allows these buffers to be accessed with an input range interface. This is
used internally in the buffer-recycling overload of TaskPool.asyncBuf, which
creates an instance and forwards it to the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;
    size_t index;
    C1 nextDel;
    C2 emptyDel;
    bool _empty;
    bool primed;

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        foreach (ref buf; bufs)
        {
            buf.length = initialBufSize;
        }
    }

    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }


    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if (empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
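
// A minimal sketch of the intended use, mirroring what the buffer-recycling
// overload of asyncBuf does internally. The next and sourceEmpty delegates
// below are hypothetical stand-ins for user code: nextDel fills the current
// buffer, emptyDel reports source exhaustion, and the fixed set of buffers is
// cycled through round-robin instead of being reallocated.
@system unittest
{
    import std.algorithm.comparison : equal;
    import std.range : iota;

    size_t counter;
    void next(ref int[] buf)
    {
        foreach (ref e; buf) e = cast(int) counter++;
    }
    bool sourceEmpty() { return counter >= 8; }

    auto rr = RoundRobinBuffer!(typeof(&next), typeof(&sourceEmpty))(
        &next, &sourceEmpty, 4, 2);

    int[] flat;
    while (!rr.empty)
    {
        flat ~= rr.front; // Copy out; the buffer is recycled after popFront.
        rr.popFront();
    }
    assert(equal(flat, iota(8)));
}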

version (StdUnittest)
{
    // This was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}

// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math.operations : isClose;
    import std.math.algebraic : sqrt, abs;
    import std.math.exponential : log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;
    import std.stdio;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    // testSafe is never called; merely compiling it verifies that task,
    // scopedTask and TaskPool can be used from @safe code.
    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(isClose(elem, log(double(i + 1))));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
        [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
        tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making
        // pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)),
            10_000, 1000),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(abuf, 100, 5);
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!isClose(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker local storage from one pool receives an index of 0
    // when the index is queried w.r.t. another pool. The only way to do this
    // is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {

            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");


            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test
            // is non-deterministic and is more of a sanity check than
            // something that has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce: Find the sum of the
            // square roots of matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
            +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

// Regression test; the __S_12733 names refer to
// https://issues.dlang.org/show_bug.cgi?id=12733.
@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u) { n = u; }
        void opAssign(__S_12733 s) { this.n = s.n; }
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // this test was in std.range, but caused cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // ensure compilation
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}