evio  6.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
EventWriter.h
Go to the documentation of this file.
1 //
2 // Copyright 2020, Jefferson Science Associates, LLC.
3 // Subject to the terms in the LICENSE file found in the top-level directory.
4 //
5 // EPSCI Group
6 // Thomas Jefferson National Accelerator Facility
7 // 12000, Jefferson Ave, Newport News, VA 23606
8 // (757)-269-7100
9 
10 
11 #ifndef EVIO_6_0_EVENTWRITER_H
12 #define EVIO_6_0_EVENTWRITER_H
13 
14 
15 #include <cstdlib>
16 #include <iostream>
17 #include <iomanip>
18 #include <fstream>
19 #include <vector>
20 #include <string>
21 #include <queue>
22 #include <chrono>
23 #include <memory>
24 #include <bitset>
25 #include <exception>
26 #include <atomic>
27 #include <algorithm>
28 #include <future>
29 #include <sys/stat.h>
30 #include <sys/statvfs.h>
31 
32 #ifdef USE_FILESYSTEMLIB
33  #include <experimental/filesystem>
34 #endif
35 
36 
37 #include "HeaderType.h"
38 #include "FileHeader.h"
39 #include "ByteBuffer.h"
40 #include "ByteOrder.h"
41 #include "RecordOutput.h"
42 #include "RecordHeader.h"
43 #include "Compressor.h"
44 #include "RecordSupply.h"
45 #include "RecordCompressor.h"
46 #include "Util.h"
47 #include "EvioException.h"
48 #include "EvioBank.h"
49 
50 
51 #include "Disruptor/Util.h"
52 #include <boost/thread.hpp>
53 #include <boost/chrono.hpp>
54 
55 #ifdef USE_FILESYSTEMLIB
56 namespace fs = std::experimental::filesystem;
57 #endif
58 
59 
60 namespace evio {
61 
148  class EventWriter {
149 
150  private:
151 
152 
158  class RecordWriter {
159 
160  private:
161 
163  EventWriter * writer = nullptr;
165  std::shared_ptr<RecordSupply> supply;
167  boost::thread thd;
169  std::atomic_long lastSeqProcessed{-1};
170 
172  std::shared_ptr<RecordRingItem> storedItem;
174  std::atomic_bool forceToDisk{false};
176  std::atomic<std::uint64_t> forcedRecordId{0};
177 
178  public:
179 
185  RecordWriter(EventWriter * pwriter, std::shared_ptr<RecordSupply> & recordSupply) :
186  writer(pwriter), supply(recordSupply) {
187  }
188 
189  RecordWriter(RecordWriter && obj) noexcept :
190  writer(obj.writer),
191  supply(std::move(obj.supply)),
192  thd(std::move(obj.thd)) {
193 
194  lastSeqProcessed.store(obj.lastSeqProcessed);
195  }
196 
197  RecordWriter & operator=(RecordWriter && obj) noexcept {
198  if (this != &obj) {
199  writer = obj.writer;
200  lastSeqProcessed.store(obj.lastSeqProcessed);
201  supply = std::move(obj.supply);
202  thd = std::move(obj.thd);
203  }
204  return *this;
205  }
206 
207  // Do not free writer!
208  ~RecordWriter() {
209  thd.interrupt();
210  if (thd.try_join_for(boost::chrono::milliseconds(500))) {
211  std::cout << " RecordWriter thread did not quit after 1/2 sec" << std::endl;
212  }
213  }
214 
216  void startThread() {
217  thd = boost::thread([this]() {this->run();});
218  }
219 
221  void stopThread() {
222  // Send signal to interrupt it
223  thd.interrupt();
224  // Wait for it to stop
225  thd.join();
226  }
227 
229  void waitForLastItem() {
230  //cout << "WRITE: supply last = " << supply->getLastSequence() << ", lasSeqProcessed = " << lastSeqProcessed <<
231  //" supply->getLast > lastSeq = " << (supply->getLastSequence() > lastSeqProcessed) << endl;
232  while (supply->getLastSequence() > lastSeqProcessed.load()) {
233  std::this_thread::sleep_for(std::chrono::milliseconds(1));
234  }
235 
236  // Stop this thread, not the calling thread
237  stopThread();
238  }
239 
252  void setForcedRecordId(uint64_t id) {
253  forcedRecordId = id;
254  forceToDisk = true;
255  }
256 
257  std::shared_ptr<RecordRingItem> storeRecordCopy(std::shared_ptr<RecordRingItem> & rec) {
258  // Call copy constructor of RecordRingItem, then make into shared pointer
259  storedItem = std::make_shared<RecordRingItem>(*(rec.get()));
260  return storedItem;
261  }
262 
264  void run() {
265 
266  try {
267 
268  while (true) {
269 
270  // Get the next record for this thread to write
271  // shared_ptr<RecordRingItem>
272  auto item = supply->getToWrite();
273 
274  {
275  // Only allow interruption when blocked on trying to get item
276  boost::this_thread::disable_interruption d1;
277 
278  int64_t currentSeq = item->getSequence();
279  // Pull record out of wrapping object
280  auto record = item->getRecord();
281 
282  // Only need to check the disk when writing the first record following
283  // a file split. That first write will create the file. If there isn't
284  // enough room, then flag will be set.
285  bool checkDisk = item->isCheckDisk();
286 
287  // Check the disk before we try to write if about to create another file,
288  // we're told to check the disk, and we're not forcing to disk
289  if ((writer->bytesWritten < 1) && checkDisk && (!forceToDisk.load())) {
290 
291  // If there isn't enough free space to write the complete, projected
292  // size file, and we're not trying to force the write ...
293  // store a COPY of the record for now and release the original so
294  // that writeEventToFile() does not block.
295  //
296  // So here is the problem. We are stuck in a loop here if disk is full.
297  // If events are flowing and since writing data to file is the bottleneck,
298  // it is likely that all records have been filled and published onto
299  // the ring. AND, writeEventToFile() blocks in a spin as it tries to get the
300  // next record from the ring which, unfortunately, never comes.
301  //
302  // When writeEventToFile() blocks, it can't respond by returning a "false" value.
303  // This plays havoc with code like the emu which is not expecting the write
304  // to block (at least not for very long).
305  //
306  // To break writeEventToFile() out of its spinning block, we make a copy of the
307  // item we're trying to write and release the original record. This allows
308  // writeEventToFile() to grab a new (newly released) record, write an event into
309  // it, and return to the caller. From then on, writeEventToFile() can prevent
310  // blocking by checking for a full disk (which it couldn't do before since
311  // the signal came too late).
312 
313  while (writer->fullDisk() && (!forceToDisk.load())) {
314  // Wait for a sec and try again
315  std::this_thread::sleep_for(std::chrono::seconds(1));
316 
317  // If we released the item in a previous loop, don't do it again
318  if (!item->isAlreadyReleased()) {
319  // Copy item
320  auto copiedItem = storeRecordCopy(item);
321  // Release original so we don't block writeEvent()
322  supply->releaseWriter(item);
323  item = copiedItem;
324  }
325 
326  // Wait until space opens up
327  }
328 
329  // If we're here, there must be free space available even
330  // if there previously wasn't.
331  }
332 
333  // Do write
334  // Write current item to file
335  //cout << "EventWriter: Calling writeToFileMT(item)\n";
336  writer->writeToFileMT(item, forceToDisk.load());
337 
338  // Turn off forced write to disk, if the record which
339  // initially triggered it, has now been written.
340  if (forceToDisk.load() && (forcedRecordId.load() == item->getId())) {
341  //cout << "EventWriter: WROTE the record that triggered force, reset to false\n";
342  forceToDisk = false;
343  }
344 
345  // Now we're done with this sequence
346  lastSeqProcessed = currentSeq;
347 
348  // Split file if needed
349  if (item->splitFileAfterWrite()) {
350  writer->splitFile();
351  }
352 
353  // Release back to supply
354  supply->releaseWriter(item);
355  }
356  }
357  }
358  catch (boost::thread_interrupted & e) {
359  std::cout << "EventWriter: INTERRUPTED, return" << std::endl;
360  }
361  }
362  };
363 
364 
365 
371  class FileCloser {
372 
374  class CloseAsyncFChan {
375 
376  // Store quantities from exterior classes or store quantities that
377  // may change between when this object is created and when this thread is run.
378  std::shared_ptr<std::fstream> afChannel;
379  std::shared_ptr<std::future<void>> future;
380  std::shared_ptr<RecordSupply> supply;
381  std::shared_ptr<RecordRingItem> item;
382  FileHeader fHeader;
383  std::shared_ptr<std::vector<uint32_t>> recLengths;
384  uint64_t bytesWrittenToFile;
385  uint32_t recordNum;
386  bool addTrailer;
387  bool writeIndx;
388  bool noFileWriting;
389  ByteOrder byteOrder;
390 
391  // A couple of things used to clean up after thread is done
392  FileCloser *closer;
393  std::shared_ptr<CloseAsyncFChan> sharedPtrOfMe;
394 
395  // Local storage
396  uint32_t hdrBufferBytes = RecordHeader::HEADER_SIZE_BYTES + 2048;
397  ByteBuffer hdrBuffer{hdrBufferBytes};
398  uint8_t *hdrArray = hdrBuffer.array();
399 
400  // Thread which does the file writing
401  boost::thread thd;
402 
403  public:
404 
406  CloseAsyncFChan(std::shared_ptr<std::fstream> &afc,
407  std::shared_ptr<std::future<void>> &future1,
408  std::shared_ptr<RecordSupply> &supply,
409  std::shared_ptr<RecordRingItem> &ringItem,
410  FileHeader &fileHeader, std::shared_ptr<std::vector<uint32_t>> recordLengths,
411  uint64_t bytesWritten, uint32_t recordNumber,
412  bool addingTrailer, bool writeIndex, bool noWriting,
413  ByteOrder &order, FileCloser *fc) :
414 
415  afChannel(afc), future(future1), supply(supply),
416  item(ringItem), byteOrder(order) {
417 
418  fHeader = fileHeader;
419  recLengths = recordLengths;
420  bytesWrittenToFile = bytesWritten;
421  recordNum = recordNumber;
422  addTrailer = addingTrailer;
423  writeIndx = writeIndex;
424  noFileWriting = noWriting;
425  closer = fc;
426 
427  hdrBuffer.order(order);
428 
429  startThread();
430  }
431 
432 
438  void setSharedPointerOfThis(std::shared_ptr<CloseAsyncFChan> & mySharedPtr) {
439  sharedPtrOfMe = mySharedPtr;
440  }
441 
442 
444  void stopThread() {
445  // Send signal to interrupt it
446  thd.interrupt();
447 
448  // Wait for it to stop
449  if (thd.try_join_for(boost::chrono::milliseconds(500))) {
450  std::cout << "CloseAsyncFChan thread did not quit after 1/2 sec" << std::endl;
451  }
452 
453  try {
454  // When this thread is done, remove itself from vector
455  closer->removeThread(sharedPtrOfMe);
456  }
457  catch (std::exception & e) {}
458  }
459 
460 
461  private:
462 
463 
465  void startThread() {
466  thd = boost::thread([this]() { this->run(); });
467  }
468 
469  void run() {
470  // Finish writing to current file
471  if (future != nullptr) {
472  try {
473  // Wait for last write to end before we continue
474  future->get();
475  }
476  catch (std::exception &e) {}
477  }
478 
479  // Release resources back to the ring
480  std::cout << "Closer: releaseWriterSequential, will release item seq = " << item->getSequence() << std::endl;
481  supply->releaseWriterSequential(item);
482 
483  try {
484  if (addTrailer && !noFileWriting) {
485  writeTrailerToFile();
486  }
487  }
488  catch (std::exception &e) {}
489 
490  try {
491  afChannel->close();
492  }
493  catch (std::exception &e) {
494  std::cout << e.what() << std::endl;
495  }
496 
497  try {
498  // When this thread is done, remove itself from vector
499  closer->removeThread(sharedPtrOfMe);
500  }
501  catch (std::exception & e) {}
502  }
503 
504 
515  void writeTrailerToFile() {
516 
517  // Keep track of where we are right now which is just before trailer
518  uint64_t trailerPosition = bytesWrittenToFile;
519 
520  // If we're NOT adding a record index, just write trailer
521  if (!writeIndx) {
522  try {
523  // hdrBuffer is only used in this method
524  hdrBuffer.position(0).limit(RecordHeader::HEADER_SIZE_BYTES);
525  RecordHeader::writeTrailer(hdrBuffer, 0, recordNum, nullptr);
526  }
527  catch (EvioException &e) {/* never happen */}
528 
529  afChannel->write(reinterpret_cast<char *>(hdrArray),
531  if (afChannel->fail()) {
532  throw EvioException("error writing to file");
533  }
534  }
535  else {
536  // Write trailer with index
537 
538  // How many bytes are we writing here?
539  uint32_t bytesToWrite = RecordHeader::HEADER_SIZE_BYTES + 4*recLengths->size();
540 
541  // Make sure our array can hold everything
542  if (hdrBufferBytes < bytesToWrite) {
543  hdrBuffer = ByteBuffer(bytesToWrite);
544  hdrArray = hdrBuffer.array();
545  }
546  hdrBuffer.limit(bytesToWrite).position(0);
547 
548  // Place data into hdrBuffer - both header and index
549  try {
550  RecordHeader::writeTrailer(hdrBuffer, (size_t)0, recordNum, recLengths);
551  }
552  catch (EvioException &e) {/* never happen */}
553  afChannel->write(reinterpret_cast<char *>(hdrArray), bytesToWrite);
554  if (afChannel->fail()) {
555  throw EvioException("error writing to file");
556  }
557  }
558 
559  // Update file header's trailer position word
560  if (!byteOrder.isLocalEndian()) {
561  trailerPosition = SWAP_64(trailerPosition);
562  }
563 
564  afChannel->seekg(FileHeader::TRAILER_POSITION_OFFSET);
565  afChannel->write(reinterpret_cast<char *>(&trailerPosition), sizeof(trailerPosition));
566  if (afChannel->fail()) {
567  throw EvioException("error writing to file");
568  }
569 
570  // Update file header's bit-info word
571  if (writeIndx) {
572  uint32_t bitInfo = fHeader.setBitInfo(fHeader.hasFirstEvent(),
573  fHeader.hasDictionary(),
574  true);
575  if (!byteOrder.isLocalEndian()) {
576  bitInfo = SWAP_32(bitInfo);
577  }
578  afChannel->seekg(FileHeader::BIT_INFO_OFFSET);
579  afChannel->write(reinterpret_cast<char *>(&bitInfo), sizeof(bitInfo));
580  if (afChannel->fail()) {
581  throw EvioException("error writing to file");
582  }
583  }
584 
585  // Update file header's record count word
586  uint32_t recordCount = recordNum - 1;
587  if (!byteOrder.isLocalEndian()) {
588  recordCount = SWAP_32(recordCount);
589  }
590  afChannel->seekg(FileHeader::RECORD_COUNT_OFFSET);
591  afChannel->write(reinterpret_cast<char *>(&recordCount), sizeof(recordCount));
592  if (afChannel->fail()) {
593  throw EvioException("error writing to file");
594  }
595  }
596  };
597 
598 
599  private:
600 
601 
603  std::vector<std::shared_ptr<CloseAsyncFChan>> threads;
604 
605 
606  public:
607 
608 
610  void close() {
611  for (const std::shared_ptr<CloseAsyncFChan> & thread : threads) {
612  thread->stopThread();
613  }
614  threads.clear();
615  }
616 
617 
622  void removeThread(std::shared_ptr<CloseAsyncFChan> & thread) {
623  // Look for this pointer among the shared pointers
624  threads.erase(std::remove(threads.begin(), threads.end(), thread), threads.end());
625  }
626 
627 
643  void closeAsyncFile( std::shared_ptr<std::fstream> &afc,
644  std::shared_ptr<std::future<void>> &future1,
645  std::shared_ptr<RecordSupply> &supply,
646  std::shared_ptr<RecordRingItem> &ringItem,
647  FileHeader &fileHeader, std::shared_ptr<std::vector<uint32_t>> &recordLengths,
648  uint64_t bytesWritten, uint32_t recordNumber,
649  bool addingTrailer, bool writeIndex, bool noFileWriting,
650  ByteOrder &order) {
651 
652  auto a = std::make_shared<CloseAsyncFChan>(afc, future1, supply, ringItem,
653  fileHeader, recordLengths,
654  bytesWritten, recordNumber,
655  addingTrailer, writeIndex,
656  noFileWriting, order, this);
657 
658  threads.push_back(a);
659  a->setSharedPointerOfThis(a);
660  }
661 
662  ~FileCloser() {
663  close();
664  }
665  };
666 
667 
668  //-------------------------------------------------------------------------------------
669 
670 
671  private:
672 
673 
676  std::shared_ptr<RecordOutput> commonRecord;
677 
679  std::shared_ptr<RecordOutput> currentRecord;
680 
682  std::shared_ptr<RecordRingItem> currentRingItem;
683 
685  std::shared_ptr<RecordSupply> supply;
686 
688  uint32_t maxSupplyBytes = 0;
689 
693 
696  uint32_t compressionFactor;
697 
700  std::shared_ptr<std::vector<uint32_t>> recordLengths;
701 
704  //TODO: DOES THIS NEED TO BE ATOMIC IF MT write????????????????????????????????
705  size_t bytesWritten = 0ULL;
706 
708  bool addingTrailer = true;
709 
711  bool addTrailerIndex = false;
712 
714  std::vector<uint8_t> headerArray;
715 
717  std::vector<RecordCompressor> recordCompressorThreads;
718 
721  std::vector<EventWriter::RecordWriter> recordWriterThread;
722 
724  uint32_t recordsWritten = 0;
725 
728  uint32_t recordNumber = 1;
729 
734  std::string xmlDictionary;
735 
737  std::vector<uint8_t> dictionaryByteArray;
738 
740  std::vector<uint8_t> firstEventByteArray;
741 
743  bool haveFirstEvent = false;
744 
746  bool closed = false;
747 
749  bool toFile = false;
750 
752  bool append = false;
753 
755  bool hasAppendDictionary = false;
756 
763  uint32_t eventsWrittenTotal = 0;
764 
767 
768  //-----------------------
769  // Buffer related members
770  //-----------------------
771 
773  uint32_t sourceId = 0;
774 
776  uint32_t bufferSize = 0;
777 
786  std::shared_ptr<ByteBuffer> buffer;
787 
790  std::shared_ptr<ByteBuffer> usedBuffer;
791 
793  std::vector<std::shared_ptr<ByteBuffer>> internalBuffers;
794 
796  uint32_t commonRecordBytesToBuffer = 0;
797 
800  uint32_t eventsWrittenToBuffer = 0;
801 
802  //-----------------------
803  // File related members
804  //-----------------------
805 
808  bool diskIsFull = false;
809 
813  std::atomic_bool diskIsFullVolatile{false};
814 
815  bool fileOpen = false;
816 
818  uint64_t idCounter = 0ULL;
819 
821  FileHeader fileHeader;
822 
824  FileHeader appendFileHeader;
825 
827  std::string currentFileName;
828 
829 #ifdef USE_FILESYSTEMLIB
830 
831  fs::path currentFilePath;
832 #endif
833 
835  std::shared_ptr<std::future<void>> future1;
836 
839  std::shared_ptr<RecordRingItem> ringItem1;
840 
842  uint32_t futureIndex = 0;
843 
845  std::shared_ptr<std::fstream> asyncFileChannel = nullptr;
846 
848  uint64_t fileWritingPosition = 0ULL;
849 
851  uint32_t splitNumber = 0;
852 
854  uint32_t splitCount = 0;
855 
857  std::string baseFileName;
858 
860  uint32_t specifierCount = 0;
861 
863  uint32_t runNumber = 0;
864 
869  uint64_t split = 0ULL;
870 
875  uint32_t splitIncrement = 0;
876 
878  uint64_t splitEventBytes = 0ULL;
879 
881  uint32_t splitEventCount = 0;
882 
887  uint32_t streamId = 0;
888 
890  uint32_t streamCount = 1;
891 
893  bool singleThreadedCompression = false;
894 
896  bool overWriteOK = false;
897 
900  uint32_t eventsWrittenToFile = 0;
901 
903  bool hasTrailerWithIndex = false;
904 
906  uint32_t userHeaderLength = 0;
907 
909  uint32_t userHeaderPadding = 0;
910 
912  uint32_t indexLength = 0;
913 
916  std::shared_ptr<FileCloser> fileCloser;
917 
918  //-----------------------
923  bool noFileWriting = false;
924  //-----------------------
925 
926 
927  public:
928 
929 
930  explicit EventWriter(std::string & filename,
931  const ByteOrder & byteOrder = ByteOrder::nativeOrder(),
932  bool append = false);
933 
934  EventWriter(std::string & filename,
935  std::string & dictionary,
936  const ByteOrder & byteOrder = ByteOrder::nativeOrder(),
937  bool append = false);
938 
939  EventWriter(std::string baseName, const std::string & directory, const std::string & runType,
940  uint32_t runNumber, uint64_t split, uint32_t maxRecordSize, uint32_t maxEventCount,
941  const ByteOrder & byteOrder, const std::string & xmlDictionary, bool overWriteOK,
942  bool append, std::shared_ptr<EvioBank> firstEvent, uint32_t streamId, uint32_t splitNumber,
943  uint32_t splitIncrement, uint32_t streamCount, Compressor::CompressionType compressionType,
944  uint32_t compressionThreads, uint32_t ringSize, uint32_t bufferSize);
945 
946  //---------------------------------------------
947  // BUFFER Constructors
948  //---------------------------------------------
949 
950  explicit EventWriter(std::shared_ptr<ByteBuffer> & buf);
951  EventWriter(std::shared_ptr<ByteBuffer> & buf, std::string & xmlDictionary);
952  EventWriter(std::shared_ptr<ByteBuffer> & buf, uint32_t maxRecordSize, uint32_t maxEventCount,
953  const std::string & xmlDictionary, uint32_t recordNumber,
954  Compressor::CompressionType compressionType);
955 
956  private:
957 
958  void reInitializeBuffer(std::shared_ptr<ByteBuffer> & buf, const std::bitset<24> *bitInfo,
959  uint32_t recordNumber, bool useCurrentBitInfo);
960 
961  static void staticWriteFunction(EventWriter *pWriter, const char* data, size_t len);
962  static void staticDoNothingFunction(EventWriter *pWriter);
963 
964  public:
965 
966  bool isDiskFull();
967  void setBuffer(std::shared_ptr<ByteBuffer> & buf, std::bitset<24> *bitInfo, uint32_t recNumber);
968  void setBuffer(std::shared_ptr<ByteBuffer> & buf);
969 
970  private:
971 
972  std::shared_ptr<ByteBuffer> getBuffer();
973 
974 
975  public:
976 
977  std::shared_ptr<ByteBuffer> getByteBuffer();
978  void setSourceId(int sId);
979  void setEventType(int type);
980 
981  bool writingToFile() const;
982  bool isClosed() const;
983 
984  std::string getCurrentFilename() const;
985  size_t getBytesWrittenToBuffer() const;
986  std::string getCurrentFilePath() const;
987  uint32_t getSplitNumber() const;
988  uint32_t getSplitCount() const;
989  uint32_t getRecordNumber() const;
990  uint32_t getEventsWritten() const;
991  ByteOrder getByteOrder() const;
992 
993  void setStartingRecordNumber(uint32_t startingRecordNumber);
994 
995  void setFirstEvent(std::shared_ptr<EvioNode> & node);
996  void setFirstEvent(std::shared_ptr<ByteBuffer> & buf);
997  void setFirstEvent(std::shared_ptr<EvioBank> bank);
998 
999  private:
1000 
1001  void createCommonRecord(const std::string & xmlDict,
1002  std::shared_ptr<EvioBank> const & firstBank,
1003  std::shared_ptr<EvioNode> const & firstNode,
1004  std::shared_ptr<ByteBuffer> const & firstBuf);
1005 
1006  void writeFileHeader() ;
1007 
1008 
1009  public :
1010 
1011  void flush();
1012  void close();
1013 
1014  protected:
1015 
1016  void examineFileHeader();
1017 
1018  private:
1019 
1020  void toAppendPosition();
1021 
1022  public:
1023 
1024  bool hasRoom(uint32_t bytes);
1025 
1026  bool writeEvent(std::shared_ptr<EvioNode> & node, bool force);
1027  bool writeEvent(std::shared_ptr<EvioNode> & node, bool force, bool duplicate);
1028  bool writeEventToFile(std::shared_ptr<EvioNode> & node, bool force, bool duplicate);
1029 
1030  bool writeEvent(std::shared_ptr<ByteBuffer> & bankBuffer);
1031  bool writeEvent(std::shared_ptr<ByteBuffer> & bankBuffer, bool force);
1032 
1033  bool writeEvent(std::shared_ptr<EvioBank> bank);
1034  bool writeEvent(std::shared_ptr<EvioBank> bank, bool force);
1035 
1036  private:
1037 
1038  bool writeEvent(std::shared_ptr<EvioBank> bank,
1039  std::shared_ptr<ByteBuffer> bankBuffer, bool force);
1040  bool writeEventToFile(std::shared_ptr<EvioBank> bank,
1041  std::shared_ptr<ByteBuffer> bankBuffer, bool force);
1042 
1043  bool fullDisk();
1044 
1045  void compressAndWriteToFile(bool force);
1046  bool tryCompressAndWriteToFile(bool force);
1047 
1048  bool writeToFile(bool force, bool checkDisk);
1049  void writeToFileMT(std::shared_ptr<RecordRingItem> & item, bool force);
1050 
1051  void splitFile();
1052  void writeTrailerToFile(bool writeIndex);
1053  void flushCurrentRecordToBuffer() ;
1054  bool writeToBuffer(std::shared_ptr<EvioBank> & bank, std::shared_ptr<ByteBuffer> & bankBuffer) ;
1055 
1056  uint32_t trailerBytes();
1057  void writeTrailerToBuffer(bool writeIndex);
1058 
1059  };
1060 
1061 }
1062 
1063 
1064 #endif //EVIO_6_0_EVENTWRITER_H
bool hasRoom(uint32_t bytes)
Is there room to write this many bytes to an output buffer as a single event? Will always return true...
Definition: EventWriter.cpp:1640
uint32_t getRecordNumber() const
Get the current record number.
Definition: EventWriter.cpp:820
This class is copied from one of the same name in the Java programming language.
Definition: ByteBuffer.h:42
static void writeTrailer(uint8_t *array, size_t arrayLen, uint32_t recordNum, const ByteOrder &order, const std::shared_ptr< std::vector< uint32_t >> &recordLengths=nullptr)
std::shared_ptr< ByteBuffer > getByteBuffer()
If writing to a file, return null.
Definition: EventWriter.cpp:698
bool writingToFile() const
Is this object writing to file?
Definition: EventWriter.cpp:752
static const uint32_t TRAILER_POSITION_OFFSET
Byte offset from beginning of header to write trailer position.
Definition: FileHeader.h:130
uint32_t getSplitCount() const
Get the number of split files produced by this writer.
Definition: EventWriter.cpp:812
EventWriter(std::string &filename, const ByteOrder &byteOrder=ByteOrder::nativeOrder(), bool append=false)
Creates an EventWriter for writing to a file in the specified byte order.
Definition: EventWriter.cpp:35
static const uint32_t BIT_INFO_OFFSET
Byte offset from beginning of header to bit info word.
Definition: FileHeader.h:122
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Numerical values associated with endian byte order.
Definition: ByteOrder.h:53
#define SWAP_64(x)
Macro for swapping 64 bit types.
Definition: ByteOrder.h:35
size_t getBytesWrittenToBuffer() const
If writing to a buffer, get the number of bytes written to it including the trailer.
Definition: EventWriter.cpp:777
ByteOrder getByteOrder() const
Get the byte order of the buffer/file being written into.
Definition: EventWriter.cpp:843
uint32_t getSplitNumber() const
Get the current split number which is the split number of file to be written next.
Definition: EventWriter.cpp:805
void setStartingRecordNumber(uint32_t startingRecordNumber)
Set the number with which to start block (record) numbers.
Definition: EventWriter.cpp:865
Exception class for Evio software package.
Definition: EvioException.h:29
Definition: Compressor.h:66
void setSourceId(int sId)
Set the value of the source Id in the first block header.
Definition: EventWriter.cpp:725
An EventWriter object is used for writing events to a file or to a byte buffer.
Definition: EventWriter.h:148
bool isDiskFull()
If writing file, is the partition it resides on full? Not full, in this context, means there&#39;s enough...
Definition: EventWriter.cpp:610
static ByteOrder const & nativeOrder()
Get the byte order of the local host.
Definition: ByteOrder.h:133
void setFirstEvent(std::shared_ptr< EvioNode > &node)
Set an event which will be written to the file as well as to all split files.
Definition: EventWriter.cpp:910
void setBuffer(std::shared_ptr< ByteBuffer > &buf, std::bitset< 24 > *bitInfo, uint32_t recNumber)
Set the buffer being written into (initially set in constructor).
Definition: EventWriter.cpp:629
static const uint32_t RECORD_COUNT_OFFSET
Byte offset from beginning of header to the record count.
Definition: FileHeader.h:118
bool writeEventToFile(std::shared_ptr< EvioNode > &node, bool force, bool duplicate)
Write an event (bank) into a record and eventually to a file in evio/hipo version 6 format...
Definition: EventWriter.cpp:1801
uint32_t getEventsWritten() const
Get the number of events written to a file/buffer.
Definition: EventWriter.cpp:832
std::string getCurrentFilePath() const
Get the full name or path of the current file being written to.
Definition: EventWriter.cpp:785
bool writeEvent(std::shared_ptr< EvioNode > &node, bool force)
Write an event (bank) into a record in evio/hipo version 6 format.
Definition: EventWriter.cpp:1686
Definition: FileHeader.h:91
#define SWAP_32(x)
Macro for swapping 32 bit types.
Definition: ByteOrder.h:28
std::string getCurrentFilename() const
Get the name of the current file being written to.
Definition: EventWriter.cpp:769
void close()
This method flushes any remaining data to file and disables this object.
Definition: EventWriter.cpp:1231
void setEventType(int type)
Set the bit info of a record header for a specified CODA event type.
Definition: EventWriter.cpp:742
uint8_t * array() const
Get a pointer to this buffer&#39;s backing array which contains the data.
Definition: ByteBuffer.cpp:475
void flush()
This method flushes any remaining internally buffered data to file.
Definition: EventWriter.cpp:1191
bool isClosed() const
Has close() been called (without reopening by calling setBuffer()) ?
Definition: EventWriter.cpp:761
void examineFileHeader()
Reads part of the file header in order to determine the evio version # and endianness of the file in ...
Definition: EventWriter.cpp:1339
static const uint32_t HEADER_SIZE_BYTES
Number of bytes in a normal sized header.
Definition: RecordHeader.h:194
static const ByteOrder ENDIAN_LOCAL
Local host&#39;s byte order.
Definition: ByteOrder.h:61