RunQueue.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread (owner),
// operations on the back of the queue can be done by multiple threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but has no
// stray occupied elements). The owner operates on one end of this region, the
// remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and claiming of the last
// empty element) happens by means of a state variable in each element. States
// are: empty, busy (in process of insertion or removal) and ready. Threads
// claim elements (empty->busy and ready->busy transitions) by means of a CAS
// operation. The finishing transitions (busy->empty and busy->ready) are done
// with a plain store, as the element is exclusively owned by the current thread.
//
// Note: we could permit only pointers as elements, then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state, but that would require a malloc/free per operation for large, complex
// values (and this is designed to store std::function<void()>).
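//
// Usage sketch (illustrative only, not part of the original documentation):
// one plausible arrangement is that the owner thread uses PushFront/PopFront
// while other threads submit or steal work via PushBack/PopBack. The Work type
// and kSize below are assumptions for the example.
//
//   RunQueue<std::function<void()>, 1024> q;
//
//   // Owner thread:
//   std::function<void()> overflow = q.PushFront([] { /* do work */ });
//   if (overflow) { /* queue was full; run or re-route `overflow` */ }
//   if (std::function<void()> w = q.PopFront()) w();
//
//   // Any other thread (e.g. a work stealer):
//   if (std::function<void()> stolen = q.PopBack()) stolen();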
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++) array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
    // Advance the front position and bump the modification counter
    // (see the comment on front_/back_ below).
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue is empty, returns a default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Decrement the front position in the low bits while leaving the
    // modification counter bits untouched.
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed Work.
  Work PushBack(Work w) {
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
    // Decrement the back position in the low bits while leaving the
    // modification counter bits untouched.
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue is empty, returns a default-constructed Work.
  Work PopBack() {
    if (Empty()) return Work();
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Advance the back position and bump the modification counter.
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns up to half of the last elements in the queue.
  // Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) continue;
        start = mid;
      } else {
        // Note: no need to store the intermediate kBusy state, we exclusively
        // own these elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0) back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Flush deletes all elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;

  enum State {
    kEmpty,
    kBusy,
    kReady,
  };

  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };

  // The low log(kSize) + 1 bits in front_ and back_ contain the rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented on Push operations. This allows us to (1) distinguish
  // between empty and full conditions (if we used only log(kSize) bits for the
  // position, these conditions would be indistinguishable); (2) obtain a
  // consistent snapshot of front_/back_ for the Size operation using the
  // modification counters.
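  //
  // Illustrative example (not from the original source): with kSize = 4,
  // kMask2 = 7, so the low 3 bits of front_ hold the position modulo 2*kSize
  // and the bits above hold the counter (one counter increment == kSize << 1 == 8).
  // Starting from front_ = 0b00000'101 (counter 0, position 5), PushFront stores
  // front + 1 + (kSize << 1) = 0b00001'110 (counter 1, position 6), while PopFront
  // stores ((front - 1) & kMask2) | (front & ~kMask2), i.e. it steps the position
  // back without touching the counter.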
  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> front_;
  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> back_;
  EIGEN_MUTEX mutex_;  // guards `PushBack`, `PopBack` and `PopBackHalf` (they access `back_`)

  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING Elem array_[kSize];

  // SizeOrNotEmpty returns the current queue size; if NeedSizeEstimate is false,
  // only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to great
    // effort to not produce false positives (claiming a non-empty queue as empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // The size estimate must agree with the maybe_zero check on the queue's
        // empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (EIGEN_PREDICT_FALSE(size < 0)) size += 2 * kSize;
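    // Worked example (illustrative, assuming kSize = 8, so kMask2 = 15): with
    // front & kMask2 == 1 and back & kMask2 == 14, the occupied positions are
    // 14, 15 and 0; the raw difference is 1 - 14 = -13, and adding
    // 2 * kSize = 16 yields the correct size of 3.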
    // The order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. a push can
    // increment the size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1; clamp it.
    if (EIGEN_PREDICT_FALSE(size > static_cast<int>(kSize))) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H