Eigen 3.4.90 (git rev 9589cc4e7fd8e4538bedef80dd36c7738977a8be)
 
NonBlockingThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::EnvThread Thread;
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;          // Random generator state.
    int thread_id;          // Worker thread index in pool.
  };

  struct ThreadData {
    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
    std::unique_ptr<Thread> thread;
    std::atomic<unsigned> steal_partition;
    Queue queue;
  };

  ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {}

  ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment())
      : env_(env),
        num_threads_(num_threads),
        allow_spinning_(allow_spinning),
        spin_count_(
            // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is proportional to num_threads_ and
            // we assume that new work is scheduled at a constant rate, so we divide `kSpinCount` by the number of
            // threads and the number of spinning threads. The constant was picked based on a fair dice roll; tune it.
            allow_spinning && num_threads > 0 ? kSpinCount / kMaxSpinningThreads / num_threads : 0),
        thread_data_(num_threads),
        all_coprimes_(num_threads),
        waiters_(num_threads),
        global_steal_partition_(EncodePartition(0, num_threads_)),
        spinning_state_(0),
        blocked_(0),
        done_(false),
        cancelled_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads_);
    // Calculate coprimes of all numbers [1, num_threads].
    // Coprimes are used for random walks over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a random starting thread index t and calculate num_threads - 1 subsequent
    // indices as (t + coprime) % num_threads, we will cover all threads without
    // repetitions (effectively getting a pseudo-random permutation of thread
    // indices).
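    // For example: with num_threads = 6 the coprimes of 6 are {1, 5}. Starting at
    // t = 2 with coprime 5 visits
    //   2, (2 + 5) % 6 = 1, (1 + 5) % 6 = 0, 5, 4, 3
    // touching every thread exactly once before the walk repeats.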
    eigen_plain_assert(num_threads_ < kMaxThreads);
    for (int i = 1; i <= num_threads_; ++i) {
      all_coprimes_.emplace_back(i);
      ComputeCoprimes(i, &all_coprimes_.back());
    }
#ifndef EIGEN_THREAD_LOCAL
    init_barrier_.reset(new Barrier(num_threads_));
#endif
    thread_data_.resize(num_threads_);
    for (int i = 0; i < num_threads_; i++) {
      SetStealPartition(i, EncodePartition(0, num_threads_));
      thread_data_[i].thread.reset(env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
#ifndef EIGEN_THREAD_LOCAL
    // Wait for workers to initialize per_thread_map_. Otherwise we might race
    // with them in Schedule or CurrentThreadId.
    init_barrier_->Wait();
#endif
  }

  ~ThreadPoolTempl() {
    done_ = true;

    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work arbitrarily long,
    // block, submit new work, unblock and otherwise live a full life.
    if (!cancelled_) {
      ec_.Notify(true);
    } else {
      // Since we were cancelled, there might be entries in the queues.
      // Empty them to prevent their destructor from asserting.
      for (size_t i = 0; i < thread_data_.size(); i++) {
        thread_data_[i].queue.Flush();
      }
    }
    // Join threads explicitly (by destroying) to avoid destruction order issues
    // within this class.
    for (size_t i = 0; i < thread_data_.size(); ++i) thread_data_[i].thread.reset();
  }

  void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
    eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));

    // Pass this information to each thread queue.
    for (int i = 0; i < num_threads_; i++) {
      const auto& pair = partitions[i];
      unsigned start = pair.first, end = pair.second;
      AssertBounds(start, end);
      unsigned val = EncodePartition(start, end);
      SetStealPartition(i, val);
    }
  }

  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE { ScheduleWithHint(std::move(fn), 0, num_threads_); }

  void ScheduleWithHint(std::function<void()> fn, int start, int limit) override {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue& q = thread_data_[pt->thread_id].queue;
      t = q.PushFront(std::move(t));
    } else {
      // A free-standing thread (or worker of another pool), push onto a random
      // queue.
      eigen_plain_assert(start < limit);
      eigen_plain_assert(limit <= num_threads_);
      int num_queues = limit - start;
      int rnd = Rand(&pt->rand) % num_queues;
      eigen_plain_assert(start + rnd < limit);
      Queue& q = thread_data_[start + rnd].queue;
      t = q.PushBack(std::move(t));
    }
    // Note: below we touch `this` after making the task available to worker threads.
    // Strictly speaking, this can lead to a racy use-after-free. Consider that
    // Schedule is called from a thread that is neither the main thread nor a worker
    // thread of this pool. Then, execution of the task directly or indirectly
    // completes the overall computation, which in turn leads to destruction of
    // `this`. We expect that such a scenario is prevented by the program, that is,
    // `this` is kept alive while any thread can potentially be in Schedule.
    if (!t.f) {
      if (IsNotifyParkedThreadRequired()) {
        ec_.Notify(false);
      }
    } else {
      env_.ExecuteTask(t);  // Push failed, execute directly.
    }
  }
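
  // Illustrative usage sketch (not part of the pool implementation): a caller
  // outside the pool can restrict where a task is enqueued via the hint, e.g.
  //
  //   pool->ScheduleWithHint([] { /* work */ }, /*start=*/0, /*limit=*/2);
  //
  // pushes the task onto a random queue among workers [0, 2) (assuming the pool
  // has at least two threads); any worker may still steal and execute it later.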

  // Tries to fetch a task for the calling worker thread and assign it to *t.
  void MaybeGetTask(Task* t) {
    PerThread* pt = GetPerThread();
    Queue& q = thread_data_[pt->thread_id].queue;
    *t = q.PopFront();
    if (t->f) return;
    if (num_threads_ == 1) {
      // For num_threads_ == 1 there is no point in going through the expensive
      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
      // victim queues it might reverse the order in which ops are executed
      // compared to the order in which they are scheduled, which tends to be
      // counter-productive for the types of I/O workloads single thread pools
      // tend to be used for.
      for (int i = 0; i < spin_count_ && !t->f; ++i) *t = q.PopFront();
    } else {
      if (EIGEN_PREDICT_FALSE(!t->f)) *t = LocalSteal();
      if (EIGEN_PREDICT_FALSE(!t->f)) *t = GlobalSteal();
      if (EIGEN_PREDICT_FALSE(!t->f)) {
        if (allow_spinning_ && StartSpinning()) {
          for (int i = 0; i < spin_count_ && !t->f; ++i) *t = GlobalSteal();
          // Record in `spinning_state_` that we are no longer spinning.
          bool has_no_notify_task = StopSpinning();
          // If a task was submitted to the queue without a call to
          // `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
          // false), and we didn't steal anything above, we must try to
          // steal one more time, to make sure that this task will be
          // executed. We will not necessarily find it, because it might
          // have been already stolen by some other thread.
          if (has_no_notify_task && !t->f) *t = q.PopFront();
        }
      }
    }
  }

  void Cancel() EIGEN_OVERRIDE {
    cancelled_ = true;
    done_ = true;

    // Let each thread know it's been cancelled.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
    for (size_t i = 0; i < thread_data_.size(); i++) {
      thread_data_[i].thread->OnCancel();
    }
#endif

    // Wake up the threads without work to let them exit on their own.
    ec_.Notify(true);
  }

  int NumThreads() const EIGEN_FINAL { return num_threads_; }

  int CurrentThreadId() const EIGEN_FINAL {
    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  // Each thread's steal partition encodes start and limit information in a single
  // std::atomic<unsigned>.
  // We expect num_threads_ < 65536, so start and limit each fit in 16 bits.
  // This encode/decode logic can be reused by external callers maintaining their
  // own thread-safe copies of scheduling and steal domain(s).
  static constexpr int kMaxPartitionBits = 16;
  static constexpr int kMaxThreads = 1 << kMaxPartitionBits;

  inline unsigned EncodePartition(unsigned start, unsigned limit) { return (start << kMaxPartitionBits) | limit; }

  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
    *limit = val & (kMaxThreads - 1);
    val >>= kMaxPartitionBits;
    *start = val;
  }
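
  // For example: with kMaxPartitionBits == 16, EncodePartition(2, 5) yields
  // (2 << 16) | 5 == 0x00020005, and DecodePartition recovers
  // limit = 0x00020005 & 0xFFFF = 5 and start = 0x00020005 >> 16 = 2.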

  void AssertBounds(int start, int end) {
    eigen_plain_assert(start >= 0);
    eigen_plain_assert(start < end);  // non-zero sized partition
    eigen_plain_assert(end <= num_threads_);
  }

  inline void SetStealPartition(size_t i, unsigned val) {
    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
  }

  inline unsigned GetStealPartition(int i) { return thread_data_[i].steal_partition.load(std::memory_order_relaxed); }

  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
    for (int i = 1; i <= N; i++) {
      unsigned a = i;
      unsigned b = N;
      // If GCD(a, b) == 1, then a and b are coprime.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes->push_back(i);
      }
    }
  }

  // Maximum number of threads that can spin in the steal loop.
  static constexpr int kMaxSpinningThreads = 1;

  // The number of steal loop spin iterations before parking (this number is
  // divided by the number of threads, to get the spin count for each thread).
  static constexpr int kSpinCount = 5000;
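
  // For example: with allow_spinning == true and 8 worker threads, the
  // constructor sets spin_count_ = kSpinCount / kMaxSpinningThreads / num_threads
  // = 5000 / 1 / 8 = 625 spin iterations per thread before it parks.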

  // If there are enough active threads with empty pending-task queues, a thread
  // that runs out of work can just be parked without spinning, because these
  // active threads will go into a steal loop after finishing their current
  // tasks.
  //
  // In the worst case, when all active threads are executing long/expensive
  // tasks, the next Schedule() will have to wait until one of the parked
  // threads is unparked; however, this should be very rare in practice.
  static constexpr int kMinActiveThreadsToStartSpinning = 4;

  struct SpinningState {
    // Spinning state layout:
    //
    // - Low 32 bits encode the number of threads that are spinning in the steal
    //   loop.
    //
    // - High 32 bits encode the number of tasks that were submitted to the pool
    //   without a call to `ec_.Notify()`. This number can't be larger than
    //   the number of spinning threads. Each spinning thread, when it exits the
    //   spin loop, must check if this number is greater than zero, and maybe
    //   make another attempt to steal a task and decrement it by one.
    static constexpr uint64_t kNumSpinningMask = 0x00000000FFFFFFFF;
    static constexpr uint64_t kNumNoNotifyMask = 0xFFFFFFFF00000000;
    static constexpr uint64_t kNumNoNotifyShift = 32;

    uint64_t num_spinning;         // number of spinning threads
    uint64_t num_no_notification;  // number of tasks submitted without
                                   // notifying waiting threads

    // Decodes a `spinning_state_` value.
    static SpinningState Decode(uint64_t state) {
      uint64_t num_spinning = (state & kNumSpinningMask);
      uint64_t num_no_notification = (state & kNumNoNotifyMask) >> kNumNoNotifyShift;

      eigen_plain_assert(num_no_notification <= num_spinning);
      return {num_spinning, num_no_notification};
    }

    // Encodes as a `spinning_state_` value.
    uint64_t Encode() const {
      eigen_plain_assert(num_no_notification <= num_spinning);
      return (num_no_notification << kNumNoNotifyShift) | num_spinning;
    }
  };
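
  // For example: a state with 2 spinning threads, one of which still owes a
  // no-notification re-check, is encoded as (1 << 32) | 2 == 0x0000000100000002;
  // Decode() returns {num_spinning = 2, num_no_notification = 1}.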

  Environment env_;
  const int num_threads_;
  const bool allow_spinning_;
  const int spin_count_;
  MaxSizeVector<ThreadData> thread_data_;
  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  unsigned global_steal_partition_;
  std::atomic<uint64_t> spinning_state_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> done_;
  std::atomic<bool> cancelled_;
  EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
  std::unique_ptr<Barrier> init_barrier_;
  EIGEN_MUTEX per_thread_map_mutex_;  // Protects per_thread_map_.
  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

  unsigned NumBlockedThreads() const { return blocked_.load(); }
  unsigned NumActiveThreads() const { return num_threads_ - blocked_.load(); }

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
    std::unique_ptr<PerThread> new_pt(new PerThread());
    per_thread_map_mutex_.lock();
    bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
    eigen_plain_assert(insertOK);
    EIGEN_UNUSED_VARIABLE(insertOK);
    per_thread_map_mutex_.unlock();
    init_barrier_->Notify();
    init_barrier_->Wait();
#endif
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
    Task t;
    while (!cancelled_.load(std::memory_order_relaxed)) {
      MaybeGetTask(&t);
      // If we still don't have a task, wait for one. Return if the thread pool is
      // in a cancelled state.
      if (EIGEN_PREDICT_FALSE(!t.f)) {
        EventCount::Waiter* waiter = &waiters_[pt->thread_id];
        if (!WaitForWork(waiter, &t)) return;
      }
      if (EIGEN_PREDICT_TRUE(t.f)) env_.ExecuteTask(t);
    }
  }

  // Steal tries to steal work from other worker threads in the range [start,
  // limit) in a best-effort manner.
  Task Steal(unsigned start, unsigned limit) {
    PerThread* pt = GetPerThread();
    const size_t size = limit - start;
    unsigned r = Rand(&pt->rand);
    // Reduce r into the [0, size) range; this uses the trick from
    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
    eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
    unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
    unsigned inc = all_coprimes_[size - 1][index];

    for (unsigned i = 0; i < size; i++) {
      eigen_plain_assert(start + victim < limit);
      Task t = thread_data_[start + victim].queue.PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= static_cast<unsigned int>(size);
      }
    }
    return Task();
  }
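
  // For example, in the reduction used by Steal() above: with size = 6 and a
  // 32-bit random value r = 0x80000000 (about half of 2^32),
  // victim = ((uint64_t)r * 6) >> 32 == 3, i.e. r is mapped proportionally into
  // [0, 6) without a modulo operation.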

  // Steals work within threads belonging to the partition.
  Task LocalSteal() {
    PerThread* pt = GetPerThread();
    unsigned partition = GetStealPartition(pt->thread_id);
    // If the thread's steal partition is the same as the global partition, there
    // is no need to go through the steal loop twice.
    if (global_steal_partition_ == partition) return Task();
    unsigned start, limit;
    DecodePartition(partition, &start, &limit);
    AssertBounds(start, limit);

    return Steal(start, limit);
  }

  // Steals work from any other thread in the pool.
  Task GlobalSteal() { return Steal(0, num_threads_); }

  // WaitForWork blocks until new work is available (returns true), or if it is
  // time to exit (returns false). Can optionally return a task to execute in t
  // (in such a case t.f != nullptr on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_plain_assert(!t->f);
    // We already did a best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait();
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait();
      if (cancelled_) {
        return false;
      } else {
        *t = thread_data_[victim].queue.PopBack();
        return true;
      }
    }
    // The number of blocked threads is used as the termination condition.
    // If we are shutting down and all worker threads are blocked without work,
    // then we are done.
    blocked_++;
    // TODO is blocked_ required to be unsigned?
    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
      ec_.CancelWait();
      // Almost done, but we need to re-check the queues.
      // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
      // submits work and calls the destructor (which sets done_). If we don't
      // re-check the queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from the queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from the queues.
        // Now other worker threads can start exiting, which is bad if the
        // work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached a stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    // We intentionally design NonEmptyQueueIndex to look for work in any queue
    // of the pool, so threads don't block in WaitForWork() forever when all
    // threads in their partition go to sleep. Steal is still local.
    const size_t size = thread_data_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!thread_data_[victim].queue.Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= static_cast<unsigned int>(size);
      }
    }
    return -1;
  }

  // StartSpinning() checks if the number of threads in the spin loop is less
  // than the allowed maximum. If so, it increments the number of spinning threads
  // by one and returns true (the caller must enter the spin loop). Otherwise it
  // returns false, and the caller must not enter the spin loop.
  bool StartSpinning() {
    if (NumActiveThreads() > kMinActiveThreadsToStartSpinning) return false;

    uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
    for (;;) {
      SpinningState state = SpinningState::Decode(spinning);

      if ((state.num_spinning - state.num_no_notification) >= kMaxSpinningThreads) {
        return false;
      }

      // Increment the number of spinning threads.
      ++state.num_spinning;

      if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
        return true;
      }
    }
  }

  // StopSpinning() decrements the number of spinning threads by one. It also
  // checks if there were any tasks submitted into the pool without notifying
  // parked threads, and if so decrements that count by one. Returns true if the
  // number of tasks submitted without notification was decremented; in this case,
  // the caller thread might have to call Steal() one more time.
  bool StopSpinning() {
    uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
    for (;;) {
      SpinningState state = SpinningState::Decode(spinning);

      // Decrement the number of spinning threads.
      --state.num_spinning;

      // Maybe decrement the number of tasks submitted without notification.
      bool has_no_notify_task = state.num_no_notification > 0;
      if (has_no_notify_task) --state.num_no_notification;

      if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
        return has_no_notify_task;
      }
    }
  }

  // IsNotifyParkedThreadRequired() returns true if a parked thread must be
  // notified about a newly added task. If there are threads spinning in the steal
  // loop, there is no need to unpark any of the waiting threads; the task will
  // be picked up by one of the spinning threads.
  bool IsNotifyParkedThreadRequired() {
    uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
    for (;;) {
      SpinningState state = SpinningState::Decode(spinning);

      // If the number of tasks submitted without notifying parked threads is
      // equal to the number of spinning threads, we must wake up one of the
      // parked threads.
      if (state.num_no_notification == state.num_spinning) return true;

      // Increment the number of tasks submitted without notification.
      ++state.num_no_notification;

      if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
        return false;
      }
    }
  }
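
  // Putting the three functions above together, a typical producer/spinner
  // handshake looks like this (a sketch of the protocol, not additional API):
  //   1. A worker calls StartSpinning(), raising num_spinning to 1.
  //   2. A producer calls IsNotifyParkedThreadRequired(); since
  //      num_no_notification (0) != num_spinning (1), it increments
  //      num_no_notification to 1 and skips ec_.Notify().
  //   3. The spinner later calls StopSpinning(), which returns true because
  //      num_no_notification > 0, so MaybeGetTask() makes one more attempt to
  //      pick up the possibly un-notified task before the thread parks.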

  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }

  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
    static PerThread dummy;
    auto it = per_thread_map_.find(GlobalThreadIdHash());
    if (it == per_thread_map_.end()) {
      return &dummy;
    } else {
      return it->second.get();
    }
#else
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
#endif
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme)
    return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
  }
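
  // For reference: the PCG-XSH-RS output step above xorshifts the 64-bit state
  // (current ^ (current >> 22)) and then applies a data-dependent right shift of
  // 22 + (current >> 61), i.e. between 22 and 29 bits selected by the top three
  // bits of the state, before truncating the result to 32 bits.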
};

typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
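
// Example usage (illustrative sketch only; assumes the standard STL thread
// environment and the Eigen::Barrier helper from this module):
//
//   Eigen::ThreadPool pool(/*num_threads=*/4);
//   Eigen::Barrier barrier(/*count=*/1);
//   pool.Schedule([&barrier]() {
//     // ... work executed on a pool thread ...
//     barrier.Notify();  // signal completion
//   });
//   barrier.Wait();  // block until the scheduled task has run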

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H