runMACS
 All Data Structures Files Functions Variables Enumerations Enumerator Macros
FileStorageExtractor.cpp
Go to the documentation of this file.
1 #include <FileStorageExtractor.h>
2 #include <utils.h>
3 #include <stringtricks.h>
4 #include <ConfigManager.h>
5 
6 #include <config.h>
7 
8 #include <cstring>
9 #include <cstdio>
10 #ifdef HAVE_WINDOWS
11 #define POPEN _popen
12 #define PCLOSE _pclose
13 #define POPEN_MODE_WRITE "wb"
14 #endif
15 #ifdef HAVE_LINUX
16 #define POPEN popen
17 #define PCLOSE pclose
18 #define POPEN_MODE_WRITE "w"
19 #endif
20 
21 #include <iostream>
22 #include <sstream>
23 
24 using namespace std;
25 
27  const PubSubEndpoint & _source,
28  const std::string & _identifier,
29  StorageManager & _storageManager,
30  int _maxFrames,
31  const std::string & _metaData,
32  int _framesPerFileCycle,
33  bool _cumulative,
34  bool _binning,
35  double _wantedFPS) :
36  Extractor(_ctx, _source, _wantedFPS),
37  s_identifier(_identifier),
38  width(0),
39  height(0),
40  bytesPerPixel(0),
41  framesWritten(0),
42  totalFramesWritten(0),
43  maxFrames(_maxFrames),
44  framesPerFileCycle(_framesPerFileCycle),
45  cumulative(_cumulative),
46  binning(_binning),
47  metaData(_metaData),
48  error(false),
49  accLinear(nullptr),
50  accQuadratic(nullptr),
51  tmpdata(nullptr) {
52  storageManager = &_storageManager;
53 }
54 
56 
57 }
58 
60  return "fileStorage";
61 }
62 
64  return s_identifier;
65 }
66 
67 std::list<std::string> FileStorageExtractor::parameters() {
68  return {filename, metaData, keepGoing?"active":"done", SSTR(0), SSTR(totalFramesWritten)};
69 }
70 
71 /****** PRIVATE *****/
72 
73 void FileStorageExtractor::extractSetup() {
74  openFile();
75 }
76 
77 void FileStorageExtractor::extractImpl(unsigned int _width,
78  unsigned int _height,
79  unsigned int _bytesPerPixel,
80  uint64_t highResTime,
81  uint64_t wallTime,
82  zmq::message_t * sourceImage) {
83  if(binning) {
84  _height /=2;
85  }
86  if(wallTimes.size() == 0) {
87  width = _width;
88  height = _height;
89  bytesPerPixel = _bytesPerPixel;
90  if(binning) {
91  tmpdata = new uint16_t[width*height];
92  }
93  } else {
94  if(width != _width || height != _height || bytesPerPixel != _bytesPerPixel) {
95  cout << "ERROR: image size changed during capture!" << endl;
96  error = true;
97  }
98  }
99  if(!error) {
100  highResTimes.push_back(highResTime);
101  wallTimes.push_back(wallTime);
102  if(!cumulative) {
103  if(binning) {
104  // reduce data
105  uint16_t * data = (uint16_t*)sourceImage->data();
106  for(unsigned int i=0; i<height; ++i) {
107  for(unsigned int j=0; j<width; ++j) {
108  tmpdata[(i*width)+j] = data[((2*i)*width)+j] + data[(((2*i)+1)*width)+j];
109  }
110  }
111  size_t actuallyWritten = fwrite(tmpdata, 1, sourceImage->size()/2, fil);
112  if(actuallyWritten != sourceImage->size()/2) {
113  cerr << "could only write " << actuallyWritten << " bytes of " << sourceImage->size()/2 << " bytes" << endl;
114  perror("WRITE ERROR");
115  }
116  } else {
117  size_t actuallyWritten = fwrite(sourceImage->data(), 1, sourceImage->size(), fil);
118  if(actuallyWritten != sourceImage->size()) {
119  cerr << "could only write " << actuallyWritten << " bytes of " << sourceImage->size() << " bytes" << endl;
120  perror("WRITE ERROR");
121  }
122  }
123  } else {
124  unsigned int i;
125  unsigned int count = width * height;
126  float element;
127  if(accLinear == nullptr) {
128  accLinear = new float[count]();
129  }
130  if(accQuadratic == nullptr) {
131  accQuadratic = new float[count]();
132  }
133  switch(_bytesPerPixel) {
134  case 1:
135  {
136  uint8_t * data = static_cast<uint8_t*>(sourceImage->data());
137  for(i = 0; i < count; ++i) {
138  element = (float)(data[i]);
139  accLinear[i] += element;
140  accQuadratic[i] += element*element;
141  }
142  }
143  break;
144  case 2:
145  {
146  uint16_t * data = static_cast<uint16_t*>(sourceImage->data());
147  for(i = 0; i < count; ++i) {
148  element = (float)(data[i]);
149  accLinear[i] += element;
150  accQuadratic[i] += element*element;
151  }
152  }
153  break;
154  case 4:
155  {
156  uint32_t * data = static_cast<uint32_t*>(sourceImage->data());
157  for(i = 0; i < count; ++i) {
158  element = (float)(data[i]);
159  accLinear[i] += element;
160  accQuadratic[i] += element*element;
161  }
162  }
163  break;
164  default:
165  cout << "ERROR: " << _bytesPerPixel << " bytesPerPixel are not implemented!" << endl;
166  error = true;
167  }
168  }
169  }
170  /* minimal implementation MUST delete sourceImage! */
171  delete sourceImage;
172  ++framesWritten;
173  ++totalFramesWritten;
174  if(maxFrames >= 0 && totalFramesWritten >= maxFrames) {
175  keepGoing = false;
176  return;
177  }
178  if(framesPerFileCycle > 0 && !cumulative && framesWritten >= framesPerFileCycle) {
179  //cycle files
180  closeFile();
181  framesWritten = 0;
182  wallTimes.clear();
183  highResTimes.clear();
184  openFile();
185  }
186 }
187 
188 void FileStorageExtractor::extractTeardown() {
189  if(cumulative && accLinear != nullptr) {
190  size_t size = width * height * sizeof(float);
191  size_t actuallyWritten = fwrite((void*)accLinear, 1, size, fil);
192  if(actuallyWritten != size) {
193  cerr << "could only write " << actuallyWritten << " bytes of " << size << " bytes" << endl;
194  perror("WRITE ERROR");
195  }
196  actuallyWritten = fwrite((void*)accQuadratic, 1, size, fil);
197  if(actuallyWritten != size) {
198  cerr << "could only write " << actuallyWritten << " bytes of " << size << " bytes" << endl;
199  perror("WRITE ERROR");
200  }
201  delete[] accLinear;
202  delete[] accQuadratic;
203  }
204  if(tmpdata != nullptr) delete[] tmpdata;
205  closeFile();
206  {
207  unique_lock<mutex> _(lastHeaderWriterThreadMutex);
208  if(lastHeaderWriterThread != nullptr && lastHeaderWriterThread->joinable()) {
209  lastHeaderWriterThread->join();
210  }
211  }
212 }
213 
214 void FileStorageExtractor::openFile() {
215  std::string storageDir = storageManager->getPreferredStorageLocation();
216  std::string filename_tmp = SSTR(storageDir, "/tmp_recordXXXXXX");
217 #ifdef HAVE_WINDOWS
218  strcpy_s(filename, sizeof(filename), filename_tmp.c_str());
219 #else
220  strcpy(filename, filename_tmp.c_str());
221 #endif
222  int fd = mkstemp(filename);
223 #ifdef HAVE_WINDOWS
224  fil = _fdopen(fd, "wb");
225 #else
226  fil = fdopen(fd, "wb");
227 #endif
228  cout << "opened file " << filename << endl;
229 }
230 
231 void FileStorageExtractor::closeFile() {
232  cout << "start closing " << fil << endl;
233  fclose(fil);
234  fil = nullptr;
235  cout << "closed file " << filename << endl;
236  if(error) {
237  cout << "there was an error during file creation, removing " << filename << "!" << endl;
238  remove(filename);
239  } else if(framesWritten == 0) {
240  cout << "no frames written in current cycle, removing " << filename << "!" << endl;
241  remove(filename);
242  } else {
243  stringstream sswalltimes;
244  bool first = true;
245  for(const auto & t: wallTimes) {
246  if(first) {
247  sswalltimes << t;
248  first = false;
249  } else {
250  sswalltimes << "," << t;
251  }
252  }
253  stringstream sshighrestimes;
254  first = true;
255  for(const auto & t: highResTimes) {
256  if(first) {
257  sshighrestimes << t;
258  first = false;
259  } else {
260  sshighrestimes << "," << t;
261  }
262  }
263  FILE * pPipe;
264  if((pPipe = POPEN("python -m runmacs.spec.io.streamed_header", POPEN_MODE_WRITE)) != nullptr) {
265  fwriteString(pPipe, "{");
266  fwriteString(pPipe, SSTR("\"original_raw_filename\":\"", filename, "\","));
267  fwriteString(pPipe, SSTR("\"source_name\":\"", source.topic, "\","));
268  fwriteString(pPipe, "\"sw_vers\":\"" DAQ_SW_VERSION "\",");
269  fwriteString(pPipe, SSTR("\"wall_times\":[", sswalltimes.str(), "],"));
270  fwriteString(pPipe, SSTR("\"highRes_times\":[", sshighrestimes.str(), "],"));
271  fwriteString(pPipe, SSTR("\"specbin\":", (binning?"2":"1"), ","));
272  fwriteString(pPipe, "\"spatbin\":1,");
273  fwriteString(pPipe, SSTR("\"width\":", (int)width, ","));
274  fwriteString(pPipe, SSTR("\"height\":", (int)height*(binning?2:1), ","));
275  if(cumulative) {
276  fwriteString(pPipe, "\"format\":\"f\",");
277  fwriteString(pPipe, "\"cumulative\":true,");
278  } else {
279  fwriteString(pPipe, "\"format\":\"H\",");
280  fwriteString(pPipe, "\"cumulative\":false,");
281  }
282  fwriteString(pPipe, SSTR("\"sensorControlServer\":\"", CONFIGMANAGER["daq_broker"].asString(), "\","));
283  fwriteString(pPipe, SSTR("\"metaData\":", metaData, ""));
284  fwriteString(pPipe, "}");
285  /* headerwriter should be closed asynchronously to allow for the next file to be written */
286  {
287  unique_lock<mutex> _(lastHeaderWriterThreadMutex);
288  thread * prevHeaderWriterThread = lastHeaderWriterThread;
289  lastHeaderWriterThread = new thread([=](){
290  (void) PCLOSE(pPipe);
291  if(prevHeaderWriterThread != nullptr && prevHeaderWriterThread->joinable()) {
292  prevHeaderWriterThread->join();
293  }
294  });
295  }
296  //int returnCode = PCLOSE(pPipe);
297  } else {
298  cout << "could not call header-writer" << endl;
299  }
300  }
301 }
#define CONFIGMANAGER
Definition: ConfigManager.h:9
std::string getPreferredStorageLocation()
Ask for a good location to store data.
Base class for an image extractor.
Definition: Extractor.h:17
STL namespace.
PubSubEndpoint source
Definition: Extractor.h:28
#define DAQ_SW_VERSION
Definition: config.h:5
Handles storage locations.
size_t fwriteString(FILE *stream, const std::string &str)
Definition: utils.cpp:116
std::string SSTR(Args &&...components)
Creates a temporary string stream for string concatenation.
Definition: stringtricks.h:21
FileStorageExtractor(zmq::context_t &_ctx, const PubSubEndpoint &_source, const std::string &_identifier, StorageManager &_storageManager, int _maxFrames=-1, const std::string &_metaData="{}", int _framesPerFileCycle=-1, bool _cumulative=false, bool _binning=false, double _wantedFPS=0)
std::list< std::string > parameters()
std::string topic
Definition: utils.h:16
volatile bool keepGoing
Definition: Extractor.h:29