11 #ifndef EVIO_6_0_WRITERMT_H
12 #define EVIO_6_0_WRITERMT_H
40 #include "Disruptor/Util.h"
41 #include <boost/thread.hpp>
42 #include <boost/chrono.hpp>
// Shared handle to the record supply this writer thread takes records from
// (see getToWrite() / releaseWriter() calls in the thread body).
82 std::shared_ptr<RecordSupply> supply;
// Sequence of the most recent record the thread body finished handling.
// Starts at -1; it is assigned after each record is released and is polled
// by waitForLastItem().
86 std::atomic_long lastSeqProcessed{-1};
// Constructor: stores the WriterMT pointer exactly as given and takes a copy of
// the shared RecordSupply pointer. Nothing visible here checks pwriter for null.
95 RecordWriter(
WriterMT * pwriter, std::shared_ptr<RecordSupply> & recordSupply) :
96 writer(pwriter), supply(recordSupply) {
// Move constructor: takes over obj's supply pointer and thread handle, and copies
// the current value of obj's atomic sequence counter (atomics cannot be moved).
// NOTE(review): the worker thread is launched with a lambda that captures `this`,
// so moving a RecordWriter whose thread is already running would leave that thread
// operating on the moved-from object. Presumably instances are only moved before
// the thread is started — confirm against the callers.
103 RecordWriter(RecordWriter && obj) noexcept :
105 supply(std::move(obj.supply)),
106 thd(std::move(obj.thd)) {
// Implicit atomic load of obj's counter, then store into this object's counter.
108 lastSeqProcessed.store(obj.lastSeqProcessed);
// Move assignment: copies the value of obj's atomic sequence counter, then takes
// over obj's supply pointer and thread handle.
// NOTE(review): the worker thread is launched with a lambda that captures `this`,
// so assigning from a RecordWriter whose thread is already running would leave that
// thread operating on the moved-from object — confirm this only happens before start.
111 RecordWriter & operator=(RecordWriter && obj) noexcept {
114 lastSeqProcessed.store(obj.lastSeqProcessed);
115 supply = std::move(obj.supply);
116 thd = std::move(obj.thd);
// Wait up to 500 ms for the worker thread to finish.
// NOTE(review): boost::thread::try_join_for() returns true when the thread HAS
// finished within the timeout and false when the wait timed out. As written, the
// "did not quit" message is printed when the join succeeds, so the condition looks
// inverted (likely intended: !thd.try_join_for(...)) — confirm and fix.
124 if (thd.try_join_for(boost::chrono::milliseconds(500))) {
125 std::cout <<
"RecordWriter thread did not quit after 1/2 sec" << std::endl;
// Launch the worker thread; its body is this object's run() method.
// The lambda captures `this`, so this object must stay at the same address
// (not be moved or destroyed) for as long as the thread is running.
131 thd = boost::thread([
this]() {this->run();});
// Blocks the caller until lastSeqProcessed has caught up with the last sequence
// reported by the supply. This is a polling wait (1 ms sleep per iteration), not a
// wait on a signal, and no timeout is visible here.
143 void waitForLastItem() {
// Keep waiting while the supply's last sequence is still ahead of the counter
// that the thread body updates after each record.
146 while (supply->getLastSequence() > lastSeqProcessed.load()) {
147 std::this_thread::sleep_for(std::chrono::milliseconds(1));
// Fragment of the worker thread body: obtain a record from the supply, write it to
// the WriterMT's output file, update the writer's bookkeeping, then release the item.
// NOTE(review): this debug line and the one below go to std::cout with std::endl,
// i.e. a flush for every record handled.
161 std::cout <<
" RecordWriter: try getting record to write" << std::endl;
// Take the next item to be written from the supply.
163 auto item = supply->getToWrite();
// Boost thread interruption is disabled for as long as d1 is in scope.
167 boost::this_thread::disable_interruption d1;
169 currentSeq = item->getSequence();
171 std::shared_ptr<RecordOutput> & record = item->getRecord();
174 auto & header = record->getHeader();
// NOTE(review): the length is held in a signed int, then pushed into a
// vector<uint32_t> and added to a size_t — confirm the signed type is intended.
175 int bytesToWrite = header->getLength();
// Two entries are appended to recordLengths per record: its length, then its
// entry count.
177 writer->recordLengths->push_back(bytesToWrite);
179 writer->recordLengths->push_back(header->getEntries());
180 writer->writerBytesWritten += bytesToWrite;
182 auto buf = record->getBinaryBuffer();
183 std::cout <<
" RecordWriter: use outFile to write file, buf pos = " << buf->position() <<
184 ", lim = " << buf->limit() <<
", bytesToWrite = " << bytesToWrite << std::endl;
// Write bytesToWrite bytes starting at the buffer's backing array.
185 writer->outFile.write(reinterpret_cast<const char *>(buf->array()), bytesToWrite);
// Check the stream's fail state after the write (handling not shown in this view).
186 if (writer->outFile.fail()) {
// Hand the item back to the supply now that it has been written.
193 supply->releaseWriter(item);
// Publish the sequence just handled; waitForLastItem() polls this value.
196 lastSeqProcessed = currentSeq;
// Interruption requested via boost::thread is caught here.
200 catch (boost::thread_interrupted & e) {
// Running total of bytes written; the RecordWriter thread adds each record's
// length to it. NOTE(review): plain size_t updated from the writer thread —
// confirm nothing reads it concurrently.
211 size_t writerBytesWritten = 0ULL;
// Raw pointer to first-event data; null by default. Ownership is not visible here.
213 uint8_t* firstEvent =
nullptr;
215 uint32_t firstEventLength = 0;
217 uint32_t maxEventCount = 0;
219 uint32_t maxBufferSize = 0;
// Starts at 1, not 0.
221 uint32_t recordNumber = 1;
223 uint32_t compressionThreadCount = 1;
226 std::string fileName =
"";
// Output file stream; the RecordWriter thread writes record bytes to it.
229 std::ofstream outFile;
235 std::string dictionary;
238 std::shared_ptr<ByteBuffer> dictionaryFirstEventBuffer;
244 std::shared_ptr<RecordOutput> outputRecord;
247 std::vector<uint8_t> headerArray;
// The RecordWriter thread appends two values per written record:
// the record length, then the record's entry count.
254 std::shared_ptr<std::vector<uint32_t>> recordLengths;
257 std::shared_ptr<RecordSupply> supply;
// RecordWriter objects are stored by value, so vector growth moves them
// (RecordWriter declares noexcept move operations).
261 std::vector<RecordWriter> recordWriterThreads;
264 std::vector<RecordCompressor> recordCompressorThreads;
267 std::shared_ptr<RecordRingItem> ringItem;
// Flags below: only their defaults are visible here (addingTrailer defaults
// to true, all others to false).
271 bool addingTrailer =
true;
273 bool addTrailerIndex =
false;
279 bool firstRecordWritten =
false;
281 bool haveDictionary =
false;
283 bool haveFirstEvent =
false;
285 bool haveUserHeader =
false;
292 WriterMT(
const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
298 uint32_t maxEventCount = 0,
299 uint32_t maxBufferSize = 0,
300 const std::string & dictionary =
"",
301 uint8_t* firstEvent =
nullptr,
302 uint32_t firstEventLen = 0,
304 uint32_t compressionThreads = 1,
305 bool addTrailerIndex =
false,
306 uint32_t ringSize = 16);
// Single-argument constructor taking a file name; `explicit` prevents implicit
// conversion from std::string. Presumably the remaining settings keep their
// defaults — confirm in WriterMT.cpp.
308 explicit WriterMT(
const std::string & filename);
310 WriterMT(
const std::string & filename,
const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
// Returns a shared ByteBuffer; presumably a record built from the dictionary
// and/or first event held by this writer — confirm in WriterMT.cpp.
319 std::shared_ptr<ByteBuffer> createDictionaryRecord();
// Writes the trailer. Presumably writeIndex selects whether a record index is
// included and recordNum is the record number used for the trailer — confirm
// in WriterMT.cpp.
320 void writeTrailer(
bool writeIndex, uint32_t recordNum);
// Open a new file and write the file header with no user header.
336 void open(
const std::string & filename);
// Overload of open() that also takes a user header. Presumably userHdr points to
// the user header bytes and userLen is their length in bytes — confirm in
// WriterMT.cpp.
337 void open(
const std::string & filename, uint8_t* userHdr, uint32_t userLen);
// Create and return a buffer containing a general file header followed by the
// given user header. Presumably userLen is the user header length in bytes —
// confirm in WriterMT.cpp.
339 std::shared_ptr<ByteBuffer>
createHeader(uint8_t* userHdr, uint32_t userLen);
// Add a byte array to the current internal record. Presumably offset and length
// select the slice of `buffer` to add, in bytes — confirm in WriterMT.cpp.
346 void addEvent(uint8_t* buffer, uint32_t offset, uint32_t length);
359 #endif //EVIO_6_0_WRITERMT_H
const ByteOrder & getByteOrder() const
Get the file's byte order.
Definition: WriterMT.cpp:178
This class is copied from one of the same name in the Java programming language.
Definition: ByteBuffer.h:42
static const ByteOrder ENDIAN_LITTLE
Little endian byte order.
Definition: ByteOrder.h:57
WriterMT()
Default constructor.
Definition: WriterMT.cpp:22
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Numerical values associated with endian byte order.
Definition: ByteOrder.h:53
This class is for writing Evio/HIPO files only (not buffers).
Definition: WriterMT.h:65
void addEvent(uint8_t *buffer, uint32_t offset, uint32_t length)
Add a byte array to the current internal record.
Definition: WriterMT.cpp:566
Exception class for Evio software package.
Definition: EvioException.h:29
Definition: Compressor.h:66
FileHeader & getFileHeader()
Get the file header.
Definition: WriterMT.cpp:185
Class which handles the creation and use of Evio & HIPO Records.
Definition: RecordOutput.h:105
This class is used to store relevant info about an evio container (bank, segment, or tag segment)...
Definition: EvioNode.h:41
void close()
Close opened file.
Definition: WriterMT.cpp:685
void writeRecord(RecordOutput &record)
Appends the record to the file.
Definition: WriterMT.cpp:524
Compressor::CompressionType getCompressionType()
Get the compression type.
Definition: WriterMT.cpp:205
bool addTrailer() const
Does this writer add a trailer to the end of the file/buffer?
Definition: WriterMT.cpp:212
void reset()
Get this object ready for re-use.
Definition: WriterMT.cpp:666
std::shared_ptr< ByteBuffer > createHeader(uint8_t *userHdr, uint32_t userLen)
Create and return a buffer containing a general file header followed by the user header given in the ...
Definition: WriterMT.cpp:360
bool addTrailerWithIndex()
Does this writer add a trailer with a record index to the end of the file? Or, if writing to a buffer...
Definition: WriterMT.cpp:231
static const ByteOrder ENDIAN_LOCAL
Local host's byte order.
Definition: ByteOrder.h:61
void open(const std::string &filename)
Open a new file and write file header with no user header.
Definition: WriterMT.cpp:257