| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | // Copyright Contributors to the OpenVDB Project | ||
| 2 | // SPDX-License-Identifier: MPL-2.0 | ||
| 3 | |||
| 4 | /// @file Queue.cc | ||
| 5 | /// @author Peter Cucka | ||
| 6 | |||
| 7 | #include "Queue.h" | ||
| 8 | #include "File.h" | ||
| 9 | #include "Stream.h" | ||
| 10 | #include "openvdb/Exceptions.h" | ||
| 11 | #include "openvdb/util/logging.h" | ||
| 12 | |||
| 13 | #include <tbb/concurrent_hash_map.h> | ||
| 14 | #include <tbb/task_arena.h> | ||
| 15 | |||
| 16 | #include <thread> | ||
| 17 | #include <algorithm> // for std::max() | ||
| 18 | #include <atomic> | ||
| 19 | #include <iostream> | ||
| 20 | #include <map> | ||
| 21 | #include <mutex> | ||
| 22 | #include <chrono> | ||
| 23 | |||
| 24 | |||
| 25 | namespace openvdb { | ||
| 26 | OPENVDB_USE_VERSION_NAMESPACE | ||
| 27 | namespace OPENVDB_VERSION_NAME { | ||
| 28 | namespace io { | ||
| 29 | |||
| 30 | namespace { | ||
| 31 | |||
| 32 | // Abstract base class for queuable TBB tasks that adds a task completion callback | ||
| 33 | class Task | ||
| 34 | { | ||
| 35 | public: | ||
| 36 | 20 | Task(Queue::Id id): mId(id) {} | |
| 37 |
2/4✓ Branch 0 taken 57 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
58 | virtual ~Task() {} |
| 38 | |||
| 39 | 38 | Queue::Id id() const { return mId; } | |
| 40 | |||
| 41 | 19 | void setNotifier(Queue::Notifier& notifier) { mNotify = notifier; } | |
| 42 | virtual void execute() const = 0; | ||
| 43 | |||
| 44 | protected: | ||
| 45 |
1/2✓ Branch 0 taken 19 times.
✗ Branch 1 not taken.
|
19 | void notify(Queue::Status status) const { if (mNotify) mNotify(this->id(), status); } |
| 46 | |||
| 47 | private: | ||
| 48 | Queue::Id mId; | ||
| 49 | Queue::Notifier mNotify; | ||
| 50 | }; | ||
| 51 | |||
| 52 | |||
| 53 | // Queuable TBB task that writes one or more grids to a .vdb file or an output stream | ||
| 54 | class OutputTask : public Task | ||
| 55 | { | ||
| 56 | public: | ||
| 57 | 20 | OutputTask(Queue::Id id, const GridCPtrVec& grids, const Archive& archive, | |
| 58 | const MetaMap& metadata) | ||
| 59 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | : Task(id) |
| 60 | , mGrids(grids) | ||
| 61 | 20 | , mArchive(archive.copy()) | |
| 62 |
3/6✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | , mMetadata(metadata) {} |
| 63 | 232 | ~OutputTask() override {} | |
| 64 | |||
| 65 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | void execute() const override |
| 66 | { | ||
| 67 | Queue::Status status = Queue::FAILED; | ||
| 68 | try { | ||
| 69 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | mArchive->write(mGrids, mMetadata); |
| 70 | status = Queue::SUCCEEDED; | ||
| 71 | ✗ | } catch (std::exception& e) { | |
| 72 | ✗ | if (const char* msg = e.what()) { | |
| 73 | ✗ | OPENVDB_LOG_ERROR(msg); | |
| 74 | } | ||
| 75 | ✗ | } catch (...) {} | |
| 76 | this->notify(status); | ||
| 77 | 19 | } | |
| 78 | |||
| 79 | private: | ||
| 80 | GridCPtrVec mGrids; | ||
| 81 | SharedPtr<Archive> mArchive; | ||
| 82 | MetaMap mMetadata; | ||
| 83 | }; | ||
| 84 | |||
| 85 | } // unnamed namespace | ||
| 86 | |||
| 87 | |||
| 88 | //////////////////////////////////////// | ||
| 89 | |||
| 90 | |||
| 91 | // Private implementation details of a Queue | ||
| 92 | struct Queue::Impl | ||
| 93 | { | ||
| 94 | using NotifierMap = std::map<Queue::Id, Queue::Notifier>; | ||
| 95 | /// @todo Provide more information than just "succeeded" or "failed"? | ||
| 96 | using StatusMap = tbb::concurrent_hash_map<Queue::Id, Queue::Status>; | ||
| 97 | |||
| 98 | 3 | Impl() | |
| 99 | 3 | : mTimeout(Queue::DEFAULT_TIMEOUT) | |
| 100 | , mCapacity(Queue::DEFAULT_CAPACITY) | ||
| 101 | , mNextId(1) | ||
| 102 | 3 | , mNextNotifierId(1) | |
| 103 | { | ||
| 104 | mNumTasks = 0; // note: must explicitly zero-initialize atomics | ||
| 105 | 3 | } | |
| 106 | 3 | ~Impl() {} | |
| 107 | |||
| 108 | // Disallow copying of instances of this class. | ||
| 109 | Impl(const Impl&); | ||
| 110 | Impl& operator=(const Impl&); | ||
| 111 | |||
| 112 | // This method might be called from multiple threads. | ||
| 113 |
1/2✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
|
38 | void setStatus(Queue::Id id, Queue::Status status) |
| 114 | { | ||
| 115 | StatusMap::accessor acc; | ||
| 116 |
1/2✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
|
38 | mStatus.insert(acc, id); |
| 117 |
1/2✓ Branch 0 taken 38 times.
✗ Branch 1 not taken.
|
38 | acc->second = status; |
| 118 | 38 | } | |
| 119 | |||
| 120 | // This method might be called from multiple threads. | ||
| 121 | 19 | void setStatusWithNotification(Queue::Id id, Queue::Status status) | |
| 122 | { | ||
| 123 | 19 | const bool completed = (status == SUCCEEDED || status == FAILED); | |
| 124 | |||
| 125 | // Update the task's entry in the status map with the new status. | ||
| 126 | 19 | this->setStatus(id, status); | |
| 127 | |||
| 128 | // If the client registered any callbacks, call them now. | ||
| 129 | bool didNotify = false; | ||
| 130 | { | ||
| 131 | // tbb::concurrent_hash_map does not support concurrent iteration | ||
| 132 | // (i.e., iteration concurrent with insertion or deletion), | ||
| 133 | // so we use a mutex-protected STL map instead. But if a callback | ||
| 134 | // invokes a notifier method such as removeNotifier() on this queue, | ||
| 135 | // the result will be a deadlock. | ||
| 136 | /// @todo Is it worth trying to avoid such deadlocks? | ||
| 137 | 19 | std::lock_guard<std::mutex> lock(mNotifierMutex); | |
| 138 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 10 times.
|
19 | if (!mNotifiers.empty()) { |
| 139 | didNotify = true; | ||
| 140 | for (NotifierMap::const_iterator it = mNotifiers.begin(); | ||
| 141 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 9 times.
|
18 | it != mNotifiers.end(); ++it) |
| 142 | { | ||
| 143 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | it->second(id, status); |
| 144 | } | ||
| 145 | } | ||
| 146 | } | ||
| 147 | // If the task completed and callbacks were called, remove | ||
| 148 | // the task's entry from the status map. | ||
| 149 |
1/2✓ Branch 0 taken 19 times.
✗ Branch 1 not taken.
|
19 | if (completed) { |
| 150 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 10 times.
|
19 | if (didNotify) { |
| 151 | StatusMap::accessor acc; | ||
| 152 |
2/4✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
|
9 | if (mStatus.find(acc, id)) { |
| 153 | mStatus.erase(acc); | ||
| 154 | } | ||
| 155 | } | ||
| 156 | --mNumTasks; | ||
| 157 | } | ||
| 158 | 19 | } | |
| 159 | |||
| 160 | 24 | bool canEnqueue() const { return mNumTasks < Int64(mCapacity); } | |
| 161 | |||
| 162 | 20 | void enqueue(OutputTask& task) | |
| 163 | { | ||
| 164 | 20 | auto start = std::chrono::steady_clock::now(); | |
| 165 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 19 times.
|
24 | while (!canEnqueue()) { |
| 166 | 5 | std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500)); | |
| 167 | auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( | ||
| 168 | 5 | std::chrono::steady_clock::now() - start); | |
| 169 | 5 | const double seconds = double(duration.count()) / 1000.0; | |
| 170 |
2/2✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 times.
|
5 | if (seconds > double(mTimeout)) { |
| 171 |
3/8✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
|
5 | OPENVDB_THROW(RuntimeError, |
| 172 | "unable to queue I/O task; " << mTimeout << "-second time limit expired"); | ||
| 173 | } | ||
| 174 | } | ||
| 175 |
1/2✓ Branch 2 taken 19 times.
✗ Branch 3 not taken.
|
19 | Queue::Notifier notify = std::bind(&Impl::setStatusWithNotification, this, |
| 176 | 19 | std::placeholders::_1, std::placeholders::_2); | |
| 177 | task.setNotifier(notify); | ||
| 178 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | this->setStatus(task.id(), Queue::PENDING); |
| 179 | |||
| 180 | // get the global task arena | ||
| 181 | tbb::task_arena arena(tbb::task_arena::attach{}); | ||
| 182 |
3/8✓ Branch 2 taken 19 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 19 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 19 times.
✗ Branch 9 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
|
76 | arena.enqueue([task = std::move(task)] { task.execute(); }); |
| 183 | ++mNumTasks; | ||
| 184 | 19 | } | |
| 185 | |||
| 186 | Index32 mTimeout; | ||
| 187 | Index32 mCapacity; | ||
| 188 | std::atomic<Int32> mNumTasks; | ||
| 189 | Index32 mNextId; | ||
| 190 | StatusMap mStatus; | ||
| 191 | NotifierMap mNotifiers; | ||
| 192 | Index32 mNextNotifierId; | ||
| 193 | std::mutex mNotifierMutex; | ||
| 194 | }; | ||
| 195 | |||
| 196 | |||
| 197 | //////////////////////////////////////// | ||
| 198 | |||
| 199 | |||
| 200 | 3 | Queue::Queue(Index32 capacity): mImpl(new Impl) | |
| 201 | { | ||
| 202 | 3 | mImpl->mCapacity = capacity; | |
| 203 | 3 | } | |
| 204 | |||
| 205 | |||
| 206 | 3 | Queue::~Queue() | |
| 207 | { | ||
| 208 | // Wait for all queued tasks to complete (successfully or unsuccessfully). | ||
| 209 | /// @todo Allow the queue to be destroyed while there are uncompleted tasks | ||
| 210 | /// (e.g., by keeping a static registry of queues that also dispatches | ||
| 211 | /// or blocks notifications)? | ||
| 212 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | while (mImpl->mNumTasks > 0) { |
| 213 | ✗ | std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500)); | |
| 214 | } | ||
| 215 | 3 | } | |
| 216 | |||
| 217 | |||
| 218 | //////////////////////////////////////// | ||
| 219 | |||
| 220 | |||
| 221 | 4 | bool Queue::empty() const { return (mImpl->mNumTasks == 0); } | |
| 222 | ✗ | Index32 Queue::size() const { return Index32(std::max<Int32>(0, mImpl->mNumTasks)); } | |
| 223 | ✗ | Index32 Queue::capacity() const { return mImpl->mCapacity; } | |
| 224 | ✗ | void Queue::setCapacity(Index32 n) { mImpl->mCapacity = std::max<Index32>(1, n); } | |
| 225 | |||
| 226 | /// @todo void Queue::setCapacity(Index64 bytes); | ||
| 227 | |||
| 228 | /// @todo Provide a way to limit the number of tasks in flight | ||
| 229 | /// (e.g., by enqueueing tbb::tasks that pop Tasks off a concurrent_queue)? | ||
| 230 | |||
| 231 | /// @todo Remove any tasks from the queue that are not currently executing. | ||
| 232 | //void clear() const; | ||
| 233 | |||
| 234 | ✗ | Index32 Queue::timeout() const { return mImpl->mTimeout; } | |
| 235 | 1 | void Queue::setTimeout(Index32 sec) { mImpl->mTimeout = sec; } | |
| 236 | |||
| 237 | |||
| 238 | //////////////////////////////////////// | ||
| 239 | |||
| 240 | |||
| 241 | Queue::Status | ||
| 242 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | Queue::status(Id id) const |
| 243 | { | ||
| 244 | Impl::StatusMap::const_accessor acc; | ||
| 245 |
2/4✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
|
9 | if (mImpl->mStatus.find(acc, id)) { |
| 246 | 9 | const Status status = acc->second; | |
| 247 |
1/2✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
|
9 | if (status == SUCCEEDED || status == FAILED) { |
| 248 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | mImpl->mStatus.erase(acc); |
| 249 | } | ||
| 250 | 9 | return status; | |
| 251 | } | ||
| 252 | return UNKNOWN; | ||
| 253 | } | ||
| 254 | |||
| 255 | |||
| 256 | Queue::Id | ||
| 257 | 1 | Queue::addNotifier(Notifier notify) | |
| 258 | { | ||
| 259 | 1 | std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex); | |
| 260 | 1 | Queue::Id id = mImpl->mNextNotifierId++; | |
| 261 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | mImpl->mNotifiers[id] = notify; |
| 262 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
2 | return id; |
| 263 | } | ||
| 264 | |||
| 265 | |||
| 266 | void | ||
| 267 | ✗ | Queue::removeNotifier(Id id) | |
| 268 | { | ||
| 269 | ✗ | std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex); | |
| 270 | ✗ | Impl::NotifierMap::iterator it = mImpl->mNotifiers.find(id); | |
| 271 | ✗ | if (it != mImpl->mNotifiers.end()) { | |
| 272 | ✗ | mImpl->mNotifiers.erase(it); | |
| 273 | } | ||
| 274 | } | ||
| 275 | |||
| 276 | |||
| 277 | void | ||
| 278 | ✗ | Queue::clearNotifiers() | |
| 279 | { | ||
| 280 | ✗ | std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex); | |
| 281 | mImpl->mNotifiers.clear(); | ||
| 282 | } | ||
| 283 | |||
| 284 | |||
| 285 | //////////////////////////////////////// | ||
| 286 | |||
| 287 | |||
| 288 | Queue::Id | ||
| 289 | ✗ | Queue::writeGrid(GridBase::ConstPtr grid, const Archive& archive, const MetaMap& metadata) | |
| 290 | { | ||
| 291 | ✗ | return writeGridVec(GridCPtrVec(1, grid), archive, metadata); | |
| 292 | } | ||
| 293 | |||
| 294 | |||
| 295 | Queue::Id | ||
| 296 | 20 | Queue::writeGridVec(const GridCPtrVec& grids, const Archive& archive, const MetaMap& metadata) | |
| 297 | { | ||
| 298 | 20 | const Queue::Id taskId = mImpl->mNextId++; | |
| 299 | 40 | OutputTask task(taskId, grids, archive, metadata); | |
| 300 |
2/2✓ Branch 1 taken 19 times.
✓ Branch 2 taken 1 times.
|
20 | mImpl->enqueue(task); |
| 301 | 19 | return taskId; | |
| 302 | } | ||
| 303 | |||
| 304 | } // namespace io | ||
| 305 | } // namespace OPENVDB_VERSION_NAME | ||
| 306 | } // namespace openvdb | ||
| 307 |