evio  6.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
evio::RecordSupply Class Reference

This thread-safe, lock-free class is used to provide a very fast supply of RecordRingItems which are reused (using Disruptor software package). More...

#include <RecordSupply.h>

Public Member Functions

 RecordSupply ()
 Constructor. More...
 
 RecordSupply (const RecordSupply &supply)=delete
 
 RecordSupply (uint32_t ringSize, ByteOrder order, uint32_t threadCount, uint32_t maxEventCount, uint32_t maxBufferSize, Compressor::CompressionType &compressionType)
 Constructor. More...
 
 ~RecordSupply ()
 
void errorAlert ()
 Method to have sequence barriers throw a Disruptor's AlertException. More...
 
uint32_t getMaxRingBytes ()
 Get the max number of bytes the records in this supply can hold all together. More...
 
uint32_t getRingSize ()
 Get the number of records in this supply. More...
 
ByteOrdergetOrder ()
 Get the byte order of all records in this supply. More...
 
uint64_t getFillLevel ()
 Get the percentage of data-filled but unwritten records in ring. More...
 
int64_t getLastSequence ()
 Get the sequence of last ring buffer item published (seq starts at 0). More...
 
std::shared_ptr< RecordRingItemget ()
 Get the next available record item from the ring buffer. More...
 
void publish (std::shared_ptr< RecordRingItem > &item)
 Tell consumers that the record item is ready for consumption. More...
 
std::shared_ptr< RecordRingItemgetToCompress (uint32_t threadNumber)
 Get the next available record item from the ring buffer in order to compress the data already in it. More...
 
std::shared_ptr< RecordRingItemgetToWrite ()
 Get the next available record item from the ring buffer in order to write data into it. More...
 
void releaseCompressor (std::shared_ptr< RecordRingItem > &item)
 A compressing thread releases its claim on the given ring buffer item so it becomes available for use by writing thread behind the write barrier. More...
 
bool releaseWriterSequential (std::shared_ptr< RecordRingItem > &item)
 A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer. More...
 
bool releaseWriter (std::shared_ptr< RecordRingItem > &item)
 A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer. More...
 
void release (uint32_t threadNum, int64_t sequenceNum)
 Release claim on ring items up to sequenceNum for the given compressor thread. More...
 
bool haveError ()
 Has an error occurred in writing or compressing data? More...
 
void haveError (bool err)
 Set whether an error occurred in writing or compressing data. More...
 
std::string getError ()
 If there is an error, this contains the error message. More...
 
void setError (std::string &err)
 Set the error message. More...
 
bool isDiskFull ()
 Has the writing of a RecordRingItem to disk has been stopped due to the disk partition being full? More...
 
void setDiskFull (bool full)
 Set whether the writing of a RecordRingItem to disk has been stopped due to the disk partition being full. More...
 

Detailed Description

This thread-safe, lock-free class is used to provide a very fast supply of RecordRingItems which are reused (using Disruptor software package).

It is a supply of RecordRingItems in which a single producer does a get(), fills the record with data, and finally does a publish(std::shared_ptr<RecordRingItem> &) to let consumers know the data is ready.

This class is setup to handle 2 types of consumers. The first type is a thread which compresses a record's data. The number of such consumers is set in the constructor. Each of these will call getToCompress(uint32_t) to get a record and eventually call releaseCompressor(std::shared_ptr<RecordRingItem> &) to indicate it is finished compressing and the record is available for writing to disk.

The second type of consumer is a single thread which writes all compressed records to a file. This will call getToWrite() to get a record and eventually call releaseWriter(std::shared_ptr<RecordRingItem> &) to indicate it is finished writing and the record is available for being filled with new data.

Due to the multithreaded nature of writing files using this class, a mechanism for reporting errors that occur in the writing and compressing threads is provided. Also, and probably more importantly, one can call errorAlert() to notify any compression or write threads that an error has occurred. That way these threads can clean up and exit.

