OpenVDB  11.0.0
StreamCompression.h
Go to the documentation of this file.
1 // Copyright Contributors to the OpenVDB Project
2 // SPDX-License-Identifier: MPL-2.0
3 
4 /// @file points/StreamCompression.h
5 ///
6 /// @author Dan Bailey
7 ///
8 /// @brief Convenience wrappers for using Blosc and for reading and writing Paged data.
9 ///
10 /// Blosc is most effective with large (> ~256KB) blocks of data. Writing the entire
11 /// data block contiguously would provide optimal compression, but would
12 /// limit the ability to use delayed-loading, as the whole block would be required to
13 /// be loaded from disk at once. To balance these two competing factors, Paging is used
14 /// to write out blocks of data that are a reasonable size for Blosc. These Pages are
15 /// loaded lazily, tracking the input stream pointers and creating Handles that reference
16 /// portions of the buffer. When the Page buffer is accessed, the data will be read from
17 /// the stream.
18 
19 #ifndef OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
20 #define OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
21 
22 #include <openvdb/io/io.h>
23 #include <tbb/spin_mutex.h>
24 #include <memory>
25 #include <string>
26 
27 
28 class TestStreamCompression;
29 
30 namespace openvdb {
32 namespace OPENVDB_VERSION_NAME {
33 namespace compression {
34 
35 
36 // This is the minimum number of bytes below which Blosc compression is not used to
37 // avoid unnecessary computation, as Blosc offers minimal compression until this limit
38 static const int BLOSC_MINIMUM_BYTES = 48;
39 
40 // Arrays smaller than this number of bytes are padded with zeros up to this
41 // number of bytes to allow Blosc to perform compression with small arrays
42 static const int BLOSC_PAD_BYTES = 128;
43 
44 
45 /// @brief Returns true if compression is available
47 
48 /// @brief Retrieves the size of the supplied compressed buffer once decompressed
49 ///
50 /// @param buffer the compressed buffer
51 OPENVDB_API size_t bloscUncompressedSize(const char* buffer);
52 
53 /// @brief Compress into the supplied buffer.
54 ///
55 /// @param compressedBuffer the buffer to compress into
56 /// @param compressedBytes number of compressed bytes (written to this variable)
57 /// @param bufferBytes the number of bytes in compressedBuffer available to be filled
58 /// @param uncompressedBuffer the uncompressed buffer to compress
59 /// @param uncompressedBytes number of uncompressed bytes
60 OPENVDB_API void bloscCompress(char* compressedBuffer, size_t& compressedBytes,
61  const size_t bufferBytes, const char* uncompressedBuffer, const size_t uncompressedBytes);
62 
63 /// @brief Compress and return the heap-allocated compressed buffer.
64 ///
65 /// @param buffer the uncompressed buffer to compress
66 /// @param uncompressedBytes number of uncompressed bytes
67 /// @param compressedBytes number of compressed bytes (written to this variable)
68 /// @param resize the compressed buffer will be exactly resized to remove the
69 /// portion used for Blosc overhead; for efficiency this can be
70 /// skipped if it is known that the resulting buffer is temporary
71 OPENVDB_API std::unique_ptr<char[]> bloscCompress(const char* buffer,
72  const size_t uncompressedBytes, size_t& compressedBytes, const bool resize = true);
73 
74 /// @brief Convenience wrapper to retrieve the size of the buffer once compressed
75 ///
76 /// @param buffer the uncompressed buffer
77 /// @param uncompressedBytes number of uncompressed bytes
78 OPENVDB_API size_t bloscCompressedSize(const char* buffer, const size_t uncompressedBytes);
79 
80 /// @brief Decompress into the supplied buffer. Will throw if decompression fails or
81 /// the uncompressed buffer has insufficient space in which to decompress.
82 ///
83 /// @param uncompressedBuffer the uncompressed buffer to decompress into
84 /// @param expectedBytes the number of bytes expected once the buffer is decompressed
85 /// @param bufferBytes the number of bytes in uncompressedBuffer available to be filled
86 /// @param compressedBuffer the compressed buffer to decompress
87 OPENVDB_API void bloscDecompress(char* uncompressedBuffer, const size_t expectedBytes,
88  const size_t bufferBytes, const char* compressedBuffer);
89 
90 /// @brief Decompress and return the heap-allocated uncompressed buffer.
91 ///
92 /// @param buffer the compressed buffer to decompress
93 /// @param expectedBytes the number of bytes expected once the buffer is decompressed
94 /// @param resize the returned buffer will be exactly resized to remove the
95 /// portion used for Blosc overhead; for efficiency this can be
96 /// skipped if it is known that the resulting buffer is temporary
97 OPENVDB_API std::unique_ptr<char[]> bloscDecompress(const char* buffer,
98  const size_t expectedBytes, const bool resize = true);
99 
100 
101 ////////////////////////////////////////
102 
103 
104 // 1MB = 1048576 Bytes; used as the initial size of the PagedOutputStream buffer
105 static const int PageSize = 1024 * 1024;
106 
107 
108 /// @brief Stores a variable-size, compressed, delayed-load Page of data
109 /// that is loaded into memory when accessed. Access to the Page is
110 /// thread-safe as loading and decompressing the data is protected by a mutex.
112 {
113 private:
114  struct Info
115  {
116 #ifdef OPENVDB_USE_DELAYED_LOADING
117  io::MappedFile::Ptr mappedFile;
118 #endif
120  std::streamoff filepos;
121  long compressedBytes;
122  long uncompressedBytes;
123  }; // Info
124 
125 public:
126  using Ptr = std::shared_ptr<Page>;
127 
128  Page() = default;
129 
130  /// @brief Load the Page into memory
131  void load() const;
132 
133  /// @brief Uncompressed bytes of the Paged data, available
134  /// once the header has been read.
135  long uncompressedBytes() const;
136 
137  /// @brief Retrieves a data pointer at the specified @a index
138  /// @note Will force a Page load when called.
139  const char* buffer(const int index) const;
140 
141  /// @brief Read the Page header
142  void readHeader(std::istream&);
143 
144  /// @brief Read the Page buffers. If @a delayed is true, stream
145  /// pointers will be stored to load the data lazily.
146  void readBuffers(std::istream&, bool delayed);
147 
148  /// @brief Test if the data is out-of-core
149  bool isOutOfCore() const;
150 
151 private:
152  /// @brief Convenience method to store a copy of the supplied buffer
153  void copy(const std::unique_ptr<char[]>& temp, int pageSize);
154 
155  /// @brief Decompress and store the supplied data
156  void decompress(const std::unique_ptr<char[]>& temp);
157 
158  /// @brief Thread-safe loading of the data
159  void doLoad() const;
160 
161  std::unique_ptr<Info> mInfo = std::unique_ptr<Info>(new Info);
162  std::unique_ptr<char[]> mData;
163  tbb::spin_mutex mMutex;
164 }; // class Page
165 
166 
167 /// @brief A PageHandle holds a shared ptr to a Page and a specific stream
168 /// pointer to a point within the decompressed Page buffer
170 {
171 public:
172  using Ptr = std::unique_ptr<PageHandle>;
173 
174  /// @brief Create the page handle
175  /// @param page a shared ptr to the page that stores the buffer
176  /// @param index start position of the buffer to be read
177  /// @param size total size of the buffer to be read in bytes
178  PageHandle(const Page::Ptr& page, const int index, const int size);
179 
180  /// @brief Retrieve a reference to the stored page
181  Page& page();
182 
183  /// @brief Return the size of the buffer
184  int size() const { return mSize; }
185 
186  /// @brief Read and return the buffer, loading and decompressing
187  /// the Page if necessary.
188  std::unique_ptr<char[]> read();
189 
190  /// @brief Return a copy of this PageHandle (same page, index and size)
191  Ptr copy() { return Ptr(new PageHandle(mPage, mIndex, mSize)); }
192 
193 protected:
194  friend class ::TestStreamCompression;
195 
196 private:
197  Page::Ptr mPage;
198  int mIndex = -1;
199  int mSize = 0;
200 }; // class PageHandle
201 
202 
203 /// @brief A Paging wrapper to std::istream that is responsible for reading
204 /// from a given input stream and creating Page objects and PageHandles that
205 /// reference those pages for delayed reading.
207 {
208 public:
209  using Ptr = std::shared_ptr<PagedInputStream>;
210 
211  PagedInputStream() = default;
212 
213  explicit PagedInputStream(std::istream& is);
214 
215  /// @brief Size-only mode tags the stream as only reading size data.
216  void setSizeOnly(bool sizeOnly) { mSizeOnly = sizeOnly; }
217  bool sizeOnly() const { return mSizeOnly; }
218 
219  /// @brief Set and get the input stream
220  std::istream& getInputStream() { assert(mIs); return *mIs; }
221  void setInputStream(std::istream& is) { mIs = &is; }
222 
223  /// @brief Creates a PageHandle to access the next @a n bytes of the Page.
224  PageHandle::Ptr createHandle(std::streamsize n);
225 
226  /// @brief Takes a @a pageHandle and updates the referenced page with the
227  /// current stream pointer position and if @a delayed is false performs
228  /// an immediate read of the data.
229  void read(PageHandle::Ptr& pageHandle, std::streamsize n, bool delayed = true);
230 
231 private:
232  int mByteIndex = 0;
233  int mUncompressedBytes = 0;
234  std::istream* mIs = nullptr;
235  Page::Ptr mPage;
236  bool mSizeOnly = false;
237 }; // class PagedInputStream
238 
239 
240 /// @brief A Paging wrapper to std::ostream that is responsible for writing
241 /// to a given output stream at intervals set by the PageSize. As Pages are
242 /// variable in size, they are flushed to disk as soon as sufficiently large.
244 {
245 public:
246  using Ptr = std::shared_ptr<PagedOutputStream>;
247 
249 
250  explicit PagedOutputStream(std::ostream& os);
251 
252  /// @brief Size-only mode tags the stream as only writing size data.
253  void setSizeOnly(bool sizeOnly) { mSizeOnly = sizeOnly; }
254  bool sizeOnly() const { return mSizeOnly; }
255 
256  /// @brief Set and get the output stream
257  std::ostream& getOutputStream() { assert(mOs); return *mOs; }
258  void setOutputStream(std::ostream& os) { mOs = &os; }
259 
260  /// @brief Writes the given @a str buffer of size @a n
261  PagedOutputStream& write(const char* str, std::streamsize n);
262 
263  /// @brief Manually flushes the current page to disk if non-zero
264  void flush();
265 
266 private:
267  /// @brief Compress the @a buffer of @a size bytes and write
268  /// out to the stream.
269  void compressAndWrite(const char* buffer, size_t size);
270 
271  /// @brief Resize the internal page buffer to @a size bytes
272  void resize(size_t size);
273 
274  std::unique_ptr<char[]> mData = std::unique_ptr<char[]>(new char[PageSize]);
275  std::unique_ptr<char[]> mCompressedData = nullptr;
276  size_t mCapacity = PageSize;
277  int mBytes = 0;
278  std::ostream* mOs = nullptr;
279  bool mSizeOnly = false;
280 }; // class PagedOutputStream
281 
282 
283 } // namespace compression
284 } // namespace OPENVDB_VERSION_NAME
285 } // namespace openvdb
286 
287 #endif // OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
#define OPENVDB_API
Definition: Platform.h:274
void setInputStream(std::istream &is)
Definition: StreamCompression.h:221
Stores a variable-size, compressed, delayed-load Page of data that is loaded into memory when accesse...
Definition: StreamCompression.h:111
OPENVDB_API std::unique_ptr< char[]> bloscCompress(const char *buffer, const size_t uncompressedBytes, size_t &compressedBytes, const bool resize=true)
Compress and return the heap-allocated compressed buffer.
static fileSize_t write(std::ostream &os, const GridHandle< BufferT > &handle, Codec codec, uint32_t n)
std::shared_ptr< T > SharedPtr
Definition: Types.h:114
std::unique_ptr< PageHandle > Ptr
Definition: StreamCompression.h:172
A Paging wrapper to std::ostream that is responsible for writing to a given output stream at interv...
Definition: StreamCompression.h:243
bool sizeOnly() const
Definition: StreamCompression.h:254
std::ostream & getOutputStream()
Set and get the output stream.
Definition: StreamCompression.h:257
static const int BLOSC_MINIMUM_BYTES
Definition: StreamCompression.h:38
std::shared_ptr< PagedOutputStream > Ptr
Definition: StreamCompression.h:246
Definition: Exceptions.h:13
OPENVDB_API bool bloscCanCompress()
Returns true if compression is available.
std::shared_ptr< PagedInputStream > Ptr
Definition: StreamCompression.h:209
OPENVDB_API size_t bloscCompressedSize(const char *buffer, const size_t uncompressedBytes)
Convenience wrapper to retrieve the compressed size of buffer when compressed.
static const int BLOSC_PAD_BYTES
Definition: StreamCompression.h:42
void setSizeOnly(bool sizeOnly)
Size-only mode tags the stream as only reading size data.
Definition: StreamCompression.h:216
static void read(std::istream &is, BufferT &buffer, Codec codec)
A PageHandle holds a shared ptr to a Page and a specific stream pointer to a point within the decompr...
Definition: StreamCompression.h:169
static const int PageSize
Definition: StreamCompression.h:105
OPENVDB_API std::unique_ptr< char[]> bloscDecompress(const char *buffer, const size_t expectedBytes, const bool resize=true)
Decompress and return the heap-allocated uncompressed buffer.
void setOutputStream(std::ostream &os)
Definition: StreamCompression.h:258
A Paging wrapper to std::istream that is responsible for reading from a given input stream and creati...
Definition: StreamCompression.h:206
void setSizeOnly(bool sizeOnly)
Size-only mode tags the stream as only writing size data.
Definition: StreamCompression.h:253
#define OPENVDB_VERSION_NAME
The version namespace name for this library version.
Definition: version.h.in:121
Ptr copy()
Return a copy of this PageHandle.
Definition: StreamCompression.h:191
OPENVDB_API size_t bloscUncompressedSize(const char *buffer)
Retrieves the size of the supplied compressed buffer once decompressed.
bool sizeOnly() const
Definition: StreamCompression.h:217
std::shared_ptr< Page > Ptr
Definition: StreamCompression.h:126
std::istream & getInputStream()
Definition: StreamCompression.h:220
int size() const
Return the size of the buffer.
Definition: StreamCompression.h:184
#define OPENVDB_USE_VERSION_NAMESPACE
Definition: version.h.in:212