GCC Code Coverage Report


Directory: ./
File: openvdb/openvdb/io/Queue.cc
Date: 2022-07-25 17:40:05
Exec Total Coverage
Lines: 71 89 79.8%
Functions: 14 21 66.7%
Branches: 45 98 45.9%

Line Branch Exec Source
1 // Copyright Contributors to the OpenVDB Project
2 // SPDX-License-Identifier: MPL-2.0
3
4 /// @file Queue.cc
5 /// @author Peter Cucka
6
7 #include "Queue.h"
8 #include "File.h"
9 #include "Stream.h"
10 #include "openvdb/Exceptions.h"
11 #include "openvdb/util/logging.h"
12
13 #include <tbb/concurrent_hash_map.h>
14 #include <tbb/task_arena.h>
15
16 #include <thread>
17 #include <algorithm> // for std::max()
18 #include <atomic>
19 #include <iostream>
20 #include <map>
21 #include <mutex>
22 #include <chrono>
23
24
25 namespace openvdb {
26 OPENVDB_USE_VERSION_NAMESPACE
27 namespace OPENVDB_VERSION_NAME {
28 namespace io {
29
30 namespace {
31
32 // Abstract base class for queuable TBB tasks that adds a task completion callback
33 class Task
34 {
35 public:
36 20 Task(Queue::Id id): mId(id) {}
37
2/4
✓ Branch 0 taken 57 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
58 virtual ~Task() {}
38
39 38 Queue::Id id() const { return mId; }
40
41 19 void setNotifier(Queue::Notifier& notifier) { mNotify = notifier; }
42 virtual void execute() const = 0;
43
44 protected:
45
1/2
✓ Branch 0 taken 19 times.
✗ Branch 1 not taken.
19 void notify(Queue::Status status) const { if (mNotify) mNotify(this->id(), status); }
46
47 private:
48 Queue::Id mId;
49 Queue::Notifier mNotify;
50 };
51
52
53 // Queuable TBB task that writes one or more grids to a .vdb file or an output stream
54 class OutputTask : public Task
55 {
56 public:
57 20 OutputTask(Queue::Id id, const GridCPtrVec& grids, const Archive& archive,
58 const MetaMap& metadata)
59
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 : Task(id)
60 , mGrids(grids)
61 20 , mArchive(archive.copy())
62
3/6
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
20 , mMetadata(metadata) {}
63 232 ~OutputTask() override {}
64
65
1/2
✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
19 void execute() const override
66 {
67 Queue::Status status = Queue::FAILED;
68 try {
69
1/2
✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
19 mArchive->write(mGrids, mMetadata);
70 status = Queue::SUCCEEDED;
71 } catch (std::exception& e) {
72 if (const char* msg = e.what()) {
73 OPENVDB_LOG_ERROR(msg);
74 }
75 } catch (...) {}
76 this->notify(status);
77 19 }
78
79 private:
80 GridCPtrVec mGrids;
81 SharedPtr<Archive> mArchive;
82 MetaMap mMetadata;
83 };
84
85 } // unnamed namespace
86
87
88 ////////////////////////////////////////
89
90
91 // Private implementation details of a Queue
92 struct Queue::Impl
93 {
94 using NotifierMap = std::map<Queue::Id, Queue::Notifier>;
95 /// @todo Provide more information than just "succeeded" or "failed"?
96 using StatusMap = tbb::concurrent_hash_map<Queue::Id, Queue::Status>;
97
98 3 Impl()
99 3 : mTimeout(Queue::DEFAULT_TIMEOUT)
100 , mCapacity(Queue::DEFAULT_CAPACITY)
101 , mNextId(1)
102 3 , mNextNotifierId(1)
103 {
104 mNumTasks = 0; // note: must explicitly zero-initialize atomics
105 3 }
106 3 ~Impl() {}
107
108 // Disallow copying of instances of this class.
109 Impl(const Impl&);
110 Impl& operator=(const Impl&);
111
112 // This method might be called from multiple threads.
113
1/2
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
38 void setStatus(Queue::Id id, Queue::Status status)
114 {
115 StatusMap::accessor acc;
116
1/2
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
38 mStatus.insert(acc, id);
117
1/2
✓ Branch 0 taken 38 times.
✗ Branch 1 not taken.
38 acc->second = status;
118 38 }
119
120 // This method might be called from multiple threads.
121 19 void setStatusWithNotification(Queue::Id id, Queue::Status status)
122 {
123 19 const bool completed = (status == SUCCEEDED || status == FAILED);
124
125 // Update the task's entry in the status map with the new status.
126 19 this->setStatus(id, status);
127
128 // If the client registered any callbacks, call them now.
129 bool didNotify = false;
130 {
131 // tbb::concurrent_hash_map does not support concurrent iteration
132 // (i.e., iteration concurrent with insertion or deletion),
133 // so we use a mutex-protected STL map instead. But if a callback
134 // invokes a notifier method such as removeNotifier() on this queue,
135 // the result will be a deadlock.
136 /// @todo Is it worth trying to avoid such deadlocks?
137 19 std::lock_guard<std::mutex> lock(mNotifierMutex);
138
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 10 times.
19 if (!mNotifiers.empty()) {
139 didNotify = true;
140 for (NotifierMap::const_iterator it = mNotifiers.begin();
141
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 9 times.
18 it != mNotifiers.end(); ++it)
142 {
143
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 it->second(id, status);
144 }
145 }
146 }
147 // If the task completed and callbacks were called, remove
148 // the task's entry from the status map.
149
1/2
✓ Branch 0 taken 19 times.
✗ Branch 1 not taken.
19 if (completed) {
150
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 10 times.
19 if (didNotify) {
151 StatusMap::accessor acc;
152
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
9 if (mStatus.find(acc, id)) {
153 mStatus.erase(acc);
154 }
155 }
156 --mNumTasks;
157 }
158 19 }
159
160 24 bool canEnqueue() const { return mNumTasks < Int64(mCapacity); }
161
162 20 void enqueue(OutputTask& task)
163 {
164 20 auto start = std::chrono::steady_clock::now();
165
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 19 times.
24 while (!canEnqueue()) {
166 5 std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500));
167 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
168 5 std::chrono::steady_clock::now() - start);
169 5 const double seconds = double(duration.count()) / 1000.0;
170
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 times.
5 if (seconds > double(mTimeout)) {
171
3/8
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
5 OPENVDB_THROW(RuntimeError,
172 "unable to queue I/O task; " << mTimeout << "-second time limit expired");
173 }
174 }
175
1/2
✓ Branch 2 taken 19 times.
✗ Branch 3 not taken.
19 Queue::Notifier notify = std::bind(&Impl::setStatusWithNotification, this,
176 19 std::placeholders::_1, std::placeholders::_2);
177 task.setNotifier(notify);
178
1/2
✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
19 this->setStatus(task.id(), Queue::PENDING);
179
180 // get the global task arena
181 tbb::task_arena arena(tbb::task_arena::attach{});
182
3/8
✓ Branch 2 taken 19 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 19 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 19 times.
✗ Branch 9 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
76 arena.enqueue([task = std::move(task)] { task.execute(); });
183 ++mNumTasks;
184 19 }
185
186 Index32 mTimeout;
187 Index32 mCapacity;
188 std::atomic<Int32> mNumTasks;
189 Index32 mNextId;
190 StatusMap mStatus;
191 NotifierMap mNotifiers;
192 Index32 mNextNotifierId;
193 std::mutex mNotifierMutex;
194 };
195
196
197 ////////////////////////////////////////
198
199
200 3 Queue::Queue(Index32 capacity): mImpl(new Impl)
201 {
202 3 mImpl->mCapacity = capacity;
203 3 }
204
205
206 3 Queue::~Queue()
207 {
208 // Wait for all queued tasks to complete (successfully or unsuccessfully).
209 /// @todo Allow the queue to be destroyed while there are uncompleted tasks
210 /// (e.g., by keeping a static registry of queues that also dispatches
211 /// or blocks notifications)?
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 while (mImpl->mNumTasks > 0) {
213 std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500));
214 }
215 3 }
216
217
218 ////////////////////////////////////////
219
220
221 4 bool Queue::empty() const { return (mImpl->mNumTasks == 0); }
222 Index32 Queue::size() const { return Index32(std::max<Int32>(0, mImpl->mNumTasks)); }
223 Index32 Queue::capacity() const { return mImpl->mCapacity; }
224 void Queue::setCapacity(Index32 n) { mImpl->mCapacity = std::max<Index32>(1, n); }
225
226 /// @todo void Queue::setCapacity(Index64 bytes);
227
228 /// @todo Provide a way to limit the number of tasks in flight
229 /// (e.g., by enqueueing tbb::tasks that pop Tasks off a concurrent_queue)?
230
231 /// @todo Remove any tasks from the queue that are not currently executing.
232 //void clear() const;
233
234 Index32 Queue::timeout() const { return mImpl->mTimeout; }
235 1 void Queue::setTimeout(Index32 sec) { mImpl->mTimeout = sec; }
236
237
238 ////////////////////////////////////////
239
240
241 Queue::Status
242
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 Queue::status(Id id) const
243 {
244 Impl::StatusMap::const_accessor acc;
245
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
9 if (mImpl->mStatus.find(acc, id)) {
246 9 const Status status = acc->second;
247
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (status == SUCCEEDED || status == FAILED) {
248
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 mImpl->mStatus.erase(acc);
249 }
250 9 return status;
251 }
252 return UNKNOWN;
253 }
254
255
256 Queue::Id
257 1 Queue::addNotifier(Notifier notify)
258 {
259 1 std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex);
260 1 Queue::Id id = mImpl->mNextNotifierId++;
261
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 mImpl->mNotifiers[id] = notify;
262
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
2 return id;
263 }
264
265
266 void
267 Queue::removeNotifier(Id id)
268 {
269 std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex);
270 Impl::NotifierMap::iterator it = mImpl->mNotifiers.find(id);
271 if (it != mImpl->mNotifiers.end()) {
272 mImpl->mNotifiers.erase(it);
273 }
274 }
275
276
277 void
278 Queue::clearNotifiers()
279 {
280 std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex);
281 mImpl->mNotifiers.clear();
282 }
283
284
285 ////////////////////////////////////////
286
287
288 Queue::Id
289 Queue::writeGrid(GridBase::ConstPtr grid, const Archive& archive, const MetaMap& metadata)
290 {
291 return writeGridVec(GridCPtrVec(1, grid), archive, metadata);
292 }
293
294
295 Queue::Id
296 20 Queue::writeGridVec(const GridCPtrVec& grids, const Archive& archive, const MetaMap& metadata)
297 {
298 20 const Queue::Id taskId = mImpl->mNextId++;
299 40 OutputTask task(taskId, grids, archive, metadata);
300
2/2
✓ Branch 1 taken 19 times.
✓ Branch 2 taken 1 times.
20 mImpl->enqueue(task);
301 19 return taskId;
302 }
303
304 } // namespace io
305 } // namespace OPENVDB_VERSION_NAME
306 } // namespace openvdb
307