It transparently makes sure that all records are written in the proper order.

  This is a graphical representation of how our ring buffer is set up.
  (1) The producer who calls get() will get a ring item allowing a record to be
      filled. That same user does a publish() when done with the record.
  (2) The consumer who calls getToCompress() will get that ring item and will
      compress its data. There may be any number of compression threads
      as long as # threads <= # of ring items!!!.
      That same user does a releaseCompressor() when done with the record.
  (3) The consumer who calls getToWrite() will get that ring item and will
      write its data to a file or another buffer. There may be only 1
      such thread. This same user does a releaseWriter() when done with the record.
                        ||
                        ||  writeBarrier
          >             ||
        /             ________
   Write thread     /    |    \
             --->  / 1 _ | _ 2 \  <---- Compression Threads 1-M
 ================ | __ /   \ __ |               |
                  |  6 |    | 3 |               V
            ^     | __ | __ | __| ==========================
            |      \   5 |   4 /       compressBarrier
        Producer->  \ __ | __ /
Version
6.0
Since
6.0 11/5/19
Author
timmer

Constructor & Destructor Documentation

evio::RecordSupply::RecordSupply ( )

Constructor.

Ring size of 4 records, compression thread count of 1, no compression, little endian data.

evio::RecordSupply::RecordSupply ( const RecordSupply supply)
delete
evio::RecordSupply::RecordSupply ( uint32_t  ringSize,
ByteOrder  order,
uint32_t  threadCount,
uint32_t  maxEventCount,
uint32_t  maxBufferSize,
Compressor::CompressionType compressionType 
)

Constructor.

Parameters
ringSizenumber of RecordRingItem objects in ring buffer.
orderbyte order of RecordOutputStream in each RecordRingItem object.
threadCountnumber of threads simultaneously doing compression. Must be <= ringSize.
maxEventCountmax number of events each record can hold. Value <= O means use default (1M).
maxBufferSizemax number of uncompressed data bytes each record can hold. Value of < 8MB results in default of 8MB.
compressionTypetype of data compression to do.
Exceptions
EvioExceptionif args < 1, ringSize not power of 2, threadCount > ringSize.

References evio::RecordRingItem::eventFactory(), and evio::RecordRingItem::setEventFactorySettings().

evio::RecordSupply::~RecordSupply ( )
inline

Member Function Documentation

void evio::RecordSupply::errorAlert ( )

Method to have sequence barriers throw a Disruptor's AlertException.

In this case, we can use it to warn write and compress threads which are waiting on barrier.waitFor() in getToCompress(uint32_t) and getToWrite(). Do this in case of a write, compress, or some other error. This allows any threads waiting on these 2 methods to wake up, clean up, and exit.

std::shared_ptr< RecordRingItem > evio::RecordSupply::get ( )

Get the next available record item from the ring buffer.

Use it to write data into the record.

Returns
next available record item in ring buffer in order to write data into it.
std::string evio::RecordSupply::getError ( )

If there is an error, this contains the error message.

Returns
error message if there is an error.
uint64_t evio::RecordSupply::getFillLevel ( )

Get the percentage of data-filled but unwritten records in ring.

Value of 0 means everything's been written. Value of 100 means that all records in the ring are filled with data (perhaps in various stages of being compressed) and have not been written yet.

Returns
percentage of used records in ring.
int64_t evio::RecordSupply::getLastSequence ( )

Get the sequence of last ring buffer item published (seq starts at 0).

Returns
sequence of last ring buffer item published (seq starts at 0).
uint32_t evio::RecordSupply::getMaxRingBytes ( )

Get the max number of bytes the records in this supply can hold all together.

Returns
max number of bytes the records in this supply can hold all together.
ByteOrder & evio::RecordSupply::getOrder ( )

Get the byte order of all records in this supply.

Returns
byte order of all records in this supply.
uint32_t evio::RecordSupply::getRingSize ( )

Get the number of records in this supply.

Returns
number of records in this supply.
std::shared_ptr< RecordRingItem > evio::RecordSupply::getToCompress ( uint32_t  threadNumber)

Get the next available record item from the ring buffer in order to compress the data already in it.

