evio  6.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
WriterMT.h
Go to the documentation of this file.
1 //
2 // Copyright 2020, Jefferson Science Associates, LLC.
3 // Subject to the terms in the LICENSE file found in the top-level directory.
4 //
5 // EPSCI Group
6 // Thomas Jefferson National Accelerator Facility
7 // 12000, Jefferson Ave, Newport News, VA 23606
8 // (757)-269-7100
9 
10 
11 #ifndef EVIO_6_0_WRITERMT_H
12 #define EVIO_6_0_WRITERMT_H
13 
14 
15 #include <iostream>
16 #include <iomanip>
17 #include <fstream>
18 #include <vector>
19 #include <string>
20 #include <thread>
21 #include <queue>
22 #include <chrono>
23 #include <memory>
24 #include <atomic>
25 
26 
27 #include "FileHeader.h"
28 #include "ByteBuffer.h"
29 #include "ByteOrder.h"
30 #include "RecordOutput.h"
31 #include "RecordHeader.h"
32 #include "Compressor.h"
33 #include "Writer.h"
34 #include "RecordSupply.h"
35 #include "RecordCompressor.h"
36 #include "Util.h"
37 #include "EvioException.h"
38 
39 
40 #include "Disruptor/Util.h"
41 #include <boost/thread.hpp>
42 #include <boost/chrono.hpp>
43 
44 
45 namespace evio {
46 
47 
65  class WriterMT {
66 
67 
68  private:
69 
75  class RecordWriter {
76 
77  private:
78 
80  WriterMT * writer = nullptr;
82  std::shared_ptr<RecordSupply> supply;
84  boost::thread thd;
86  std::atomic_long lastSeqProcessed{-1};
87 
88  public:
89 
95  RecordWriter(WriterMT * pwriter, std::shared_ptr<RecordSupply> & recordSupply) :
96  writer(pwriter), supply(recordSupply) {
97  }
98 
103  RecordWriter(RecordWriter && obj) noexcept :
104  writer(obj.writer),
105  supply(std::move(obj.supply)),
106  thd(std::move(obj.thd)) {
107 
108  lastSeqProcessed.store(obj.lastSeqProcessed);
109  }
110 
111  RecordWriter & operator=(RecordWriter && obj) noexcept {
112  if (this != &obj) {
113  writer = obj.writer;
114  lastSeqProcessed.store(obj.lastSeqProcessed);
115  supply = std::move(obj.supply);
116  thd = std::move(obj.thd);
117  }
118  return *this;
119  }
120 
121  // Do not free writer!
122  ~RecordWriter() {
123  thd.interrupt();
124  if (thd.try_join_for(boost::chrono::milliseconds(500))) {
125  std::cout << "RecordWriter thread did not quit after 1/2 sec" << std::endl;
126  }
127  }
128 
130  void startThread() {
131  thd = boost::thread([this]() {this->run();});
132  }
133 
135  void stopThread() {
136  // Send signal to interrupt it
137  thd.interrupt();
138  // Wait for it to stop
139  thd.join();
140  }
141 
143  void waitForLastItem() {
144 //std::cout << "WRITE: supply last = " << supply->getLastSequence() << ", lasSeqProcessed = " << lastSeqProcessed <<
145 //" supply->getLast > lastSeq = " << (supply->getLastSequence() > lastSeqProcessed) << std::endl;
146  while (supply->getLastSequence() > lastSeqProcessed.load()) {
147  std::this_thread::sleep_for(std::chrono::milliseconds(1));
148  }
149 
150  // Stop this thread, not the calling thread
151  stopThread();
152  }
153 
155  void run() {
156  int64_t currentSeq;
157 
158  try {
159  while (true) {
160 
161  std::cout << " RecordWriter: try getting record to write" << std::endl;
162  // Get the next record for this thread to write
163  auto item = supply->getToWrite();
164 
165  {
166  // Only allow interruption when blocked on trying to get item
167  boost::this_thread::disable_interruption d1;
168 
169  currentSeq = item->getSequence();
170  // Pull record out of wrapping object
171  std::shared_ptr<RecordOutput> & record = item->getRecord();
172 
173  // Do write
174  auto & header = record->getHeader();
175  int bytesToWrite = header->getLength();
176  // Record length of this record
177  writer->recordLengths->push_back(bytesToWrite);
178  // Followed by events in record
179  writer->recordLengths->push_back(header->getEntries());
180  writer->writerBytesWritten += bytesToWrite;
181 
182  auto buf = record->getBinaryBuffer();
183  std::cout << " RecordWriter: use outFile to write file, buf pos = " << buf->position() <<
184  ", lim = " << buf->limit() << ", bytesToWrite = " << bytesToWrite << std::endl;
185  writer->outFile.write(reinterpret_cast<const char *>(buf->array()), bytesToWrite);
186  if (writer->outFile.fail()) {
187  throw EvioException("failed write to file");
188  }
189 
190  record->reset();
191 
192  // Release back to supply
193  supply->releaseWriter(item);
194 
195  // Now we're done with this sequence
196  lastSeqProcessed = currentSeq;
197  }
198  }
199  }
200  catch (boost::thread_interrupted & e) {
201  //cout << " RecordWriter: INTERRUPTED, return" << endl;
202  }
203  }
204  };
205 
206 
207  private:
208 
209 
211  size_t writerBytesWritten = 0ULL;
213  uint8_t* firstEvent = nullptr;
215  uint32_t firstEventLength = 0;
217  uint32_t maxEventCount = 0;
219  uint32_t maxBufferSize = 0;
221  uint32_t recordNumber = 1;
223  uint32_t compressionThreadCount = 1;
224 
226  std::string fileName = "";
227 
229  std::ofstream outFile;
230 
232  FileHeader fileHeader;
233 
235  std::string dictionary;
236 
238  std::shared_ptr<ByteBuffer> dictionaryFirstEventBuffer;
239 
242 
244  std::shared_ptr<RecordOutput> outputRecord;
245 
247  std::vector<uint8_t> headerArray;
248 
251 
254  std::shared_ptr<std::vector<uint32_t>> recordLengths;
255 
257  std::shared_ptr<RecordSupply> supply;
258 
261  std::vector<RecordWriter> recordWriterThreads;
262 
264  std::vector<RecordCompressor> recordCompressorThreads;
265 
267  std::shared_ptr<RecordRingItem> ringItem;
268 
269 
271  bool addingTrailer = true;
273  bool addTrailerIndex = false;
275  bool closed = false;
277  bool opened = false;
279  bool firstRecordWritten = false;
281  bool haveDictionary = false;
283  bool haveFirstEvent = false;
285  bool haveUserHeader = false;
286 
287 
288  public:
289 
290  WriterMT();
291 
292  WriterMT(const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
293  Compressor::CompressionType compType, uint32_t compressionThreads);
294 
295  explicit WriterMT(
296  const HeaderType & hType,
297  const ByteOrder & order = ByteOrder::ENDIAN_LITTLE,
298  uint32_t maxEventCount = 0,
299  uint32_t maxBufferSize = 0,
300  const std::string & dictionary = "",
301  uint8_t* firstEvent = nullptr,
302  uint32_t firstEventLen = 0,
304  uint32_t compressionThreads = 1,
305  bool addTrailerIndex = false,
306  uint32_t ringSize = 16);
307 
308  explicit WriterMT(const std::string & filename);
309 
310  WriterMT(const std::string & filename, const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
311  Compressor::CompressionType compressionType, uint32_t compressionThreads);
312 
313  ~WriterMT() = default;
314 
316 
317  private:
318 
319  std::shared_ptr<ByteBuffer> createDictionaryRecord();
320  void writeTrailer(bool writeIndex, uint32_t recordNum);
321 
322  public:
323 
324  const ByteOrder & getByteOrder() const;
325 // ByteBuffer & getBuffer();
327 // RecordHeader & getRecordHeader();
328 // RecordOutput & getRecord();
330 
331  bool addTrailer() const;
332  void addTrailer(bool add);
333  bool addTrailerWithIndex();
334  void addTrailerWithIndex(bool addTrailingIndex);
335 
336  void open(const std::string & filename);
337  void open(const std::string & filename, uint8_t* userHdr, uint32_t userLen);
338 
339  std::shared_ptr<ByteBuffer> createHeader(uint8_t* userHdr, uint32_t userLen);
340  std::shared_ptr<ByteBuffer> createHeader(ByteBuffer & userHdr);
341 
342  void writeRecord(RecordOutput & record);
343 
344  // Use internal RecordOutput to write individual events
345 
346  void addEvent(uint8_t* buffer, uint32_t offset, uint32_t length);
347  void addEvent(ByteBuffer & buffer);
348 // void addEvent(EvioBank & bank);
349  void addEvent(EvioNode & node);
350 
351  void reset();
352  void close();
353 
354  };
355 
356 }
357 
358 
359 #endif //EVIO_6_0_WRITERMT_H
const ByteOrder & getByteOrder() const
Get the file's byte order.
Definition: WriterMT.cpp:178
This class is copied from one of the same name in the Java programming language.
Definition: ByteBuffer.h:42
static const ByteOrder ENDIAN_LITTLE
Little endian byte order.
Definition: ByteOrder.h:57
WriterMT()
Default constructor.
Definition: WriterMT.cpp:22
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Numerical values associated with endian byte order.
Definition: ByteOrder.h:53
This class is for writing Evio/HIPO files only (not buffers).
Definition: WriterMT.h:65
void addEvent(uint8_t *buffer, uint32_t offset, uint32_t length)
Add a byte array to the current internal record.
Definition: WriterMT.cpp:566
Exception class for Evio software package.
Definition: EvioException.h:29
Definition: Compressor.h:66
FileHeader & getFileHeader()
Get the file header.
Definition: WriterMT.cpp:185
Class which handles the creation and use of Evio & HIPO Records.
Definition: RecordOutput.h:105
This class is used to store relevant info about an evio container (bank, segment, or tag segment)...
Definition: EvioNode.h:41
void close()
Close opened file.
Definition: WriterMT.cpp:685
void writeRecord(RecordOutput &record)
Appends the record to the file.
Definition: WriterMT.cpp:524
Compressor::CompressionType getCompressionType()
Get the internal record's header.
Definition: WriterMT.cpp:205
Definition: FileHeader.h:91
bool addTrailer() const
Does this writer add a trailer to the end of the file/buffer?
Definition: WriterMT.cpp:212
~WriterMT()=default
void reset()
Get this object ready for re-use.
Definition: WriterMT.cpp:666
std::shared_ptr< ByteBuffer > createHeader(uint8_t *userHdr, uint32_t userLen)
Create and return a buffer containing a general file header followed by the user header given in the ...
Definition: WriterMT.cpp:360
bool addTrailerWithIndex()
Does this writer add a trailer with a record index to the end of the file? Or, if writing to a buffer...
Definition: WriterMT.cpp:231
static const ByteOrder ENDIAN_LOCAL
Local host's byte order.
Definition: ByteOrder.h:61
void open(const std::string &filename)
Open a new file and write file header with no user header.
Definition: WriterMT.cpp:257
Numerical values associated with types of a file or record header.
Definition: HeaderType.h:32