evio  6.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RecordSupply.h
Go to the documentation of this file.
1 //
2 // Copyright 2020, Jefferson Science Associates, LLC.
3 // Subject to the terms in the LICENSE file found in the top-level directory.
4 //
5 // EPSCI Group
6 // Thomas Jefferson National Accelerator Facility
7 // 12000, Jefferson Ave, Newport News, VA 23606
8 // (757)-269-7100
9 
10 
11 #ifndef EVIO_6_0_RECORDSUPPLY_H
12 #define EVIO_6_0_RECORDSUPPLY_H
13 
14 
15 #include <string>
16 #include <memory>
17 #include <vector>
18 #include <atomic>
19 #include <mutex>
20 
21 
22 #include "ByteOrder.h"
23 #include "Compressor.h"
24 #include "RecordRingItem.h"
25 #include "EvioException.h"
26 #include "Disruptor/Util.h"
27 #include "Disruptor/Sequence.h"
28 #include "Disruptor/ISequence.h"
29 #include "Disruptor/RingBuffer.h"
30 #include "Disruptor/ISequenceBarrier.h"
31 #include "Disruptor/TimeoutException.h"
32 #include "Disruptor/SpinCountBackoffWaitStrategy.h"
33 
34 
35 namespace evio {
36 
37 
101  class RecordSupply {
102 
103  private:
104 
106  std::mutex supplyMutex;
107 
110 
113  uint32_t maxEventCount = 0;
116  uint32_t maxBufferSize = 0;
120  uint32_t compressionThreadCount = 1;
122  uint32_t ringSize = 0;
123 
124 
126  std::shared_ptr<Disruptor::RingBuffer<std::shared_ptr<RecordRingItem>>> ringBuffer = nullptr;
127 
128 
129  // Stuff for reporting errors
130 
132  std::atomic<bool> haveErrorCondition{false};
134  std::string error {""};
135 
136  // Stuff for reporting conditions (disk is full)
137 
140  std::atomic<bool> diskFull{false};
141 
142  // Stuff for compression threads
143 
146  std::shared_ptr<Disruptor::ISequenceBarrier> compressBarrier;
148  std::vector<std::shared_ptr<Disruptor::ISequence>> compressSeqs;
151  std::vector<int64_t> nextCompressSeqs;
154  std::vector<int64_t> availableCompressSeqs;
155 
156  // Stuff for writing thread
157 
160  std::shared_ptr<Disruptor::ISequenceBarrier> writeBarrier;
162  std::vector<std::shared_ptr<Disruptor::ISequence>> writeSeqs;
164  int64_t nextWriteSeq = 0L;
166  int64_t availableWriteSeq = 0L;
167 
168  // For thread safety in getToWrite() & releaseWriter()
169 
171  int64_t lastSequenceReleased = -1L;
173  int64_t maxSequence = -1L;
176  uint32_t between = 0;
177 
178 
179  public:
180 
181  RecordSupply();
182  // No need to copy these things
183  RecordSupply(const RecordSupply & supply) = delete;
184  RecordSupply(uint32_t ringSize, ByteOrder order,
185  uint32_t threadCount, uint32_t maxEventCount, uint32_t maxBufferSize,
186  Compressor::CompressionType & compressionType);
187 
189  compressSeqs.clear();
190  nextCompressSeqs.clear();
191  nextCompressSeqs.clear();
192  availableCompressSeqs.clear();
193  writeSeqs.clear();
194  ringBuffer.reset();
195  }
196 
197  void errorAlert();
198 
199  uint32_t getMaxRingBytes();
200  uint32_t getRingSize();
201  ByteOrder & getOrder();
202  uint64_t getFillLevel();
203  int64_t getLastSequence();
204 
205  std::shared_ptr<RecordRingItem> get();
206  void publish(std::shared_ptr<RecordRingItem> & item);
207  std::shared_ptr<RecordRingItem> getToCompress(uint32_t threadNumber);
208  std::shared_ptr<RecordRingItem> getToWrite();
209 
210  void releaseCompressor(std::shared_ptr<RecordRingItem> & item);
211  bool releaseWriterSequential(std::shared_ptr<RecordRingItem> & item);
212  bool releaseWriter(std::shared_ptr<RecordRingItem> & item);
213  void release(uint32_t threadNum, int64_t sequenceNum);
214 
215  bool haveError();
216  void haveError(bool err);
217  std::string getError();
218  void setError(std::string & err);
219 
220  bool isDiskFull();
221  void setDiskFull(bool full);
222 
223  };
224 
225 }
226 
227 
228 #endif //EVIO_6_0_RECORDSUPPLY_H
int64_t getLastSequence()
Get the sequence of last ring buffer item published (seq starts at 0).
Definition: RecordSupply.cpp:166
bool releaseWriter(std::shared_ptr< RecordRingItem > &item)
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by...
Definition: RecordSupply.cpp:332
bool isDiskFull()
Has the writing of a RecordRingItem to disk been stopped due to the disk partition being full...
Definition: RecordSupply.cpp:429
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Numerical values associated with endian byte order.
Definition: ByteOrder.h:53
void setError(std::string &err)
Set the error message.
Definition: RecordSupply.cpp:416
uint32_t getRingSize()
Get the number of records in this supply.
Definition: RecordSupply.cpp:139
std::shared_ptr< RecordRingItem > getToCompress(uint32_t threadNumber)
Get the next available record item from the ring buffer in order to compress the data already in it...
Definition: RecordSupply.cpp:212
Definition: Compressor.h:66
bool releaseWriterSequential(std::shared_ptr< RecordRingItem > &item)
A writer thread releases its claim on the given ring buffer item so it becomes available for reuse by...
Definition: RecordSupply.cpp:298
uint64_t getFillLevel()
Get the percentage of data-filled but unwritten records in ring.
Definition: RecordSupply.cpp:157
This thread-safe, lock-free class is used to provide a very fast supply of RecordRingItems which are ...
Definition: RecordSupply.h:101
~RecordSupply()
Definition: RecordSupply.h:188
bool haveError()
Has an error occurred in writing or compressing data?
Definition: RecordSupply.cpp:390
void setDiskFull(bool full)
Set whether the writing of a RecordRingItem to disk has been stopped due to the disk partition being ...
Definition: RecordSupply.cpp:438
ByteOrder & getOrder()
Get the byte order of all records in this supply.
Definition: RecordSupply.cpp:146
void publish(std::shared_ptr< RecordRingItem > &item)
Tell consumers that the record item is ready for consumption.
Definition: RecordSupply.cpp:198
RecordSupply()
Constructor.
Definition: RecordSupply.cpp:22
std::shared_ptr< RecordRingItem > getToWrite()
Get the next available record item from the ring buffer in order to write data into it...
Definition: RecordSupply.cpp:246
std::string getError()
If there is an error, this contains the error message.
Definition: RecordSupply.cpp:404
static const ByteOrder ENDIAN_LOCAL
Local host's byte order.
Definition: ByteOrder.h:61
uint32_t getMaxRingBytes()
Get the max number of bytes the records in this supply can hold all together.
Definition: RecordSupply.cpp:132
void releaseCompressor(std::shared_ptr< RecordRingItem > &item)
A compressing thread releases its claim on the given ring buffer item so it becomes available for use...
Definition: RecordSupply.cpp:279
void release(uint32_t threadNum, int64_t sequenceNum)
Release claim on ring items up to sequenceNum for the given compressor thread.
Definition: RecordSupply.cpp:380
void errorAlert()
Method to have sequence barriers throw a Disruptor's AlertException.
Definition: RecordSupply.cpp:122