ThreadLocal.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H

#ifdef EIGEN_AVOID_THREAD_LOCAL

#ifdef EIGEN_THREAD_LOCAL
#undef EIGEN_THREAD_LOCAL
#endif

#else

#if ((EIGEN_COMP_GNUC) || __has_feature(cxx_thread_local) || EIGEN_COMP_MSVC)
#define EIGEN_THREAD_LOCAL static thread_local
#endif
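
// Usage sketch (hypothetical, not from this file): when EIGEN_THREAD_LOCAL
// survives the platform checks below, it expands to `static thread_local`,
// e.g. for a function-local per-thread value:
//
//   int& PerThreadCounter() {
//     EIGEN_THREAD_LOCAL int counter = 0;  // one instance per thread
//     return counter;
//   }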

// Disable TLS for Apple and Android builds with older toolchains.
#if defined(__APPLE__)
// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
// __IPHONE_8_0.
#include <Availability.h>
#include <TargetConditionals.h>
#endif
// Checks whether C++11's `thread_local` storage duration specifier is
// supported.
#if EIGEN_COMP_CLANGAPPLE && \
    ((EIGEN_COMP_CLANGAPPLE < 8000042) || (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
// Note: Xcode's clang did not support `thread_local` until version 8, and
// even then it is not available when targeting iOS versions before 9.0.
#undef EIGEN_THREAD_LOCAL

#elif defined(__ANDROID__) && EIGEN_COMP_CLANG
// There are platforms for which TLS should not be used even though the
// compiler makes it seem like it's supported (Android NDK < r12b for
// example). This is primarily because of linker problems and toolchain
// misconfiguration: TLS isn't supported until NDK r12b per
// https://developer.android.com/ndk/downloads/revision_history.html

#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && defined(__NDK_MINOR__) && \
    ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
#undef EIGEN_THREAD_LOCAL
#endif
#endif  // EIGEN_COMP_CLANGAPPLE || defined(__ANDROID__)

#endif  // EIGEN_AVOID_THREAD_LOCAL

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

namespace internal {
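// Default callbacks used by ThreadLocal below: values are left default
// constructed on first access and are destroyed without extra cleanup.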
template <typename T>
struct ThreadLocalNoOpInitialize {
  void operator()(T&) const {}
};

template <typename T>
struct ThreadLocalNoOpRelease {
  void operator()(T&) const {}
};

}  // namespace internal

// Thread local container for elements of type T that does not use thread
// local storage. As long as the number of unique threads accessing this
// storage is smaller than `capacity_`, it is lock-free and wait-free.
// Otherwise it will use a mutex for synchronization.
//
// Type `T` has to be default constructible, and by default each thread will
// get a default constructed value. It is possible to specify a custom
// `initialize` callable that will be called lazily from each thread accessing
// this object and will be passed a default initialized object of type `T`. It
// is also possible to pass a custom `release` callable that will be invoked
// before calling ~T().
//
// Example:
//
//   struct Counter {
//     int value = 0;
//   };
//
//   Eigen::ThreadLocal<Counter> counter(10);
//
//   // Each thread will have access to its own counter object.
//   Counter& cnt = counter.local();
//   cnt.value++;
//
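// A sketch with custom callbacks (the names `init` and `release` are
// illustrative, not part of the API): `initialize` runs lazily on each
// thread's first access, and `release` runs for every stored value before
// ~T() is called:
//
//   auto init = [](Counter& cnt) { cnt.value = 1; };
//   auto release = [](Counter& cnt) { /* e.g. flush cnt.value somewhere */ };
//   Eigen::ThreadLocal<Counter, decltype(init), decltype(release)>
//       counter(10, init, release);
//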
// WARNING: Eigen::ThreadLocal uses the OS-specific value returned by
// std::this_thread::get_id() to identify threads. This value is not guaranteed
// to be unique except for the life of the thread. A newly created thread may
// get an OS-specific ID equal to that of an already destroyed thread.
//
// Somewhat similar to TBB thread local storage, with similar restrictions:
// https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html
//
template <typename T, typename Initialize = internal::ThreadLocalNoOpInitialize<T>,
          typename Release = internal::ThreadLocalNoOpRelease<T>>
class ThreadLocal {
  // We preallocate default constructed elements in MaxSizeVector.
  static_assert(std::is_default_constructible<T>::value, "ThreadLocal data type must be default constructible");

 public:
  explicit ThreadLocal(int capacity)
      : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(), internal::ThreadLocalNoOpRelease<T>()) {}

  ThreadLocal(int capacity, Initialize initialize)
      : ThreadLocal(capacity, std::move(initialize), internal::ThreadLocalNoOpRelease<T>()) {}

  ThreadLocal(int capacity, Initialize initialize, Release release)
      : initialize_(std::move(initialize)),
        release_(std::move(release)),
        capacity_(capacity),
        data_(capacity_),
        ptr_(capacity_),
        filled_records_(0) {
    eigen_assert(capacity_ >= 0);
    data_.resize(capacity_);
    for (int i = 0; i < capacity_; ++i) {
      ptr_.emplace_back(nullptr);
    }
  }

  T& local() {
    std::thread::id this_thread = std::this_thread::get_id();
    if (capacity_ == 0) return SpilledLocal(this_thread);

    std::size_t h = std::hash<std::thread::id>()(this_thread);
    const int start_idx = h % capacity_;

    // NOTE: From the definition of `std::this_thread::get_id()` it is
    // guaranteed that we can never have concurrent insertions with the same
    // key into our hash-map-like data structure. If we didn't find an element
    // during the initial traversal, it's guaranteed that no one else could
    // have inserted it while we are in this function. This allows us to
    // massively simplify our lock-free insert-only hash map.

    // Check if we already have an element for `this_thread`.
    int idx = start_idx;
    while (ptr_[idx].load() != nullptr) {
      ThreadIdAndValue& record = *(ptr_[idx].load());
      if (record.thread_id == this_thread) return record.value;

      idx += 1;
      if (idx >= capacity_) idx -= capacity_;
      if (idx == start_idx) break;
    }

    // If we are here, it means that we found an insertion point in the lookup
    // table at `idx`, or we did a full traversal and the table is full.

    // If the lock-free storage is full, fall back on the mutex.
    if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);

    // We double-check that we still have space to insert an element into the
    // lock-free storage. If the old value of `filled_records_` is at least the
    // capacity, it means that some other thread added an element while we were
    // traversing the lookup table.
    int insertion_index = filled_records_.fetch_add(1, std::memory_order_relaxed);
    if (insertion_index >= capacity_) return SpilledLocal(this_thread);

    // At this point it's guaranteed that we can access data_[insertion_index]
    // without a data race.
    data_[insertion_index].thread_id = this_thread;
    initialize_(data_[insertion_index].value);

    // That's the pointer we'll put into the lookup table.
    ThreadIdAndValue* inserted = &data_[insertion_index];

    // We'll use a null ThreadIdAndValue pointer in the compare-and-swap loop.
    ThreadIdAndValue* empty = nullptr;

    // Now we have to find an insertion point into the lookup table. We start
    // from the `idx` that was identified as an insertion point above; it's
    // guaranteed that there is an empty record somewhere in the lookup table
    // (because we created a record in `data_`).
    const int insertion_idx = idx;

    do {
      // Always start the search from the original insertion candidate.
      idx = insertion_idx;
      while (ptr_[idx].load() != nullptr) {
        idx += 1;
        if (idx >= capacity_) idx -= capacity_;
        // If we did a full loop, it means that we don't have any free entries
        // in the lookup table, and this means that something is terribly wrong.
        eigen_assert(idx != insertion_idx);
      }
      // The atomic CAS of the pointer guarantees that any other thread that
      // follows this pointer will see all the mutations in `data_`.
    } while (!ptr_[idx].compare_exchange_weak(empty, inserted));

    return inserted->value;
  }

  // WARNING: It is not thread safe to call this concurrently with `local()`.
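  // Example (a sketch, reusing the `counter` object from the class-level
  // example above; assumes no concurrent `local()` calls are in flight):
  //
  //   int total = 0;
  //   counter.ForEach(
  //       [&total](std::thread::id, Counter& cnt) { total += cnt.value; });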
  void ForEach(std::function<void(std::thread::id, T&)> f) {
    // Reading directly from `data_` is unsafe, because only the CAS on the
    // record in `ptr_` makes all changes visible to other threads.
    for (auto& ptr : ptr_) {
      ThreadIdAndValue* record = ptr.load();
      if (record == nullptr) continue;
      f(record->thread_id, record->value);
    }

    // We did not spill into the map-based storage.
    if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;

    // Adds a happens-before edge from the last call to SpilledLocal().
    EIGEN_MUTEX_LOCK lock(mu_);
    for (auto& kv : per_thread_map_) {
      f(kv.first, kv.second);
    }
  }

  // WARNING: It is not thread safe to call this concurrently with `local()`.
  ~ThreadLocal() {
    // Reading directly from `data_` is unsafe, because only the CAS on the
    // record in `ptr_` makes all changes visible to other threads.
    for (auto& ptr : ptr_) {
      ThreadIdAndValue* record = ptr.load();
      if (record == nullptr) continue;
      release_(record->value);
    }

    // We did not spill into the map-based storage.
    if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;

    // Adds a happens-before edge from the last call to SpilledLocal().
    EIGEN_MUTEX_LOCK lock(mu_);
    for (auto& kv : per_thread_map_) {
      release_(kv.second);
    }
  }

 private:
  struct ThreadIdAndValue {
    std::thread::id thread_id;
    T value;
  };

  // Uses an unordered map guarded by a mutex when the lock-free storage is
  // full.
  T& SpilledLocal(std::thread::id this_thread) {
    EIGEN_MUTEX_LOCK lock(mu_);

    auto it = per_thread_map_.find(this_thread);
    if (it == per_thread_map_.end()) {
      auto result = per_thread_map_.emplace(this_thread, T());
      eigen_assert(result.second);
      initialize_((*result.first).second);
      return (*result.first).second;
    } else {
      return it->second;
    }
  }

  Initialize initialize_;
  Release release_;
  const int capacity_;

  // Storage that backs the lock-free lookup table `ptr_`. Records are stored
  // in this storage contiguously, starting from index 0.
  MaxSizeVector<ThreadIdAndValue> data_;

  // Atomic pointers to the data stored in `data_`. Used as a lookup table for
  // a linear-probing hash map (https://en.wikipedia.org/wiki/Linear_probing).
  MaxSizeVector<std::atomic<ThreadIdAndValue*>> ptr_;

  // Number of records stored in `data_`.
  std::atomic<int> filled_records_;

  // We fall back on a per-thread map if the lock-free storage is full. In
  // practice this should never happen if `capacity_` is a reasonable estimate
  // of the number of threads running in the system.
  EIGEN_MUTEX mu_;  // Protects per_thread_map_.
  std::unordered_map<std::thread::id, T> per_thread_map_;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H