evio  6.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RecordCompressor.h
Go to the documentation of this file.
1 //
2 // Copyright 2020, Jefferson Science Associates, LLC.
3 // Subject to the terms in the LICENSE file found in the top-level directory.
4 //
5 // EPSCI Group
6 // Thomas Jefferson National Accelerator Facility
7 // 12000, Jefferson Ave, Newport News, VA 23606
8 // (757)-269-7100
9 
10 
11 #ifndef EVIO_6_0_RECORDCOMPRESSOR_H
12 #define EVIO_6_0_RECORDCOMPRESSOR_H
13 
14 
15 #include <iostream>
16 #include <iomanip>
17 #include <string>
18 #include <thread>
19 #include <memory>
20 
21 
22 #include "RecordOutput.h"
23 #include "RecordHeader.h"
24 #include "Compressor.h"
25 #include "RecordSupply.h"
26 
27 
28 #include "Disruptor/Util.h"
29 #include <boost/thread.hpp>
30 
31 
32 namespace evio {
33 
34 
45 
46  private:
47 
49  uint32_t threadNumber;
51  Compressor::CompressionType compressionType;
53  std::shared_ptr<RecordSupply> supply;
55  boost::thread thd;
56 
57  public:
58 
66  std::shared_ptr<RecordSupply> & recordSupply) :
67  threadNumber(thdNum),
68  compressionType(type),
69  supply(recordSupply) {
70  }
71 
73  threadNumber(obj.threadNumber),
74  compressionType(obj.compressionType),
75  supply(std::move(obj.supply)),
76  thd(std::move(obj.thd)) {
77  }
78 
80  if (this != &obj) {
81  threadNumber = obj.threadNumber;
82  compressionType = obj.compressionType;
83  supply = std::move(obj.supply);
84  thd = std::move(obj.thd);
85  }
86  return *this;
87  }
88 
90  thd.interrupt();
91  if (thd.try_join_for(boost::chrono::milliseconds(500))) {
92  std::cout << "RecordCompressor thread did not quit after 1/2 sec" << std::endl;
93  }
94  }
95 
97  void startThread() {
98  thd = boost::thread([this]() {this->run();});
99  }
100 
102  void stopThread() {
103  // Send signal to interrupt it
104  thd.interrupt();
105  // Wait for it to stop
106  thd.join();
107  }
108 
110  void run() {
111 
112  try {
113 
114  // The first time through, we need to release all records coming before
115  // our first in case there are < threadNumber records before close() is called.
116  // This way close() is not waiting for thread #12 to get and subsequently
117  // release items 0 - 11 when there were only 5 records total.
118  // (threadNumber starts at 0).
119 
120  // Be careful when dealing with negative numbers and unsigned ints ...
121  int64_t seqNumber = (int64_t)threadNumber - 1;
122  supply->release(threadNumber, seqNumber);
123 
124  while (true) {
125 
126  // Get the next record for this thread to compress
127  auto item = supply->getToCompress(threadNumber);
128 
129  {
130  // Only allow interruption when blocked on trying to get item
131  boost::this_thread::disable_interruption d1;
132 
133  // Pull record out of wrapping object
134  std::shared_ptr<RecordOutput> & record = item->getRecord();
135  // Set compression type
136  auto & header = record->getHeader();
137  header->setCompressionType(compressionType);
138 //cout << "RecordCompressor thd " << threadNumber << ": got record, set rec # to " << header->getRecordNumber() << endl;
139  // Do compression
140  record->build();
141  // Release back to supply
142  supply->releaseCompressor(item);
143  }
144  }
145  }
146  catch (boost::thread_interrupted & e) {
147 //cout << "RecordCompressor thd " << threadNumber << ": INTERRUPTED, return" << endl;
148  }
149  }
150  };
151 
152 
153 }
154 
155 
156 #endif //EVIO_6_0_RECORDCOMPRESSOR_H
void run()
Method to run in the thread.
Definition: RecordCompressor.h:110
CompressionType
Enum of supported data compression types.
Definition: Compressor.h:65
Class used to create a thread which takes data-filled records from a RingBuffer-backed RecordSupply...
Definition: RecordCompressor.h:44
~RecordCompressor()
Definition: RecordCompressor.h:89
RecordCompressor(uint32_t thdNum, Compressor::CompressionType &type, std::shared_ptr< RecordSupply > &recordSupply)
Constructor.
Definition: RecordCompressor.h:65
RecordCompressor(RecordCompressor &&obj) noexcept
Definition: RecordCompressor.h:72
void startThread()
Create and start a thread to execute the run() method of this class.
Definition: RecordCompressor.h:97
RecordCompressor & operator=(RecordCompressor &&obj) noexcept
Definition: RecordCompressor.h:79
void stopThread()
Stop the thread.
Definition: RecordCompressor.h:102