Eigen  3.4.90 (git rev 9589cc4e7fd8e4538bedef80dd36c7738977a8be)
 
All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
Loading...
Searching...
No Matches
CoreThreadPoolDevice.h
1// This file is part of Eigen, a lightweight C++ template library
2// for linear algebra.
3//
4// Copyright (C) 2023 Charlie Schlosser <cs.schlosser@gmail.com>
5//
6// This Source Code Form is subject to the terms of the Mozilla
7// Public License v. 2.0. If a copy of the MPL was not distributed
8// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9
10#ifndef EIGEN_CORE_THREAD_POOL_DEVICE_H
11#define EIGEN_CORE_THREAD_POOL_DEVICE_H
12
13namespace Eigen {
14
// CoreThreadPoolDevice provides an easy-to-understand Device for parallelizing Eigen Core expressions with
// a ThreadPool. Expressions are recursively split evenly until the evaluation cost is less than the threshold for
// delegating the task to a thread.
18/*
19 a
20 / \
21 / \
22 / \
23 / \
24 / \
25 / \
26 / \
27 a e
28 / \ / \
29 / \ / \
30 / \ / \
31 a c e g
32 / \ / \ / \ / \
33 / \ / \ / \ / \
34 a b c d e f g h
35*/
36// Each task descends the binary tree to the left, delegates the right task to a new thread, and continues to the
37// left. This ensures that work is evenly distributed to the thread pool as quickly as possible and minimizes the number
38// of tasks created during the evaluation. Consider an expression that is divided into 8 chunks. The
39// primary task 'a' creates tasks 'e' 'c' and 'b', and executes its portion of the expression at the bottom of the
40// tree. Likewise, task 'e' creates tasks 'g' and 'f', and executes its portion of the expression.
41
// Device that evaluates coefficient-wise expressions by recursively bisecting the index range and
// scheduling the right half of each split on a ThreadPool (see the tree diagram above).
struct CoreThreadPoolDevice {
  using Task = std::function<void()>;
  // `threadCostThreshold` scales the total expression cost into an ideal thread count
  // (see calculateLevels); larger values allow more threads for the same total cost.
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool& pool, float threadCostThreshold = 3e-5f)
      : m_pool(pool) {
    eigen_assert(threadCostThreshold >= 0.0f && "threadCostThreshold must be non-negative");
    m_costFactor = threadCostThreshold;
  }

  // Returns the depth of the binary task tree used to evaluate `size` coefficients with
  // per-packet-operation cost `cost`; the number of leaf tasks is (1 << result).
  // The thread count is capped by both the pool size and the number of packet operations,
  // and is reduced further when the total cost is too small to justify the whole pool.
  template <int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int calculateLevels(Index size, float cost) const {
    eigen_assert(cost >= 0.0f && "cost must be non-negative");
    Index numOps = size / PacketSize;
    int actualThreads = numOps < m_pool.NumThreads() ? static_cast<int>(numOps) : m_pool.NumThreads();
    float totalCost = static_cast<float>(numOps) * cost;
    float idealThreads = totalCost * m_costFactor;
    if (idealThreads < static_cast<float>(actualThreads)) {
      // Clamp to at least one thread so log2_ceil below receives a positive argument.
      idealThreads = numext::maxi(idealThreads, 1.0f);
      actualThreads = numext::mini(actualThreads, static_cast<int>(idealThreads));
    }
    int maxLevel = internal::log2_ceil(actualThreads);
    return maxLevel;
  }

// MSVC does not like inlining parallelForImpl
#if EIGEN_COMP_MSVC && !EIGEN_COMP_CLANG
#define EIGEN_PARALLEL_FOR_INLINE
#else
#define EIGEN_PARALLEL_FOR_INLINE EIGEN_STRONG_INLINE
#endif

  // 1D splitter: while `level` > 0, splits [begin, end) at a packet-aligned midpoint, schedules
  // the right half as a new task (which keeps splitting at the already-decremented level), and
  // continues with the left half.  Each leaf applies f(i) at every packet stride in its range and
  // notifies `barrier` exactly once, so the barrier must be initialized with (1 << level) counts.
  template <typename UnaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index begin, Index end, UnaryFunctor& f,
                                                                   Barrier& barrier, int level) {
    while (level > 0) {
      level--;
      Index size = end - begin;
      eigen_assert(size % PacketSize == 0 && "this function assumes size is a multiple of PacketSize");
      Index mid = begin + numext::round_down(size >> 1, PacketSize);
      // `mid`, `end` and `level` are captured by value so the scheduled task keeps this split's
      // bounds even though `end` is shrunk right after scheduling.
      Task right = [this, mid, end, &f, &barrier, level]() {
        parallelForImpl<UnaryFunctor, PacketSize>(mid, end, f, barrier, level);
      };
      m_pool.Schedule(std::move(right));
      end = mid;
    }
    for (Index i = begin; i < end; i += PacketSize) f(i);
    barrier.Notify();
  }

  // 2D splitter: prefers halving the outer range; once the outer range is a single unit it halves
  // the inner range at packet-aligned midpoints.  Leaf tasks apply f(outer, inner) over their
  // sub-rectangle and notify `barrier` once each.
  template <typename BinaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index outerBegin, Index outerEnd, Index innerBegin,
                                                                   Index innerEnd, BinaryFunctor& f, Barrier& barrier,
                                                                   int level) {
    while (level > 0) {
      level--;
      Index outerSize = outerEnd - outerBegin;
      if (outerSize > 1) {
        Index outerMid = outerBegin + (outerSize >> 1);
        Task right = [this, &f, &barrier, outerMid, outerEnd, innerBegin, innerEnd, level]() {
          parallelForImpl<BinaryFunctor, PacketSize>(outerMid, outerEnd, innerBegin, innerEnd, f, barrier, level);
        };
        m_pool.Schedule(std::move(right));
        outerEnd = outerMid;
      } else {
        Index innerSize = innerEnd - innerBegin;
        eigen_assert(innerSize % PacketSize == 0 && "this function assumes innerSize is a multiple of PacketSize");
        Index innerMid = innerBegin + numext::round_down(innerSize >> 1, PacketSize);
        Task right = [this, &f, &barrier, outerBegin, outerEnd, innerMid, innerEnd, level]() {
          parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerMid, innerEnd, f, barrier, level);
        };
        m_pool.Schedule(std::move(right));
        innerEnd = innerMid;
      }
    }
    for (Index outer = outerBegin; outer < outerEnd; outer++)
      for (Index inner = innerBegin; inner < innerEnd; inner += PacketSize) f(outer, inner);
    barrier.Notify();
  }

