Eigen-unsupported  5.0.1-dev+284dcc12
 
Loading...
Searching...
No Matches
TensorDeviceThreadPool.h
1// This file is part of Eigen, a lightweight C++ template library
2// for linear algebra.
3//
4// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
5//
6// This Source Code Form is subject to the terms of the Mozilla
7// Public License v. 2.0. If a copy of the MPL was not distributed
8// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9
10#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
11#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
12
13// IWYU pragma: private
14#include "./InternalHeaderCheck.h"
15
16namespace Eigen {
17
// Abstract interface to a device-specific memory allocator.
//
// Concrete allocators override the two pure virtual hooks below. Both hooks
// are const-qualified, so they can be invoked through a const Allocator.
class Allocator {
 public:
  // Virtual so that implementations can be destroyed through an Allocator*.
  virtual ~Allocator() = default;

  // Requests a buffer of `num_bytes` bytes from the implementation.
  virtual void* allocate(size_t num_bytes) const = 0;

  // Hands `buffer` back to the implementation.
  virtual void deallocate(void* buffer) const = 0;
};
25
26// Build a thread pool device on top the an existing pool of threads.
27struct ThreadPoolDevice {
28 // The ownership of the thread pool remains with the caller.
29 ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
30 : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
31
32 EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
33 return allocator_ ? allocator_->allocate(num_bytes) : internal::aligned_malloc(num_bytes);
34 }
35
36 EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
37 if (allocator_) {
38 allocator_->deallocate(buffer);
39 } else {
40 internal::aligned_free(buffer);
41 }
42 }
43
44 EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }
45
46 EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }
47
48 template <typename Type>
49 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
50 return data;
51 }
52
53 EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
54#ifdef __ANDROID__
55 ::memcpy(dst, src, n);
56#else
57 // TODO(rmlarsen): Align blocks on cache lines.
58 // We have observed that going beyond 4 threads usually just wastes
59 // CPU cycles due to the threads competing for memory bandwidth, so we
60 // statically schedule at most 4 block copies here.
61 const size_t kMinBlockSize = 32768;
62 const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
63 if (n <= kMinBlockSize || num_threads < 2) {
64 ::memcpy(dst, src, n);
65 } else {
66 const char* src_ptr = static_cast<const char*>(src);
67 char* dst_ptr = static_cast<char*>(dst);
68 const size_t blocksize = (n + (num_threads - 1)) / num_threads;
69 Barrier barrier(static_cast<int>(num_threads - 1));
70 // Launch the last 3 blocks on worker threads.
71 for (size_t i = 1; i < num_threads; ++i) {
72 pool_->Schedule([n, i, src_ptr, dst_ptr, blocksize, &barrier] {
73 ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
74 barrier.Notify();
75 });
76 }
77 // Launch the first block on the main thread.
78 ::memcpy(dst_ptr, src_ptr, blocksize);
79 barrier.Wait();
80 }
81#endif
82 }
83 EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
84 EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
85
86 EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const { ::memset(buffer, c, n); }
87
88 template <typename T>
89 EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
90 std::fill(begin, end, value);
91 }
92
93 EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }
94
95 // Number of theads available in the underlying thread pool. This number can
96 // be different from the value returned by numThreads().
97 EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }
98
99 EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); }
100
101 EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
102 // The l3 cache size is shared between all the cores.
103 return l3CacheSize() / num_threads_;
104 }
105
106 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
107 // Nothing. Threadpool device operations are synchronous.
108 }
109
110 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
111 // Should return an enum that encodes the ISA supported by the CPU
112 return 1;
113 }
114
115 // TODO(rmlarsen): Remove this deprecated interface when all users have been converted.
116 template <class Function, class... Args>
117 EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
118 enqueue(std::forward<Function>(f), std::forward<Args>(args)...);
119 }
120
121 template <class Function, class... Args>
122 EIGEN_STRONG_INLINE void enqueue(Function&& f, Args&&... args) const {
123#if EIGEN_COMP_CXXVER >= 20
124 if constexpr (sizeof...(args) > 0) {
125 auto run_f = [f = std::forward<Function>(f), ... args = std::forward<Args>(args)]() { f(args...); };
126#else
127 if (sizeof...(args) > 0) {
128 auto run_f = [f = std::forward<Function>(f), &args...]() { f(args...); };
129#endif
130 pool_->Schedule(std::move(run_f));
131 } else {
132 pool_->Schedule(std::forward<Function>(f));
133 }
134 }
135
136 // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
137 // called from one of the threads in pool_. Returns -1 otherwise.
138 EIGEN_STRONG_INLINE int currentThreadId() const { return pool_->CurrentThreadId(); }
139
140 // WARNING: This function is synchronous and will block the calling thread.
141 //
142 // Synchronous parallelFor executes f with [0, n) arguments in parallel and
143 // waits for completion. F accepts a half-open interval [first, last). Block
144 // size is chosen based on the iteration cost and resulting parallel
145 // efficiency. If block_align is not nullptr, it is called to round up the
146 // block size.
147 void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
148 std::function<void(Index, Index)> f) const {
149 if (EIGEN_PREDICT_FALSE(n <= 0)) {
150 return;
151 // Compute small problems directly in the caller thread.
152 } else if (n == 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
153 f(0, n);
154 return;
155 }
156
157 // Compute block size and total count of blocks.
158 ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
159
160 // Recursively divide size into halves until we reach block_size.
161 // Division code rounds mid to block_size, so we are guaranteed to get
162 // block_count leaves that do actual computations.
163 Barrier barrier(static_cast<unsigned int>(block.count));
164 if (block.count <= numThreads()) {
165 // Avoid a thread hop by running the root of the tree and one block on the
166 // main thread.
167 handleRange(0, n, block.size, &barrier, pool_, f);
168 } else {
169 // Execute the root in the thread pool to avoid running work on more than
170 // numThreads() threads.
171 pool_->Schedule([this, n, &block, &barrier, &f]() { handleRange(0, n, block.size, &barrier, pool_, f); });
172 }
173
174 barrier.Wait();
175 }
176
177 // Convenience wrapper for parallelFor that does not align blocks.
178 void parallelFor(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f) const {
179 parallelFor(n, cost, nullptr, std::move(f));
180 }
181
182 // WARNING: This function is asynchronous and will not block the calling thread.
183 //
184 // Asynchronous parallelFor executes f with [0, n) arguments in parallel
185 // without waiting for completion. When the last block finished, it will call
186 // 'done' callback. F accepts a half-open interval [first, last). Block size
187 // is chosen based on the iteration cost and resulting parallel efficiency. If
188 // block_align is not nullptr, it is called to round up the block size.
189 void parallelForAsync(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
190 std::function<void(Index, Index)> f, std::function<void()> done) const {
191 // Compute small problems directly in the caller thread.
192 if (n <= 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
193 f(0, n);
194 done();
195 return;
196 }
197
198 // Compute block size and total count of blocks.
199 ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
200
201 ParallelForAsyncContext* const ctx = new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
202
203 // Recursively divide size into halves until we reach block_size.
204 // Division code rounds mid to block_size, so we are guaranteed to get
205 // block_count leaves that do actual computations.
206 ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
207 while (lastIdx - firstIdx > block.size) {
208 // Split into halves and schedule the second half on a different thread.
209 const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
210 pool_->Schedule([ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
211 lastIdx = midIdx;
212 }
213
214 // Single block or less, execute directly.
215 ctx->f(firstIdx, lastIdx);
216
217 // Delete async context if it was the last block.
218 if (ctx->count.fetch_sub(1) == 1) delete ctx;
219 };
220
221 if (block.count <= numThreads()) {
222 // Avoid a thread hop by running the root of the tree and one block on the
223 // main thread.
224 ctx->handle_range(0, n);
225 } else {
226 // Execute the root in the thread pool to avoid running work on more than
227 // numThreads() threads.
228 pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
229 }
230 }
231
232 // Convenience wrapper for parallelForAsync that does not align blocks.
233 void parallelForAsync(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f,
234 std::function<void()> done) const {
235 parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
236 }
237
238 // Thread pool accessor.
239 ThreadPoolInterface* getPool() const { return pool_; }
240
241 // Allocator accessor.
242 Allocator* allocator() const { return allocator_; }
243
244 private:
245 typedef TensorCostModel<ThreadPoolDevice> CostModel;
246
247 static void handleRange(Index firstIdx, Index lastIdx, Index granularity, Barrier* barrier, ThreadPoolInterface* pool,
248 const std::function<void(Index, Index)>& f) {
249 while (lastIdx - firstIdx > granularity) {
250 // Split into halves and schedule the second half on a different thread.
251 const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, granularity) * granularity;
252 pool->Schedule([=, &f]() { handleRange(midIdx, lastIdx, granularity, barrier, pool, f); });
253 lastIdx = midIdx;
254 }
255 // Single block or less, execute directly.
256 f(firstIdx, lastIdx);
257 barrier->Notify();
258 }
259
260 // For parallelForAsync we must keep passed in closures on the heap, and
261 // delete them only after `done` callback finished.
262 struct ParallelForAsyncContext {
263 ParallelForAsyncContext(Index block_count, std::function<void(Index, Index)> block_f,
264 std::function<void()> done_callback)
265 : count(block_count), f(std::move(block_f)), done(std::move(done_callback)) {}
266 ~ParallelForAsyncContext() { done(); }
267
268 std::atomic<Index> count;
269 std::function<void(Index, Index)> f;
270 std::function<void()> done;
271
272 std::function<void(Index, Index)> handle_range;
273 };
274
275 struct ParallelForBlock {
276 Index size; // block size
277 Index count; // number of blocks
278 };
279
280 // Calculates block size based on (1) the iteration cost and (2) parallel
281 // efficiency. We want blocks to be not too small to mitigate parallelization
282 // overheads; not too large to mitigate tail effect and potential load
283 // imbalance and we also want number of blocks to be evenly dividable across
284 // threads.
285 ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
286 std::function<Index(Index)> block_align) const {
287 const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
288 const Index max_oversharding_factor = 4;
289 Index block_size = numext::mini(
290 n, numext::maxi<Index>(numext::div_ceil<Index>(n, max_oversharding_factor * numThreads()), block_size_f));
291 const Index max_block_size = numext::mini(n, 2 * block_size);
292
293 if (block_align) {
294 Index new_block_size = block_align(block_size);
295 eigen_assert(new_block_size >= block_size);
296 block_size = numext::mini(n, new_block_size);
297 }
298
299 Index block_count = numext::div_ceil(n, block_size);
300
301 // Calculate parallel efficiency as fraction of total CPU time used for
302 // computations:
303 double max_efficiency =
304 static_cast<double>(block_count) / (numext::div_ceil<Index>(block_count, numThreads()) * numThreads());
305
306 // Now try to increase block size up to max_block_size as long as it
307 // doesn't decrease parallel efficiency.
308 for (Index prev_block_count = block_count; max_efficiency < 1.0 && prev_block_count > 1;) {
309 // This is the next block size that divides size into a smaller number
310 // of blocks than the current block_size.
311 Index coarser_block_size = numext::div_ceil(n, prev_block_count - 1);
312 if (block_align) {
313 Index new_block_size = block_align(coarser_block_size);
314 eigen_assert(new_block_size >= coarser_block_size);
315 coarser_block_size = numext::mini(n, new_block_size);
316 }
317 if (coarser_block_size > max_block_size) {
318 break; // Reached max block size. Stop.
319 }
320 // Recalculate parallel efficiency.
321 const Index coarser_block_count = numext::div_ceil(n, coarser_block_size);
322 eigen_assert(coarser_block_count < prev_block_count);
323 prev_block_count = coarser_block_count;
324 const double coarser_efficiency = static_cast<double>(coarser_block_count) /
325 (numext::div_ceil<Index>(coarser_block_count, numThreads()) * numThreads());
326 if (coarser_efficiency + 0.01 >= max_efficiency) {
327 // Taking it.
328 block_size = coarser_block_size;
329 block_count = coarser_block_count;
330 if (max_efficiency < coarser_efficiency) {
331 max_efficiency = coarser_efficiency;
332 }
333 }
334 }
335
336 return {block_size, block_count};
337 }
338
339 ThreadPoolInterface* pool_;
340 int num_threads_;
341 Allocator* allocator_;
342};
343
344} // end namespace Eigen
345
346#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
Namespace containing all symbols from the Eigen library.
std::ptrdiff_t l1CacheSize()
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
std::ptrdiff_t l3CacheSize()