11 #ifndef EVIO_6_0_EVENTWRITER_H
12 #define EVIO_6_0_EVENTWRITER_H
30 #include <sys/statvfs.h>
32 #ifdef USE_FILESYSTEMLIB
33 #include <experimental/filesystem>
51 #include "Disruptor/Util.h"
52 #include <boost/thread.hpp>
53 #include <boost/chrono.hpp>
55 #ifdef USE_FILESYSTEMLIB
56 namespace fs = std::experimental::filesystem;
165 std::shared_ptr<RecordSupply> supply;
169 std::atomic_long lastSeqProcessed{-1};
172 std::shared_ptr<RecordRingItem> storedItem;
174 std::atomic_bool forceToDisk{
false};
176 std::atomic<std::uint64_t> forcedRecordId{0};
185 RecordWriter(
EventWriter * pwriter, std::shared_ptr<RecordSupply> & recordSupply) :
186 writer(pwriter), supply(recordSupply) {
189 RecordWriter(RecordWriter && obj) noexcept :
191 supply(std::move(obj.supply)),
192 thd(std::move(obj.thd)) {
194 lastSeqProcessed.store(obj.lastSeqProcessed);
197 RecordWriter & operator=(RecordWriter && obj) noexcept {
200 lastSeqProcessed.store(obj.lastSeqProcessed);
201 supply = std::move(obj.supply);
202 thd = std::move(obj.thd);
210 if (thd.try_join_for(boost::chrono::milliseconds(500))) {
211 std::cout <<
" RecordWriter thread did not quit after 1/2 sec" << std::endl;
217 thd = boost::thread([
this]() {this->run();});
229 void waitForLastItem() {
232 while (supply->getLastSequence() > lastSeqProcessed.load()) {
233 std::this_thread::sleep_for(std::chrono::milliseconds(1));
252 void setForcedRecordId(uint64_t
id) {
257 std::shared_ptr<RecordRingItem> storeRecordCopy(std::shared_ptr<RecordRingItem> & rec) {
259 storedItem = std::make_shared<RecordRingItem>(*(rec.get()));
272 auto item = supply->getToWrite();
276 boost::this_thread::disable_interruption d1;
278 int64_t currentSeq = item->getSequence();
280 auto record = item->getRecord();
285 bool checkDisk = item->isCheckDisk();
289 if ((writer->bytesWritten < 1) && checkDisk && (!forceToDisk.load())) {
313 while (writer->fullDisk() && (!forceToDisk.load())) {
315 std::this_thread::sleep_for(std::chrono::seconds(1));
318 if (!item->isAlreadyReleased()) {
320 auto copiedItem = storeRecordCopy(item);
322 supply->releaseWriter(item);
336 writer->writeToFileMT(item, forceToDisk.load());
340 if (forceToDisk.load() && (forcedRecordId.load() == item->getId())) {
346 lastSeqProcessed = currentSeq;
349 if (item->splitFileAfterWrite()) {
354 supply->releaseWriter(item);
358 catch (boost::thread_interrupted & e) {
359 std::cout <<
"EventWriter: INTERRUPTED, return" << std::endl;
374 class CloseAsyncFChan {
378 std::shared_ptr<std::fstream> afChannel;
379 std::shared_ptr<std::future<void>> future;
380 std::shared_ptr<RecordSupply> supply;
381 std::shared_ptr<RecordRingItem> item;
383 std::shared_ptr<std::vector<uint32_t>> recLengths;
384 uint64_t bytesWrittenToFile;
393 std::shared_ptr<CloseAsyncFChan> sharedPtrOfMe;
398 uint8_t *hdrArray = hdrBuffer.
array();
406 CloseAsyncFChan(std::shared_ptr<std::fstream> &afc,
407 std::shared_ptr<std::future<void>> &future1,
408 std::shared_ptr<RecordSupply> &supply,
409 std::shared_ptr<RecordRingItem> &ringItem,
410 FileHeader &fileHeader, std::shared_ptr<std::vector<uint32_t>> recordLengths,
411 uint64_t bytesWritten, uint32_t recordNumber,
412 bool addingTrailer,
bool writeIndex,
bool noWriting,
415 afChannel(afc), future(future1), supply(supply),
416 item(ringItem), byteOrder(order) {
418 fHeader = fileHeader;
419 recLengths = recordLengths;
420 bytesWrittenToFile = bytesWritten;
421 recordNum = recordNumber;
422 addTrailer = addingTrailer;
423 writeIndx = writeIndex;
424 noFileWriting = noWriting;
427 hdrBuffer.order(order);
438 void setSharedPointerOfThis(std::shared_ptr<CloseAsyncFChan> & mySharedPtr) {
439 sharedPtrOfMe = mySharedPtr;
449 if (thd.try_join_for(boost::chrono::milliseconds(500))) {
450 std::cout <<
"CloseAsyncFChan thread did not quit after 1/2 sec" << std::endl;
455 closer->removeThread(sharedPtrOfMe);
457 catch (std::exception & e) {}
466 thd = boost::thread([
this]() { this->run(); });
471 if (future !=
nullptr) {
476 catch (std::exception &e) {}
480 std::cout <<
"Closer: releaseWriterSequential, will release item seq = " << item->getSequence() << std::endl;
481 supply->releaseWriterSequential(item);
484 if (addTrailer && !noFileWriting) {
485 writeTrailerToFile();
488 catch (std::exception &e) {}
493 catch (std::exception &e) {
494 std::cout << e.what() << std::endl;
499 closer->removeThread(sharedPtrOfMe);
501 catch (std::exception & e) {}
515 void writeTrailerToFile() {
518 uint64_t trailerPosition = bytesWrittenToFile;
529 afChannel->write(reinterpret_cast<char *>(hdrArray),
531 if (afChannel->fail()) {
542 if (hdrBufferBytes < bytesToWrite) {
544 hdrArray = hdrBuffer.array();
546 hdrBuffer.limit(bytesToWrite).position(0);
553 afChannel->write(reinterpret_cast<char *>(hdrArray), bytesToWrite);
554 if (afChannel->fail()) {
560 if (!byteOrder.isLocalEndian()) {
561 trailerPosition =
SWAP_64(trailerPosition);
565 afChannel->write(reinterpret_cast<char *>(&trailerPosition),
sizeof(trailerPosition));
566 if (afChannel->fail()) {
572 uint32_t bitInfo = fHeader.setBitInfo(fHeader.hasFirstEvent(),
573 fHeader.hasDictionary(),
575 if (!byteOrder.isLocalEndian()) {
579 afChannel->write(reinterpret_cast<char *>(&bitInfo),
sizeof(bitInfo));
580 if (afChannel->fail()) {
586 uint32_t recordCount = recordNum - 1;
587 if (!byteOrder.isLocalEndian()) {
588 recordCount =
SWAP_32(recordCount);
591 afChannel->write(reinterpret_cast<char *>(&recordCount),
sizeof(recordCount));
592 if (afChannel->fail()) {
603 std::vector<std::shared_ptr<CloseAsyncFChan>> threads;
611 for (
const std::shared_ptr<CloseAsyncFChan> & thread : threads) {
612 thread->stopThread();
622 void removeThread(std::shared_ptr<CloseAsyncFChan> & thread) {
624 threads.erase(std::remove(threads.begin(), threads.end(), thread), threads.end());
643 void closeAsyncFile( std::shared_ptr<std::fstream> &afc,
644 std::shared_ptr<std::future<void>> &future1,
645 std::shared_ptr<RecordSupply> &supply,
646 std::shared_ptr<RecordRingItem> &ringItem,
647 FileHeader &fileHeader, std::shared_ptr<std::vector<uint32_t>> &recordLengths,
648 uint64_t bytesWritten, uint32_t recordNumber,
649 bool addingTrailer,
bool writeIndex,
bool noFileWriting,
652 auto a = std::make_shared<CloseAsyncFChan>(afc, future1, supply, ringItem,
653 fileHeader, recordLengths,
654 bytesWritten, recordNumber,
655 addingTrailer, writeIndex,
656 noFileWriting, order,
this);
658 threads.push_back(a);
659 a->setSharedPointerOfThis(a);
676 std::shared_ptr<RecordOutput> commonRecord;
679 std::shared_ptr<RecordOutput> currentRecord;
682 std::shared_ptr<RecordRingItem> currentRingItem;
685 std::shared_ptr<RecordSupply> supply;
688 uint32_t maxSupplyBytes = 0;
696 uint32_t compressionFactor;
700 std::shared_ptr<std::vector<uint32_t>> recordLengths;
705 size_t bytesWritten = 0ULL;
708 bool addingTrailer =
true;
711 bool addTrailerIndex =
false;
714 std::vector<uint8_t> headerArray;
717 std::vector<RecordCompressor> recordCompressorThreads;
721 std::vector<EventWriter::RecordWriter> recordWriterThread;
724 uint32_t recordsWritten = 0;
728 uint32_t recordNumber = 1;
734 std::string xmlDictionary;
737 std::vector<uint8_t> dictionaryByteArray;
740 std::vector<uint8_t> firstEventByteArray;
743 bool haveFirstEvent =
false;
755 bool hasAppendDictionary =
false;
763 uint32_t eventsWrittenTotal = 0;
773 uint32_t sourceId = 0;
776 uint32_t bufferSize = 0;
786 std::shared_ptr<ByteBuffer> buffer;
790 std::shared_ptr<ByteBuffer> usedBuffer;
793 std::vector<std::shared_ptr<ByteBuffer>> internalBuffers;
796 uint32_t commonRecordBytesToBuffer = 0;
800 uint32_t eventsWrittenToBuffer = 0;
808 bool diskIsFull =
false;
813 std::atomic_bool diskIsFullVolatile{
false};
815 bool fileOpen =
false;
818 uint64_t idCounter = 0ULL;
827 std::string currentFileName;
829 #ifdef USE_FILESYSTEMLIB
831 fs::path currentFilePath;
835 std::shared_ptr<std::future<void>> future1;
839 std::shared_ptr<RecordRingItem> ringItem1;
842 uint32_t futureIndex = 0;
845 std::shared_ptr<std::fstream> asyncFileChannel =
nullptr;
848 uint64_t fileWritingPosition = 0ULL;
851 uint32_t splitNumber = 0;
854 uint32_t splitCount = 0;
857 std::string baseFileName;
860 uint32_t specifierCount = 0;
863 uint32_t runNumber = 0;
869 uint64_t split = 0ULL;
875 uint32_t splitIncrement = 0;
878 uint64_t splitEventBytes = 0ULL;
881 uint32_t splitEventCount = 0;
887 uint32_t streamId = 0;
890 uint32_t streamCount = 1;
893 bool singleThreadedCompression =
false;
896 bool overWriteOK =
false;
900 uint32_t eventsWrittenToFile = 0;
903 bool hasTrailerWithIndex =
false;
906 uint32_t userHeaderLength = 0;
909 uint32_t userHeaderPadding = 0;
912 uint32_t indexLength = 0;
916 std::shared_ptr<FileCloser> fileCloser;
923 bool noFileWriting =
false;
932 bool append =
false);
935 std::string & dictionary,
937 bool append =
false);
939 EventWriter(std::string baseName,
const std::string & directory,
const std::string & runType,
940 uint32_t runNumber, uint64_t split, uint32_t maxRecordSize, uint32_t maxEventCount,
941 const ByteOrder & byteOrder,
const std::string & xmlDictionary,
bool overWriteOK,
942 bool append, std::shared_ptr<EvioBank> firstEvent, uint32_t streamId, uint32_t splitNumber,
944 uint32_t compressionThreads, uint32_t ringSize, uint32_t bufferSize);
950 explicit EventWriter(std::shared_ptr<ByteBuffer> & buf);
951 EventWriter(std::shared_ptr<ByteBuffer> & buf, std::string & xmlDictionary);
952 EventWriter(std::shared_ptr<ByteBuffer> & buf, uint32_t maxRecordSize, uint32_t maxEventCount,
953 const std::string & xmlDictionary, uint32_t recordNumber,
958 void reInitializeBuffer(std::shared_ptr<ByteBuffer> & buf,
const std::bitset<24> *bitInfo,
959 uint32_t recordNumber,
bool useCurrentBitInfo);
961 static void staticWriteFunction(
EventWriter *pWriter,
const char* data,
size_t len);
962 static void staticDoNothingFunction(
EventWriter *pWriter);
967 void setBuffer(std::shared_ptr<ByteBuffer> & buf, std::bitset<24> *bitInfo, uint32_t recNumber);
968 void setBuffer(std::shared_ptr<ByteBuffer> & buf);
972 std::shared_ptr<ByteBuffer> getBuffer();
1001 void createCommonRecord(
const std::string & xmlDict,
1002 std::shared_ptr<EvioBank>
const & firstBank,
1003 std::shared_ptr<EvioNode>
const & firstNode,
1004 std::shared_ptr<ByteBuffer>
const & firstBuf);
1006 void writeFileHeader() ;
1020 void toAppendPosition();
1026 bool writeEvent(std::shared_ptr<EvioNode> & node,
bool force);
1027 bool writeEvent(std::shared_ptr<EvioNode> & node,
bool force,
bool duplicate);
1028 bool writeEventToFile(std::shared_ptr<EvioNode> & node,
bool force,
bool duplicate);
1030 bool writeEvent(std::shared_ptr<ByteBuffer> & bankBuffer);
1031 bool writeEvent(std::shared_ptr<ByteBuffer> & bankBuffer,
bool force);
1033 bool writeEvent(std::shared_ptr<EvioBank> bank);
1034 bool writeEvent(std::shared_ptr<EvioBank> bank,
bool force);
1038 bool writeEvent(std::shared_ptr<EvioBank> bank,
1039 std::shared_ptr<ByteBuffer> bankBuffer,
bool force);
1041 std::shared_ptr<ByteBuffer> bankBuffer,
bool force);
1045 void compressAndWriteToFile(
bool force);
1046 bool tryCompressAndWriteToFile(
bool force);
1048 bool writeToFile(
bool force,
bool checkDisk);
1049 void writeToFileMT(std::shared_ptr<RecordRingItem> & item,
bool force);
1052 void writeTrailerToFile(
bool writeIndex);
1053 void flushCurrentRecordToBuffer() ;
1054 bool writeToBuffer(std::shared_ptr<EvioBank> & bank, std::shared_ptr<ByteBuffer> & bankBuffer) ;
1056 uint32_t trailerBytes();
1057 void writeTrailerToBuffer(
bool writeIndex);
1064 #endif //EVIO_6_0_EVENTWRITER_H
bool hasRoom(uint32_t bytes)
Is there room to write this many bytes to an output buffer as a single event? Will always return true...
Definition: EventWriter.cpp:1640
uint32_t getRecordNumber() const
Get the current record number.
Definition: EventWriter.cpp:820
This class is copied from one of the same name in the Java programming language.
Definition: ByteBuffer.h:42
std::shared_ptr< ByteBuffer > getByteBuffer()
If writing to a file, return null.
Definition: EventWriter.cpp:698
bool writingToFile() const
Is this object writing to file?
Definition: EventWriter.cpp:752
uint32_t getSplitCount() const
Get the number of split files produced by this writer.
Definition: EventWriter.cpp:812
EventWriter(std::string &filename, const ByteOrder &byteOrder=ByteOrder::nativeOrder(), bool append=false)
Creates an EventWriter for writing to a file in the specified byte order.
Definition: EventWriter.cpp:35
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Numerical values associated with endian byte order.
Definition: ByteOrder.h:53
#define SWAP_64(x)
Macro for swapping 64 bit types.
Definition: ByteOrder.h:35
size_t getBytesWrittenToBuffer() const
If writing to a buffer, get the number of bytes written to it including the trailer.
Definition: EventWriter.cpp:777
ByteOrder getByteOrder() const
Get the byte order of the buffer/file being written into.
Definition: EventWriter.cpp:843
uint32_t getSplitNumber() const
Get the current split number which is the split number of file to be written next.
Definition: EventWriter.cpp:805
void setStartingRecordNumber(uint32_t startingRecordNumber)
Set the number with which to start block (record) numbers.
Definition: EventWriter.cpp:865
Exception class for Evio software package.
Definition: EvioException.h:29
Definition: Compressor.h:66
void setSourceId(int sId)
Set the value of the source Id in the first block header.
Definition: EventWriter.cpp:725
An EventWriter object is used for writing events to a file or to a byte buffer.
Definition: EventWriter.h:148
bool isDiskFull()
If writing file, is the partition it resides on full? Not full, in this context, means there's enough...
Definition: EventWriter.cpp:610
static ByteOrder const & nativeOrder()
Get the byte order of the local host.
Definition: ByteOrder.h:133
void setFirstEvent(std::shared_ptr< EvioNode > &node)
Set an event which will be written to the file as well as to all split files.
Definition: EventWriter.cpp:910
void setBuffer(std::shared_ptr< ByteBuffer > &buf, std::bitset< 24 > *bitInfo, uint32_t recNumber)
Set the buffer being written into (initially set in constructor).
Definition: EventWriter.cpp:629
bool writeEventToFile(std::shared_ptr< EvioNode > &node, bool force, bool duplicate)
Write an event (bank) into a record and eventually to a file in evio/hipo version 6 format...
Definition: EventWriter.cpp:1801
uint32_t getEventsWritten() const
Get the number of events written to a file/buffer.
Definition: EventWriter.cpp:832
std::string getCurrentFilePath() const
Get the full name or path of the current file being written to.
Definition: EventWriter.cpp:785
bool writeEvent(std::shared_ptr< EvioNode > &node, bool force)
Write an event (bank) into a record in evio/hipo version 6 format.
Definition: EventWriter.cpp:1686
#define SWAP_32(x)
Macro for swapping 32 bit types.
Definition: ByteOrder.h:28
std::string getCurrentFilename() const
Get the name of the current file being written to.
Definition: EventWriter.cpp:769
void close()
This method flushes any remaining data to file and disables this object.
Definition: EventWriter.cpp:1231
void setEventType(int type)
Set the bit info of a record header for a specified CODA event type.
Definition: EventWriter.cpp:742
uint8_t * array() const
Get a pointer to this buffer's backing array which contains the data.
Definition: ByteBuffer.cpp:475
void flush()
This method flushes any remaining internally buffered data to file.
Definition: EventWriter.cpp:1191
bool isClosed() const
Has close() been called (without reopening by calling setBuffer()) ?
Definition: EventWriter.cpp:761
void examineFileHeader()
Reads part of the file header in order to determine the evio version # and endianness of the file in ...
Definition: EventWriter.cpp:1339
static const ByteOrder ENDIAN_LOCAL
Local host's byte order.
Definition: ByteOrder.h:61