Parameters
threadNumbernumber of thread (0,1, ...) used to compress. This number cannot exceed (compressionThreadCount - 1).
Returns
next available record item in ring buffer in order to compress data already in it.
Exceptions
Disruptor::AlertExceptionif errorAlert() called.
std::shared_ptr< RecordRingItem > evio::RecordSupply::getToWrite ( )

Get the next available record item from the ring buffer in order to write data into it.

Returns
next available record item in ring buffer in order to write data into it.
Exceptions
Disruptor::AlertExceptionif errorAlert() called.
bool evio::RecordSupply::haveError ( )

Has an error occurred in writing or compressing data?

Returns
true
if an error occurred in writing or compressing data, else
false
.
void evio::RecordSupply::haveError ( bool  err)

Set whether an error occurred in writing or compressing data.

Parameters
errif
true
an error occurred in writing or compressing data, else
false
.
bool evio::RecordSupply::isDiskFull ( )

Has the writing of a RecordRingItem to disk has been stopped due to the disk partition being full?

Returns
true if he writing of a RecordRingItem to disk has been stopped due to the disk partition being full.
void evio::RecordSupply::publish ( std::shared_ptr< RecordRingItem > &  item)

Tell consumers that the record item is ready for consumption.

To be used in conjunction with get().

Parameters
itemrecord item available for consumers' use.
void evio::RecordSupply::release ( uint32_t  threadNum,
int64_t  sequenceNum 
)

Release claim on ring items up to sequenceNum for the given compressor thread.

For internal use only - to free up records that the compressor thread will skip over anyway.

Parameters
threadNumcompressor thread number.
sequenceNumsequence to release.
void evio::RecordSupply::releaseCompressor ( std::shared_ptr< RecordRingItem > &  item)

A compressing thread releases its claim on the given ring buffer item so it becomes available for use by writing thread behind the write barrier.

Because a compressing thread gets only every Nth record where N = compressionThreadCount, once it releases this record it also needs to release all events coming after, up until the one it will take next. In other words, also release the records it will skip over next. This allows close() to be called at any time without things hanging up.

To be used in conjunction with getToCompress(uint32_t).

Parameters
itemitem in ring buffer to release for reuse.
bool evio::RecordSupply::releaseWriter ( std::shared_ptr< RecordRingItem > &  item)

A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer.

To be used in conjunction with getToWrite().

Care must be taken to ensure thread-safety. The following can happen if no precautions are taken. In the case of EventWriterUnsync, writing to a file involves 2, simultaneous, asynchronous writes to a file - both in separate threads to the thread which calls the "write" method. If the writing of the later item finishes first, it releases it's item and sequence which, unfortunately, also releases the previous item's sequence (which is still being written). When the first write is complete, it also releases its item. However item.getSequenceObj() will return null (causing NullPointerException) because it was already released thereby allowing it to be reused and reset called on it.

In order to prevent such a scenario, releaseWriter ensures that items are only released in sequence.

If the same item is released more than once, bad things will happen. Thus the caller must take steps to prevent it. To avoid problems, call RecordRingItem#setAlreadyReleased(bool) and set to true if item is released but will still be used in some manner.

Parameters
itemitem in ring buffer to release for reuse.
Returns
false if item or released since item is null, else true.
bool evio::RecordSupply::releaseWriterSequential ( std::shared_ptr< RecordRingItem > &  item)

A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by the producer.

To be used in conjunction with getToWrite().

Care must be taken to ensure thread-safety. This method may only be called if the writing is done IN THE SAME THREAD as the calling of this method so that items are released in sequence as ensured by the caller. Otherwise use releaseWriter(std::shared_ptr<RecordRingItem> &).

Parameters
itemitem in ring buffer to release for reuse.
Returns
false if item not released or item is null, else true.
void evio::RecordSupply::setDiskFull ( bool  full)

Set whether the writing of a RecordRingItem to disk has been stopped due to the disk partition being full.

Parameters
fulltrue if he writing of a RecordRingItem to disk has been stopped due to the disk partition being full.
void evio::RecordSupply::setError ( std::string &  err)

Set the error message.

Parameters
errerror message.

The documentation for this class was generated from the following files: