10#ifndef EIGEN_PARALLELIZER_H
11#define EIGEN_PARALLELIZER_H
14#include "../InternalHeaderCheck.h"
37#if defined(EIGEN_HAS_OPENMP) && defined(EIGEN_GEMM_THREADPOOL)
38#error "EIGEN_HAS_OPENMP and EIGEN_GEMM_THREADPOOL may not both be defined."
44inline void manage_multi_threading(Action action,
int* v);
56 internal::manage_multi_threading(GetAction, &ret);
62inline void setNbThreads(
int v) { internal::manage_multi_threading(SetAction, &v); }
#ifdef EIGEN_GEMM_THREADPOOL
// Installs the ThreadPool Eigen uses for parallel GEMM and returns it.
// Passing nullptr leaves the current pool unchanged, so this also serves as
// the backend of the getter below.
// NOTE(review): not thread safe — must not race with concurrent Eigen GEMM
// calls, and the installed pool must outlive its use by Eigen; confirm the
// caller contract against upstream documentation.
inline ThreadPool* setGemmThreadPool(ThreadPool* new_pool) {
  static ThreadPool* pool = nullptr;
  if (new_pool != nullptr) {
    pool = new_pool;
  }
  return pool;
}

// Returns the ThreadPool currently used for parallel GEMM, or nullptr if
// none has been installed.
inline ThreadPool* getGemmThreadPool() { return setGemmThreadPool(nullptr); }
#endif  // EIGEN_GEMM_THREADPOOL
93#if defined(EIGEN_USE_BLAS) || (!defined(EIGEN_HAS_OPENMP) && !defined(EIGEN_GEMM_THREADPOOL))
95inline void manage_multi_threading(Action action,
int* v) {
96 if (action == SetAction) {
97 eigen_internal_assert(v !=
nullptr);
98 }
else if (action == GetAction) {
99 eigen_internal_assert(v !=
nullptr);
102 eigen_internal_assert(
false);
// Serial fallback: no inter-thread coordination state is required, so the
// parallel-session info structure is intentionally empty.
template <typename Index>
struct GemmParallelInfo {};
107template <
bool Condition,
typename Functor,
typename Index>
108EIGEN_STRONG_INLINE
void parallelize_gemm(
const Functor& func,
Index rows,
Index cols,
Index ,
110 func(0, rows, 0, cols);
// Per-thread bookkeeping for a parallel GEMM run.
// NOTE(review): `sync` (init -1) and `users` presumably coordinate hand-off of
// packed blocks between threads — confirm against the GEMM kernel that reads
// them. `lhs_start`/`lhs_length` record the LHS slice assigned to this thread
// (written by parallelize_gemm below).
template <typename Index>
struct GemmParallelTaskInfo {
  GemmParallelTaskInfo() : sync(-1), users(0), lhs_start(0), lhs_length(0) {}
  std::atomic<Index> sync;
  std::atomic<int> users;
  Index lhs_start;
  Index lhs_length;
};
124template <
typename Index>
125struct GemmParallelInfo {
126 const int logical_thread_id;
127 const int num_threads;
128 GemmParallelTaskInfo<Index>* task_info;
130 GemmParallelInfo(
int logical_thread_id_,
int num_threads_, GemmParallelTaskInfo<Index>* task_info_)
131 : logical_thread_id(logical_thread_id_), num_threads(num_threads_), task_info(task_info_) {}
134inline void manage_multi_threading(Action action,
int* v) {
135 static int m_maxThreads = -1;
136 if (action == SetAction) {
137 eigen_internal_assert(v !=
nullptr);
138#if defined(EIGEN_HAS_OPENMP)
142 eigen_internal_assert(*v >= 0);
143 int omp_threads = omp_get_max_threads();
144 m_maxThreads = (*v == 0 ? omp_threads : std::min(*v, omp_threads));
145#elif defined(EIGEN_GEMM_THREADPOOL)
149 eigen_internal_assert(*v >= 0);
150 ThreadPool* pool = getGemmThreadPool();
151 int pool_threads = pool !=
nullptr ? pool->NumThreads() : 1;
152 m_maxThreads = (*v == 0 ? pool_threads : numext::mini(pool_threads, *v));
154 }
else if (action == GetAction) {
155 eigen_internal_assert(v !=
nullptr);
156#if defined(EIGEN_HAS_OPENMP)
157 if (m_maxThreads > 0)
160 *v = omp_get_max_threads();
165 eigen_internal_assert(
false);
169template <
bool Condition,
typename Functor,
typename Index>
170EIGEN_STRONG_INLINE
void parallelize_gemm(
const Functor& func,
Index rows,
Index cols,
Index depth,
bool transpose) {
180 Index size = transpose ? rows : cols;
181 Index pb_max_threads = std::max<Index>(1, size / Functor::Traits::nr);
184 double work =
static_cast<double>(rows) *
static_cast<double>(cols) *
static_cast<double>(depth);
185 double kMinTaskSize = 50000;
186 pb_max_threads = std::max<Index>(1, std::min<Index>(pb_max_threads,
static_cast<Index>(work / kMinTaskSize)));
189 int threads = std::min<int>(
nbThreads(),
static_cast<int>(pb_max_threads));
193 bool dont_parallelize = (!Condition) || (threads <= 1);
194#if defined(EIGEN_HAS_OPENMP)
196 dont_parallelize |= omp_get_num_threads() > 1;
197#elif defined(EIGEN_GEMM_THREADPOOL)
202 ThreadPool* pool = getGemmThreadPool();
203 dont_parallelize |= (pool ==
nullptr || pool->CurrentThreadId() != -1);
205 if (dont_parallelize)
return func(0, rows, 0, cols);
207 func.initParallelSession(threads);
209 if (transpose) std::swap(rows, cols);
211 ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo<Index>, task_info, threads, 0);
213#if defined(EIGEN_HAS_OPENMP)
214#pragma omp parallel num_threads(threads)
216 Index i = omp_get_thread_num();
219 Index actual_threads = omp_get_num_threads();
220 GemmParallelInfo<Index> info(i,
static_cast<int>(actual_threads), task_info);
222 Index blockCols = (cols / actual_threads) & ~
Index(0x3);
223 Index blockRows = (rows / actual_threads);
224 blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;
226 Index r0 = i * blockRows;
227 Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;
229 Index c0 = i * blockCols;
230 Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;
232 info.task_info[i].lhs_start = r0;
233 info.task_info[i].lhs_length = actualBlockRows;
236 func(c0, actualBlockCols, 0, rows, &info);
238 func(0, rows, c0, actualBlockCols, &info);
241#elif defined(EIGEN_GEMM_THREADPOOL)
242 Barrier barrier(threads);
243 auto task = [=, &func, &barrier, &task_info](
int i) {
244 Index actual_threads = threads;
245 GemmParallelInfo<Index> info(i,
static_cast<int>(actual_threads), task_info);
246 Index blockCols = (cols / actual_threads) & ~
Index(0x3);
247 Index blockRows = (rows / actual_threads);
248 blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;
250 Index r0 = i * blockRows;
251 Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;
253 Index c0 = i * blockCols;
254 Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;
256 info.task_info[i].lhs_start = r0;
257 info.task_info[i].lhs_length = actualBlockRows;
260 func(c0, actualBlockCols, 0, rows, &info);
262 func(0, rows, c0, actualBlockCols, &info);
269 for (
int i = 0; i < threads - 1; ++i) {
270 pool->Schedule([=, task = std::move(task)] { task(i); });
Namespace containing all symbols from the Eigen library.
Definition B01_Experimental.dox:1
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition Meta.h:82
int nbThreads()
Definition Parallelizer.h:54
EIGEN_DEPRECATED void initParallel()
Definition Parallelizer.h:50
void setNbThreads(int v)
Definition Parallelizer.h:62