evio
6.0
|
This thread-safe, lock-free class is used to provide a very fast supply of RecordRingItems which are reused (using Disruptor software package). More...
#include <RecordSupply.h>
Public Member Functions | |
RecordSupply () | |
Constructor. More... | |
RecordSupply (const RecordSupply &supply)=delete | |
RecordSupply (uint32_t ringSize, ByteOrder order, uint32_t threadCount, uint32_t maxEventCount, uint32_t maxBufferSize, Compressor::CompressionType &compressionType) | |
Constructor. More... | |
~RecordSupply () | |
void | errorAlert () |
Method to have sequence barriers throw a Disruptor's AlertException. More... | |
uint32_t | getMaxRingBytes () |
Get the max number of bytes the records in this supply can hold all together. More... | |
uint32_t | getRingSize () |
Get the number of records in this supply. More... | |
ByteOrder & | getOrder () |
Get the byte order of all records in this supply. More... | |
uint64_t | getFillLevel () |
Get the percentage of data-filled but unwritten records in ring. More... | |
int64_t | getLastSequence () |
Get the sequence of last ring buffer item published (seq starts at 0). More... | |
std::shared_ptr< RecordRingItem > | get () |
Get the next available record item from the ring buffer. More... | |
void | publish (std::shared_ptr< RecordRingItem > &item) |
Tell consumers that the record item is ready for consumption. More... | |
std::shared_ptr< RecordRingItem > | getToCompress (uint32_t threadNumber) |
Get the next available record item from the ring buffer in order to compress the data already in it. More... | |
std::shared_ptr< RecordRingItem > | getToWrite () |
Get the next available record item from the ring buffer in order to write data into it. More... | |
void | releaseCompressor (std::shared_ptr< RecordRingItem > &item) |
A compressing thread releases its claim on the given ring buffer item so it becomes available for use by writing thread behind the write barrier. More... | |
bool | releaseWriterSequential (std::shared_ptr< RecordRingItem > &item) |
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer. More... | |
bool | releaseWriter (std::shared_ptr< RecordRingItem > &item) |
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer. More... | |
void | release (uint32_t threadNum, int64_t sequenceNum) |
Release claim on ring items up to sequenceNum for the given compressor thread. More... | |
bool | haveError () |
Has an error occurred in writing or compressing data? More... | |
void | haveError (bool err) |
Set whether an error occurred in writing or compressing data. More... | |
std::string | getError () |
If there is an error, this contains the error message. More... | |
void | setError (std::string &err) |
Set the error message. More... | |
bool | isDiskFull () |
Has the writing of a RecordRingItem to disk has been stopped due to the disk partition being full? More... | |
void | setDiskFull (bool full) |
Set whether the writing of a RecordRingItem to disk has been stopped due to the disk partition being full. More... | |
This thread-safe, lock-free class is used to provide a very fast supply of RecordRingItems which are reused (using Disruptor software package).
It is a supply of RecordRingItems in which a single producer does a get(), fills the record with data, and finally does a publish(std::shared_ptr<RecordRingItem> &) to let consumers know the data is ready.
This class is setup to handle 2 types of consumers. The first type is a thread which compresses a record's data. The number of such consumers is set in the constructor. Each of these will call getToCompress(uint32_t) to get a record and eventually call releaseCompressor(std::shared_ptr<RecordRingItem> &) to indicate it is finished compressing and the record is available for writing to disk.
The second type of consumer is a single thread which writes all compressed records to a file. This will call getToWrite() to get a record and eventually call releaseWriter(std::shared_ptr<RecordRingItem> &) to indicate it is finished writing and the record is available for being filled with new data.
Due to the multithreaded nature of writing files using this class, a mechanism for reporting errors that occur in the writing and compressing threads is provided. Also, and probably more importantly, one can call errorAlert() to notify any compression or write threads that an error has occurred. That way these threads can clean up and exit.
It transparently makes sure that all records are written in the proper order.
This is a graphical representation of how our ring buffer is set up.
(1) The producer who calls get() will get a ring item allowing a record to be
filled. That same user does a publish() when done with the record.
(2) The consumer who calls getToCompress() will get that ring item and will
compress its data. There may be any number of compression threads
as long as # threads <= # of ring items!!!.
That same user does a releaseCompressor() when done with the record.
(3) The consumer who calls getToWrite() will get that ring item and will
write its data to a file or another buffer. There may be only 1
such thread. This same user does a releaseWriter() when done with the record.
||
|| writeBarrier
> ||
/ ________
Write thread / | \
---> / 1 _ | _ 2 \ <---- Compression Threads 1-M
================ | __ / \ __ | |
| 6 | | 3 | V
^ | __ | __ | __| ==========================
| \ 5 | 4 / compressBarrier
Producer-> \ __ | __ /
evio::RecordSupply::RecordSupply | ( | ) |
Constructor.
Ring size of 4 records, compression thread count of 1, no compression, little endian data.
|
delete |
evio::RecordSupply::RecordSupply | ( | uint32_t | ringSize, |
ByteOrder | order, | ||
uint32_t | threadCount, | ||
uint32_t | maxEventCount, | ||
uint32_t | maxBufferSize, | ||
Compressor::CompressionType & | compressionType | ||
) |
Constructor.
ringSize | number of RecordRingItem objects in ring buffer. |
order | byte order of RecordOutputStream in each RecordRingItem object. |
threadCount | number of threads simultaneously doing compression. Must be <= ringSize. |
maxEventCount | max number of events each record can hold. Value <= O means use default (1M). |
maxBufferSize | max number of uncompressed data bytes each record can hold. Value of < 8MB results in default of 8MB. |
compressionType | type of data compression to do. |
EvioException | if args < 1, ringSize not power of 2, threadCount > ringSize. |
References evio::RecordRingItem::eventFactory(), and evio::RecordRingItem::setEventFactorySettings().
|
inline |
void evio::RecordSupply::errorAlert | ( | ) |
Method to have sequence barriers throw a Disruptor's AlertException.
In this case, we can use it to warn write and compress threads which are waiting on barrier.waitFor() in getToCompress(uint32_t) and getToWrite(). Do this in case of a write, compress, or some other error. This allows any threads waiting on these 2 methods to wake up, clean up, and exit.
std::shared_ptr< RecordRingItem > evio::RecordSupply::get | ( | ) |
Get the next available record item from the ring buffer.
Use it to write data into the record.
std::string evio::RecordSupply::getError | ( | ) |
If there is an error, this contains the error message.
uint64_t evio::RecordSupply::getFillLevel | ( | ) |
Get the percentage of data-filled but unwritten records in ring.
Value of 0 means everything's been written. Value of 100 means that all records in the ring are filled with data (perhaps in various stages of being compressed) and have not been written yet.
int64_t evio::RecordSupply::getLastSequence | ( | ) |
Get the sequence of last ring buffer item published (seq starts at 0).
uint32_t evio::RecordSupply::getMaxRingBytes | ( | ) |
Get the max number of bytes the records in this supply can hold all together.
ByteOrder & evio::RecordSupply::getOrder | ( | ) |
Get the byte order of all records in this supply.
uint32_t evio::RecordSupply::getRingSize | ( | ) |
Get the number of records in this supply.
std::shared_ptr< RecordRingItem > evio::RecordSupply::getToCompress | ( | uint32_t | threadNumber | ) |
Get the next available record item from the ring buffer in order to compress the data already in it.
threadNumber | number of thread (0,1, ...) used to compress. This number cannot exceed (compressionThreadCount - 1). |
Disruptor::AlertException | if errorAlert() called. |
std::shared_ptr< RecordRingItem > evio::RecordSupply::getToWrite | ( | ) |
Get the next available record item from the ring buffer in order to write data into it.
Disruptor::AlertException | if errorAlert() called. |
bool evio::RecordSupply::haveError | ( | ) |
Has an error occurred in writing or compressing data?
void evio::RecordSupply::haveError | ( | bool | err | ) |
Set whether an error occurred in writing or compressing data.
err | if true
false
|
bool evio::RecordSupply::isDiskFull | ( | ) |
Has the writing of a RecordRingItem to disk has been stopped due to the disk partition being full?
void evio::RecordSupply::publish | ( | std::shared_ptr< RecordRingItem > & | item | ) |
Tell consumers that the record item is ready for consumption.
To be used in conjunction with get().
item | record item available for consumers' use. |
void evio::RecordSupply::release | ( | uint32_t | threadNum, |
int64_t | sequenceNum | ||
) |
Release claim on ring items up to sequenceNum for the given compressor thread.
For internal use only - to free up records that the compressor thread will skip over anyway.
threadNum | compressor thread number. |
sequenceNum | sequence to release. |
void evio::RecordSupply::releaseCompressor | ( | std::shared_ptr< RecordRingItem > & | item | ) |
A compressing thread releases its claim on the given ring buffer item so it becomes available for use by writing thread behind the write barrier.
Because a compressing thread gets only every Nth record where N = compressionThreadCount, once it releases this record it also needs to release all events coming after, up until the one it will take next. In other words, also release the records it will skip over next. This allows close() to be called at any time without things hanging up.
To be used in conjunction with getToCompress(uint32_t).
item | item in ring buffer to release for reuse. |
bool evio::RecordSupply::releaseWriter | ( | std::shared_ptr< RecordRingItem > & | item | ) |
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer.
To be used in conjunction with getToWrite().
Care must be taken to ensure thread-safety. The following can happen if no precautions are taken. In the case of EventWriterUnsync, writing to a file involves 2, simultaneous, asynchronous writes to a file - both in separate threads to the thread which calls the "write" method. If the writing of the later item finishes first, it releases it's item and sequence which, unfortunately, also releases the previous item's sequence (which is still being written). When the first write is complete, it also releases its item. However item.getSequenceObj() will return null (causing NullPointerException) because it was already released thereby allowing it to be reused and reset called on it.
In order to prevent such a scenario, releaseWriter ensures that items are only released in sequence.
If the same item is released more than once, bad things will happen. Thus the caller must take steps to prevent it. To avoid problems, call RecordRingItem#setAlreadyReleased(bool) and set to true if item is released but will still be used in some manner.
item | item in ring buffer to release for reuse. |
bool evio::RecordSupply::releaseWriterSequential | ( | std::shared_ptr< RecordRingItem > & | item | ) |
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer.
To be used in conjunction with getToWrite().
Care must be taken to ensure thread-safety. This method may only be called if the writing is done IN THE SAME THREAD as the calling of this method so that items are released in sequence as ensured by the caller. Otherwise use releaseWriter(std::shared_ptr<RecordRingItem> &).
item | item in ring buffer to release for reuse. |
void evio::RecordSupply::setDiskFull | ( | bool | full | ) |
Set whether the writing of a RecordRingItem to disk has been stopped due to the disk partition being full.
full | true if he writing of a RecordRingItem to disk has been stopped due to the disk partition being full. |
void evio::RecordSupply::setError | ( | std::string & | err | ) |
Set the error message.
err | error message. |