#undef EIGEN_PARALLEL_FOR_INLINE

  // Blocking 1D entry point: splits [begin, end) into (1 << maxLevel) chunks, evaluates them on
  // the pool (the calling thread runs the leftmost chunk itself), and waits for completion.
  template <typename UnaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor& f, float cost) {
    Index size = end - begin;
    int maxLevel = calculateLevels<PacketSize>(size, cost);
    Barrier barrier(1 << maxLevel);
    parallelForImpl<UnaryFunctor, PacketSize>(begin, end, f, barrier, maxLevel);
    barrier.Wait();
  }

  // Blocking 2D entry point; `cost` is the per-packet-operation evaluation cost used for sizing.
  template <typename BinaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin,
                                                         Index innerEnd, BinaryFunctor& f, float cost) {
    Index outerSize = outerEnd - outerBegin;
    Index innerSize = innerEnd - innerBegin;
    Index size = outerSize * innerSize;
    int maxLevel = calculateLevels<PacketSize>(size, cost);
    Barrier barrier(1 << maxLevel);
    parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerBegin, innerEnd, f, barrier, maxLevel);
    barrier.Wait();
  }

  ThreadPool& m_pool;
  // Scale factor mapping total expression cost to an ideal thread count (see calculateLevels).
  // It is multiplied, not divided by: conceptually it is the reciprocal of the cost of delegating
  // a task to a thread, stored in reciprocal form to avoid a floating point division.
  float m_costFactor;
};
148
149// specialization of coefficient-wise assignment loops for CoreThreadPoolDevice
150
151namespace internal {
152
153#ifdef EIGEN_PARSED_BY_DOXYGEN
154struct Kernel;
155#endif
156
// Compile-time helper aggregating the per-coefficient evaluation cost of an assignment kernel:
// the functor_cost of the source expression plus the functor_cost of the destination expression.
template <typename Kernel>
struct cost_helper {
  using SrcEvaluatorType = typename Kernel::SrcEvaluatorType;
  using DstEvaluatorType = typename Kernel::DstEvaluatorType;
  using SrcXprType = typename SrcEvaluatorType::XprType;
  using DstXprType = typename DstEvaluatorType::XprType;
  static constexpr Index Cost = functor_cost<SrcXprType>::Cost + functor_cost<DstXprType>::Cost;
};
165
166template <typename Kernel>
167struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, NoUnrolling> {
168 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
169 struct AssignmentFunctor : public Kernel {
170 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
171 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
172 this->assignCoeffByOuterInner(outer, inner);
173 }
174 };
175
176 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
177 const Index innerSize = kernel.innerSize();
178 const Index outerSize = kernel.outerSize();
179 constexpr float cost = static_cast<float>(XprEvaluationCost);
180 AssignmentFunctor functor(kernel);
181 device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, 0, innerSize, functor, cost);
182 }
183};
184
185template <typename Kernel>
186struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, InnerUnrolling> {
187 using DstXprType = typename Kernel::DstEvaluatorType::XprType;
188 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, InnerSize = DstXprType::InnerSizeAtCompileTime;
189 struct AssignmentFunctor : public Kernel {
190 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
191 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
192 copy_using_evaluator_DefaultTraversal_InnerUnrolling<Kernel, 0, InnerSize>::run(*this, outer);
193 }
194 };
195 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
196 const Index outerSize = kernel.outerSize();
197 AssignmentFunctor functor(kernel);
198 constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
199 device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, functor, cost);
200 }
201};
202
203template <typename Kernel>
204struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, NoUnrolling> {
205 using PacketType = typename Kernel::PacketType;
206 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
207 SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
208 DstAlignment = Kernel::AssignmentTraits::DstAlignment;
209 struct AssignmentFunctor : public Kernel {
210 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
211 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
212 this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
213 }
214 };
215 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
216 const Index innerSize = kernel.innerSize();
217 const Index outerSize = kernel.outerSize();
218 const float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize);
219 AssignmentFunctor functor(kernel);
220 device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, 0, innerSize, functor, cost);
221 }
222};
223
224template <typename Kernel>
225struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, InnerUnrolling> {
226 using PacketType = typename Kernel::PacketType;
227 using DstXprType = typename Kernel::DstEvaluatorType::XprType;
228 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
229 SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
230 DstAlignment = Kernel::AssignmentTraits::DstAlignment,
231 InnerSize = DstXprType::InnerSizeAtCompileTime;
232 struct AssignmentFunctor : public Kernel {
233 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
234 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
235 copy_using_evaluator_innervec_InnerUnrolling<Kernel, 0, InnerSize, SrcAlignment, DstAlignment>::run(*this, outer);
236 }
237 };
238 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
239 const Index outerSize = kernel.outerSize();
240 constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
241 AssignmentFunctor functor(kernel);
242 device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, functor, cost);
243 }
244};
245
246template <typename Kernel>
247struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, SliceVectorizedTraversal, NoUnrolling> {
248 using Scalar = typename Kernel::Scalar;
249 using PacketType = typename Kernel::PacketType;
250 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size;
251 struct PacketAssignmentFunctor : public Kernel {
252 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
253 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
254 this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
255 }
256 };
257 struct ScalarAssignmentFunctor : public Kernel {
258 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE ScalarAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
259 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
260 const Index innerSize = this->innerSize();
261 const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
262 for (Index inner = packetAccessSize; inner < innerSize; inner++) this->assignCoeffByOuterInner(outer, inner);
263 }
264 };
265 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
266 const Index outerSize = kernel.outerSize();
267 const Index innerSize = kernel.innerSize();
268 const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
269 constexpr float packetCost = static_cast<float>(XprEvaluationCost);
270 const float scalarCost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize - packetAccessSize);
271 PacketAssignmentFunctor packetFunctor(kernel);
272 ScalarAssignmentFunctor scalarFunctor(kernel);
273 device.template parallelFor<PacketAssignmentFunctor, PacketSize>(0, outerSize, 0, packetAccessSize, packetFunctor,
274 packetCost);
275 device.template parallelFor<ScalarAssignmentFunctor, 1>(0, outerSize, scalarFunctor, scalarCost);
276 };
277};
278
279template <typename Kernel>
280struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearTraversal, NoUnrolling> {
281 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
282 struct AssignmentFunctor : public Kernel {
283 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
284 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->assignCoeff(index); }
285 };
286 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
287 const Index size = kernel.size();
288 constexpr float cost = static_cast<float>(XprEvaluationCost);
289 AssignmentFunctor functor(kernel);
290 device.template parallelFor<AssignmentFunctor, 1>(0, size, functor, cost);
291 }
292};
293
294template <typename Kernel>
295struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearVectorizedTraversal, NoUnrolling> {
296 using Scalar = typename Kernel::Scalar;
297 using PacketType = typename Kernel::PacketType;
298 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost,
299 RequestedAlignment = Kernel::AssignmentTraits::LinearRequiredAlignment,
300 PacketSize = unpacket_traits<PacketType>::size,
301 DstIsAligned = Kernel::AssignmentTraits::DstAlignment >= RequestedAlignment,
302 DstAlignment = packet_traits<Scalar>::AlignedOnScalar ? RequestedAlignment
303 : Kernel::AssignmentTraits::DstAlignment,
304 SrcAlignment = Kernel::AssignmentTraits::JointAlignment;
305 struct AssignmentFunctor : public Kernel {
306 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
307 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) {
308 this->template assignPacket<DstAlignment, SrcAlignment, PacketType>(index);
309 }
310 };
311 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
312 const Index size = kernel.size();
313 const Index alignedStart =
314 DstIsAligned ? 0 : internal::first_aligned<RequestedAlignment>(kernel.dstDataPtr(), size);
315 const Index alignedEnd = alignedStart + numext::round_down(size - alignedStart, PacketSize);
316
317 unaligned_dense_assignment_loop<DstIsAligned != 0>::run(kernel, 0, alignedStart);
318
319 constexpr float cost = static_cast<float>(XprEvaluationCost);
320 AssignmentFunctor functor(kernel);
321 device.template parallelFor<AssignmentFunctor, PacketSize>(alignedStart, alignedEnd, functor, cost);
322
323 unaligned_dense_assignment_loop<>::run(kernel, alignedEnd, size);
324 }
325};
326
327} // namespace internal
328
329} // namespace Eigen
330
331#endif // EIGEN_CORE_THREAD_POOL_DEVICE_H
Namespace containing all symbols from the Eigen library.
Definition B01_Experimental.dox:1
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition Meta.h:82