11 #ifndef EVIO_6_0_RECORDSUPPLY_H
12 #define EVIO_6_0_RECORDSUPPLY_H
26 #include "Disruptor/Util.h"
27 #include "Disruptor/Sequence.h"
28 #include "Disruptor/ISequence.h"
29 #include "Disruptor/RingBuffer.h"
30 #include "Disruptor/ISequenceBarrier.h"
31 #include "Disruptor/TimeoutException.h"
32 #include "Disruptor/SpinCountBackoffWaitStrategy.h"
106 std::mutex supplyMutex;
113 uint32_t maxEventCount = 0;
116 uint32_t maxBufferSize = 0;
120 uint32_t compressionThreadCount = 1;
122 uint32_t ringSize = 0;
126 std::shared_ptr<Disruptor::RingBuffer<std::shared_ptr<RecordRingItem>>> ringBuffer =
nullptr;
132 std::atomic<bool> haveErrorCondition{
false};
134 std::string error {
""};
140 std::atomic<bool> diskFull{
false};
146 std::shared_ptr<Disruptor::ISequenceBarrier> compressBarrier;
148 std::vector<std::shared_ptr<Disruptor::ISequence>> compressSeqs;
151 std::vector<int64_t> nextCompressSeqs;
154 std::vector<int64_t> availableCompressSeqs;
160 std::shared_ptr<Disruptor::ISequenceBarrier> writeBarrier;
162 std::vector<std::shared_ptr<Disruptor::ISequence>> writeSeqs;
164 int64_t nextWriteSeq = 0L;
166 int64_t availableWriteSeq = 0L;
171 int64_t lastSequenceReleased = -1L;
173 int64_t maxSequence = -1L;
176 uint32_t between = 0;
185 uint32_t threadCount, uint32_t maxEventCount, uint32_t maxBufferSize,
189 compressSeqs.clear();
190 nextCompressSeqs.clear();
191 nextCompressSeqs.clear();
192 availableCompressSeqs.clear();
205 std::shared_ptr<RecordRingItem>
get();
206 void publish(std::shared_ptr<RecordRingItem> & item);
207 std::shared_ptr<RecordRingItem>
getToCompress(uint32_t threadNumber);
213 void release(uint32_t threadNum, int64_t sequenceNum);
228 #endif //EVIO_6_0_RECORDSUPPLY_H
int64_t getLastSequence()
Get the sequence of last ring buffer item published (seq starts at 0).
Definition: RecordSupply.cpp:166
bool releaseWriter(std::shared_ptr< RecordRingItem > &item)
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by...
Definition: RecordSupply.cpp:332
bool isDiskFull()
Has the writing of a RecordRingItem to disk has been stopped due to the disk partition being full...
Definition: RecordSupply.cpp:429
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Numerical values associated with endian byte order.
Definition: ByteOrder.h:53
void setError(std::string &err)
Set the error message.
Definition: RecordSupply.cpp:416
uint32_t getRingSize()
Get the number of records in this supply.
Definition: RecordSupply.cpp:139
std::shared_ptr< RecordRingItem > getToCompress(uint32_t threadNumber)
Get the next available record item from the ring buffer in order to compress the data already in it...
Definition: RecordSupply.cpp:212
Definition: Compressor.h:66
bool releaseWriterSequential(std::shared_ptr< RecordRingItem > &item)
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by...
Definition: RecordSupply.cpp:298
uint64_t getFillLevel()
Get the percentage of data-filled but unwritten records in ring.
Definition: RecordSupply.cpp:157
This thread-safe, lock-free class is used to provide a very fast supply of RecordRingItems which are ...
Definition: RecordSupply.h:101
~RecordSupply()
Definition: RecordSupply.h:188
bool haveError()
Has an error occurred in writing or compressing data?
Definition: RecordSupply.cpp:390
void setDiskFull(bool full)
Set whether the writing of a RecordRingItem to disk has been stopped due to the disk partition being ...
Definition: RecordSupply.cpp:438
ByteOrder & getOrder()
Get the byte order of all records in this supply.
Definition: RecordSupply.cpp:146
void publish(std::shared_ptr< RecordRingItem > &item)
Tell consumers that the record item is ready for consumption.
Definition: RecordSupply.cpp:198
RecordSupply()
Constructor.
Definition: RecordSupply.cpp:22
std::shared_ptr< RecordRingItem > getToWrite()
Get the next available record item from the ring buffer in order to write data into it...
Definition: RecordSupply.cpp:246
std::string getError()
If there is an error, this contains the error message.
Definition: RecordSupply.cpp:404
static const ByteOrder ENDIAN_LOCAL
Local host's byte order.
Definition: ByteOrder.h:61
uint32_t getMaxRingBytes()
Get the max number of bytes the records in this supply can hold all together.
Definition: RecordSupply.cpp:132
void releaseCompressor(std::shared_ptr< RecordRingItem > &item)
A compressing thread releases its claim on the given ring buffer item so it becomes available for use...
Definition: RecordSupply.cpp:279
void release(uint32_t threadNum, int64_t sequenceNum)
Release claim on ring items up to sequenceNum for the given compressor thread.
Definition: RecordSupply.cpp:380
void errorAlert()
Method to have sequence barriers throw a Disruptor's AlertException.
Definition: RecordSupply.cpp:122