Eigen  3.4.90 (git rev 9589cc4e7fd8e4538bedef80dd36c7738977a8be)
 
All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
Loading...
Searching...
No Matches
Parallelizer.h
1// This file is part of Eigen, a lightweight C++ template library
2// for linear algebra.
3//
4// Copyright (C) 2010 Gael Guennebaud <gael.guennebaud@inria.fr>
5//
6// This Source Code Form is subject to the terms of the Mozilla
7// Public License v. 2.0. If a copy of the MPL was not distributed
8// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9
10#ifndef EIGEN_PARALLELIZER_H
11#define EIGEN_PARALLELIZER_H
12
13// IWYU pragma: private
14#include "../InternalHeaderCheck.h"
15
16// Note that in the following, there are 3 different uses of the concept
17// "number of threads":
18// 1. Max number of threads used by OpenMP or ThreadPool.
19// * For OpenMP this is typically the value set by the OMP_NUM_THREADS
20// environment variable, or by a call to omp_set_num_threads() prior to
21// calling Eigen.
22// * For ThreadPool, this is the number of threads in the ThreadPool.
23// 2. Max number of threads currently allowed to be used by parallel Eigen
24// operations. This is set by setNbThreads(), and cannot exceed the value
25// in 1.
26// 3. The actual number of threads used for a given parallel Eigen operation.
27// This is typically computed on the fly using a cost model and cannot exceed
28// the value in 2.
29// * For OpenMP, this is typically the number of threads specified in individual
30// "omp parallel" pragmas associated with an Eigen operation.
31// * For ThreadPool, it is the number of concurrent tasks scheduled in the
32// threadpool for a given Eigen operation. Notice that since the threadpool
33// uses task stealing, there is no way to limit the number of concurrently
34// executing tasks to below the number in 1. except by limiting the total
35// number of tasks in flight.
36
37#if defined(EIGEN_HAS_OPENMP) && defined(EIGEN_GEMM_THREADPOOL)
38#error "EIGEN_HAS_OPENMP and EIGEN_GEMM_THREADPOOL may not both be defined."
39#endif
40
41namespace Eigen {
42
43namespace internal {
44inline void manage_multi_threading(Action action, int* v);
45}
46
47// Public APIs.
48
// Deprecated no-op kept for backward compatibility: older Eigen versions
// required calling initParallel() before using Eigen from multiple threads.
EIGEN_DEPRECATED inline void initParallel() {}
51
54inline int nbThreads() {
55 int ret;
56 internal::manage_multi_threading(GetAction, &ret);
57 return ret;
58}
59
62inline void setNbThreads(int v) { internal::manage_multi_threading(SetAction, &v); }
63
64#ifdef EIGEN_GEMM_THREADPOOL
65// Sets the ThreadPool used by Eigen parallel Gemm.
66//
67// NOTICE: This function has a known race condition with
68// parallelize_gemm below, and should not be called while
69// an instance of that function is running.
70//
71// TODO(rmlarsen): Make the device API available instead of
72// storing a local static pointer variable to avoid this issue.
inline ThreadPool* setGemmThreadPool(ThreadPool* new_pool) {
  // Process-wide pool pointer; unsynchronized (see the race-condition note
  // above this function).
  static ThreadPool* pool = nullptr;
  if (new_pool != nullptr) {
    // NOTE(review): this only swaps the pointer. It does NOT wait for
    // in-flight work on the previous pool, nor destroy it -- ownership of
    // both the old and new pool remains with the caller. (An earlier comment
    // here claimed otherwise; the code below shows a plain reassignment.)
    pool = new_pool;
    // Reset the number of threads to the number of threads on the new pool.
    setNbThreads(pool->NumThreads());
  }
  return pool;
}
84
85// Gets the ThreadPool used by Eigen parallel Gemm.
86inline ThreadPool* getGemmThreadPool() { return setGemmThreadPool(nullptr); }
87#endif
88
89namespace internal {
90
91// Implementation.
92
93#if defined(EIGEN_USE_BLAS) || (!defined(EIGEN_HAS_OPENMP) && !defined(EIGEN_GEMM_THREADPOOL))
94
95inline void manage_multi_threading(Action action, int* v) {
96 if (action == SetAction) {
97 eigen_internal_assert(v != nullptr);
98 } else if (action == GetAction) {
99 eigen_internal_assert(v != nullptr);
100 *v = 1;
101 } else {
102 eigen_internal_assert(false);
103 }
104}
// Stub shared-state type for the single-threaded build: the serial
// parallelize_gemm below never needs any per-thread bookkeeping.
template <typename Index>
struct GemmParallelInfo {};
// Single-threaded fallback: execute the functor once over the whole
// [0, rows) x [0, cols) range. The depth and transpose arguments are only
// needed by the multi-threaded cost model, so they are unused here.
template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index /*unused*/,
                                          bool /*unused*/) {
  func(0, rows, 0, cols);
}
112
113#else
114
// Per-worker bookkeeping shared between the threads cooperating on one GEMM.
// The synchronization protocol itself lives in the GEMM kernel, not here.
template <typename Index>
struct GemmParallelTaskInfo {
  GemmParallelTaskInfo() = default;
  // Starts at -1; presumably the "nothing published yet" sentinel of the
  // kernel's protocol -- confirm against the GEMM kernel.
  std::atomic<Index> sync{-1};
  std::atomic<int> users{0};
  // Slice of lhs rows owned by this task: [lhs_start, lhs_start + lhs_length)
  // (see where parallelize_gemm fills these fields in).
  Index lhs_start{0};
  Index lhs_length{0};
};
123
124template <typename Index>
125struct GemmParallelInfo {
126 const int logical_thread_id;
127 const int num_threads;
128 GemmParallelTaskInfo<Index>* task_info;
129
130 GemmParallelInfo(int logical_thread_id_, int num_threads_, GemmParallelTaskInfo<Index>* task_info_)
131 : logical_thread_id(logical_thread_id_), num_threads(num_threads_), task_info(task_info_) {}
132};
133
// Backing store for nbThreads()/setNbThreads() when a threading backend
// (OpenMP or the GEMM ThreadPool) is compiled in.
//
// NOTE(review): m_maxThreads is a function-local static read and written
// without synchronization; concurrent setNbThreads()/nbThreads() calls race.
inline void manage_multi_threading(Action action, int* v) {
  // -1 means "never set". Under OpenMP, GetAction then falls back to
  // omp_get_max_threads(); under the ThreadPool backend it is returned
  // unchanged, which keeps parallel paths disabled (threads <= 1 in
  // parallelize_gemm) until setNbThreads() runs, e.g. via setGemmThreadPool().
  static int m_maxThreads = -1;
  if (action == SetAction) {
    eigen_internal_assert(v != nullptr);
#if defined(EIGEN_HAS_OPENMP)
    // Calling action == SetAction and *v = 0 means
    // restoring m_maxThreads to the maximum number of threads specified
    // for OpenMP.
    eigen_internal_assert(*v >= 0);
    int omp_threads = omp_get_max_threads();
    // Never allow more threads than OpenMP itself would use.
    m_maxThreads = (*v == 0 ? omp_threads : std::min(*v, omp_threads));
#elif defined(EIGEN_GEMM_THREADPOOL)
    // Calling action == SetAction and *v = 0 means
    // restoring m_maxThreads to the number of threads in the ThreadPool,
    // which defaults to 1 if no pool was provided.
    eigen_internal_assert(*v >= 0);
    ThreadPool* pool = getGemmThreadPool();
    int pool_threads = pool != nullptr ? pool->NumThreads() : 1;
    // Cap the request at the pool size.
    m_maxThreads = (*v == 0 ? pool_threads : numext::mini(pool_threads, *v));
#endif
  } else if (action == GetAction) {
    eigen_internal_assert(v != nullptr);
#if defined(EIGEN_HAS_OPENMP)
    if (m_maxThreads > 0)
      *v = m_maxThreads;
    else
      *v = omp_get_max_threads();
#else
    // May still be -1 if setNbThreads() was never called (see note above).
    *v = m_maxThreads;
#endif
  } else {
    eigen_internal_assert(false);
  }
}
168
// Runs `func` over the rows x cols product, splitting the work across up to
// nbThreads() workers when the problem is large enough, or serially
// otherwise. `Condition` statically enables/disables parallelization for
// this call site; `depth` only feeds the cost heuristic below.
template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, bool transpose) {
  // Dynamically check whether we should even try to execute in parallel.
  // The conditions are:
  // - the max number of threads we can create is greater than 1
  // - we are not already in a parallel code
  // - the sizes are large enough

  // compute the maximal number of threads from the size of the product:
  // This first heuristic takes into account that the product kernel is fully optimized when working with nr columns at
  // once.
  Index size = transpose ? rows : cols;
  Index pb_max_threads = std::max<Index>(1, size / Functor::Traits::nr);

  // compute the maximal number of threads from the total amount of work:
  double work = static_cast<double>(rows) * static_cast<double>(cols) * static_cast<double>(depth);
  double kMinTaskSize = 50000;  // FIXME improve this heuristic.
  pb_max_threads = std::max<Index>(1, std::min<Index>(pb_max_threads, static_cast<Index>(work / kMinTaskSize)));

  // compute the number of threads we are going to use
  int threads = std::min<int>(nbThreads(), static_cast<int>(pb_max_threads));

  // if multi-threading is explicitly disabled, not useful, or if we already are
  // inside a parallel session, then abort multi-threading
  bool dont_parallelize = (!Condition) || (threads <= 1);
#if defined(EIGEN_HAS_OPENMP)
  // don't parallelize if we are executing in a parallel context already.
  dont_parallelize |= omp_get_num_threads() > 1;
#elif defined(EIGEN_GEMM_THREADPOOL)
  // don't parallelize if we have a trivial threadpool or the current thread id
  // is != -1, indicating that we are already executing on a thread inside the pool.
  // In other words, we do not allow nested parallelism, since this would lead to
  // deadlocks due to the workstealing nature of the threadpool.
  ThreadPool* pool = getGemmThreadPool();
  dont_parallelize |= (pool == nullptr || pool->CurrentThreadId() != -1);
#endif
  // Serial fallback: a single task covering the whole product.
  if (dont_parallelize) return func(0, rows, 0, cols);

  func.initParallelSession(threads);

  // From here on, `rows` always denotes the lhs dimension that gets sliced
  // into per-worker chunks below.
  if (transpose) std::swap(rows, cols);

  // One GemmParallelTaskInfo slot per worker.
  ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo<Index>, task_info, threads, 0);

#if defined(EIGEN_HAS_OPENMP)
#pragma omp parallel num_threads(threads)
  {
    Index i = omp_get_thread_num();
    // Note that the actual number of threads might be lower than the number of
    // requested ones
    Index actual_threads = omp_get_num_threads();
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);

    // Column chunks are rounded down to a multiple of 4, row chunks to a
    // multiple of the kernel's mr blocking; the last worker absorbs the
    // remainders of both dimensions.
    Index blockCols = (cols / actual_threads) & ~Index(0x3);
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;

    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    // Publish this worker's lhs slice so the workers can coordinate through
    // the shared task_info array.
    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);
  }

#elif defined(EIGEN_GEMM_THREADPOOL)
  // Schedule `threads - 1` tasks and run the last one inline on the calling
  // thread; the barrier waits until every task has called Notify().
  Barrier barrier(threads);
  auto task = [=, &func, &barrier, &task_info](int i) {
    // Unlike the OpenMP branch, the task count here is exact by construction.
    Index actual_threads = threads;
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);
    // Same partitioning as the OpenMP branch: column chunks a multiple of 4,
    // row chunks a multiple of mr, remainders go to the last task.
    Index blockCols = (cols / actual_threads) & ~Index(0x3);
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;

    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);

    barrier.Notify();
  };
  // Notice that we do not schedule more than "threads" tasks, which allows us to
  // limit number of running threads, even if the threadpool itself was constructed
  // with a larger number of threads.
  // NOTE(review): `task = std::move(task)` moves from `task` on every loop
  // iteration, and the moved-from `task` is called again below. This happens
  // to be harmless because its captures are scalars and references (a move is
  // a copy for them), but a plain by-copy capture would be safer.
  for (int i = 0; i < threads - 1; ++i) {
    pool->Schedule([=, task = std::move(task)] { task(i); });
  }
  task(threads - 1);
  barrier.Wait();
#endif
}
276
277#endif
278
279} // end namespace internal
280} // end namespace Eigen
281
282#endif // EIGEN_PARALLELIZER_H
Namespace containing all symbols from the Eigen library.
Definition B01_Experimental.dox:1
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition Meta.h:82
int nbThreads()
Definition Parallelizer.h:54
EIGEN_DEPRECATED void initParallel()
Definition Parallelizer.h:50
void setNbThreads(int v)
Definition Parallelizer.h:62