#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

#include "./InternalHeaderCheck.h"

namespace Eigen {

// Pluggable allocator interface: a ThreadPoolDevice can be given a custom
// allocator instead of Eigen's aligned_malloc/aligned_free.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
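
// Illustrative sketch (not part of the original header): a minimal custom
// Allocator that forwards to Eigen's aligned allocation helpers while counting
// requested bytes. The class name and the bookkeeping member are assumptions
// made for this example; <atomic> must be available.
//
//   class CountingAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       bytes_requested_ += num_bytes;
//       return internal::aligned_malloc(num_bytes);
//     }
//     void deallocate(void* buffer) const override { internal::aligned_free(buffer); }
//
//    private:
//     mutable std::atomic<size_t> bytes_requested_{0};
//   };
//
//   // Passed as the optional third constructor argument:
//   // ThreadPoolDevice device(&pool, num_cores, &counting_allocator);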

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes) : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }

  template <typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // Going beyond 4 threads usually just wastes CPU cycles due to the threads
    // competing for memory bandwidth, so we statically schedule at most 4
    // block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch all but the first block on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        pool_->Schedule([n, i, src_ptr, dst_ptr, blocksize, &barrier] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
          barrier.Notify();
        });
      }
      // Copy the first block on the calling thread, then wait for the rest.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }

  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const { ::memset(buffer, c, n); }

  template <typename T>
  EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
    std::fill(begin, end, value);
  }

  // Number of threads used for expression evaluation.
  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }

  // Number of threads available in the underlying thread pool. This number can
  // be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
    // Nothing to do: thread pool device operations are synchronous.
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU.
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    enqueue(std::forward<Function>(f), std::forward<Args>(args)...);
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue(Function&& f, Args&&... args) const {
#if EIGEN_COMP_CXXVER >= 20
    if constexpr (sizeof...(args) > 0) {
      // C++20: move the argument pack into the closure.
      auto run_f = [f = std::forward<Function>(f), ... args = std::forward<Args>(args)]() { f(args...); };
#else
    if (sizeof...(args) > 0) {
      // Pre-C++20: capture the arguments by reference.
      auto run_f = [f = std::forward<Function>(f), &args...]() { f(args...); };
#endif
      pool_->Schedule(std::move(run_f));
    } else {
      pool_->Schedule(std::forward<Function>(f));
    }
  }
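
  // Illustrative sketch (not part of the original header): enqueue runs work on
  // the pool without any completion signal, so the caller must provide its own
  // synchronization. The Barrier below is an assumption made for this example.
  //
  //   Eigen::Barrier work_done(1);
  //   device.enqueueNoNotification([&work_done] {
  //     // ... do work on a pool thread ...
  //     work_done.Notify();
  //   });
  //   work_done.Wait();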

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const { return pool_->CurrentThreadId(); }

  // Synchronous parallelFor: executes f over the half-open interval [0, n) in
  // parallel and blocks until all blocks have completed. The block size is
  // chosen from the per-iteration cost; if block_align is not nullptr, it is
  // called to round the block size up.
  void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
    } else if (n == 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      // Compute small problems directly in the caller thread.
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide the range into halves until we reach block.size.
    Barrier barrier(static_cast<unsigned int>(block.count));
    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree on the main thread.
      handleRange(0, n, block.size, &barrier, pool_, f);
    } else {
      // Execute the root in the pool to avoid using more than numThreads() threads.
      pool_->Schedule([this, n, &block, &barrier, &f]() { handleRange(0, n, block.size, &barrier, pool_, f); });
    }
    barrier.Wait();
  }

  // Convenience wrapper that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
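
  // Illustrative sketch (not part of the original header): a typical call with
  // a rough TensorOpCost of 8 bytes loaded, 8 bytes stored and 1 compute cycle
  // per element; the cost numbers are assumptions chosen for this example.
  //
  //   Eigen::ThreadPool tp(4);
  //   Eigen::ThreadPoolDevice device(&tp, tp.NumThreads());
  //   std::vector<float> v(1 << 20, 1.0f);
  //   device.parallelFor(static_cast<Eigen::Index>(v.size()), Eigen::TensorOpCost(8, 8, 1),
  //                      [&v](Eigen::Index first, Eigen::Index last) {
  //                        for (Eigen::Index i = first; i < last; ++i) v[i] *= 2.0f;
  //                      });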

  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Executes f over [0, n) in parallel without waiting for completion. When the
  // last block finishes, the 'done' callback is invoked.
  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f, std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx = new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide the range into halves until we reach block.size.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a pool thread.
        const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less: execute directly.
      ctx->f(firstIdx, lastIdx);
      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree on the main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the pool to avoid using more than numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
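
  // Illustrative sketch (not part of the original header): parallelForAsync
  // returns immediately, so the caller signals completion from the 'done'
  // callback. The Notification used here is an assumption for this example.
  //
  //   Eigen::Notification all_done;
  //   device.parallelForAsync(n, Eigen::TensorOpCost(8, 8, 1),
  //                           [&](Eigen::Index first, Eigen::Index last) { /* process [first, last) */ },
  //                           [&all_done] { all_done.Notify(); });
  //   // ... other work on the calling thread ...
  //   all_done.Wait();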

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // Recursively splits [firstIdx, lastIdx) into blocks of size 'granularity',
  // scheduling the second half of each split on the pool.
  static void handleRange(Index firstIdx, Index lastIdx, Index granularity, Barrier* barrier, ThreadPoolInterface* pool,
                          const std::function<void(Index, Index)>& f) {
    while (lastIdx - firstIdx > granularity) {
      const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, granularity) * granularity;
      pool->Schedule([=, &f]() { handleRange(midIdx, lastIdx, granularity, barrier, pool, f); });
      lastIdx = midIdx;
    }
    // Single block or less: execute directly and signal completion.
    f(firstIdx, lastIdx);
    barrier->Notify();
  }

  // Keeps the closures passed to parallelForAsync alive on the heap; the last
  // finished block deletes the context, which invokes the 'done' callback.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count, std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count), f(std::move(block_f)), done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Calculates the block size based on (1) the iteration cost and (2) parallel
  // efficiency. Blocks should not be too small (parallelization overhead) nor
  // too large (tail effects and load imbalance), and the number of blocks
  // should divide evenly across threads.
  ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
                                             std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(numext::div_ceil<Index>(n, max_oversharding_factor * numThreads()), block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = numext::div_ceil(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time spent in
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) / (numext::div_ceil<Index>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as this
    // does not decrease parallel efficiency.
    for (Index prev_block_count = block_count; max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides n into a smaller number of
      // blocks than the current block_size.
      Index coarser_block_size = numext::div_ceil(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = numext::div_ceil(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (numext::div_ceil<Index>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Take the coarser block size.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
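
  // Worked numbers for CalculateParallelForBlock above (editor's example, not
  // from the original source): with n = 1000, numThreads() = 4 and, assuming
  // the cost model yields an initial block_size of 100, block_count = 10 and
  // efficiency = 10 / (ceil(10 / 4) * 4) = 10 / 12 ~ 0.83. The coarsening loop
  // eventually reaches block_count = 8, where efficiency = 8 / (2 * 4) = 1.0,
  // so the coarser block size is preferred.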

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H