runMACS
 All Data Structures Files Functions Variables Enumerations Enumerator Macros
DummyGrabber.cpp
Go to the documentation of this file.
1 #include <DummyGrabber.h>
2 
3 #include <string>
4 #include <iostream>
5 #include <mutex>
6 #include <chrono>
7 #include <functional>
8 
9 #include <RocLogger.h>
10 #include <stringtricks.h>
11 #include <utils.h>
12 #include <GrabberStatsPublisher.h>
13 
14 #include <chrono>
15 #include <climits> // UINT_MAX
16 
17 using namespace std;
18 
20 public:
22  static DummyGrabberSynchronizer instance;
23  return instance;
24  }
25  int getCurrentFrame(double originalFrameRate, int originalNumberOfFrames) {
26  if(originalFrameRate == 0.) {
27  std::cout << "WARNING: original FPS is 0! assuming FPS == 20!" << std::endl;
28  originalFrameRate = 20.;
29  }
30  std::chrono::milliseconds sensorDuration((long long)(((long long)originalNumberOfFrames * 1000LL) / originalFrameRate));
31  if(sensorDuration < imagePeriod) {
32  imagePeriod = sensorDuration;
33  }
34  auto currentPosition = (std::chrono::system_clock::now() - refTime) % imagePeriod;
35  //int frameNo = (originalNumberOfFrames * currentPosition) / sensorDuration;
36  return (std::chrono::duration_cast<std::chrono::microseconds>(currentPosition).count() * originalFrameRate) / 1000000LL;
37  }
38 private:
40  refTime(std::chrono::system_clock::now()) {};
42  void operator=(DummyGrabberSynchronizer const &);
43 
44  std::chrono::system_clock::time_point refTime;
45  std::chrono::system_clock::duration imagePeriod = std::chrono::system_clock::duration::max();
46 };
47 
48 template <typename T> void recalculateExposure(T * target,
49  T * source,
50  int width,
51  int height,
52  double factor,
53  T dark,
54  T max);
55 
56 DummyException::DummyException(const std::string & _message) {
57  message = _message;
58 }
59 const char* DummyException::what() const throw() {
60  return message.c_str();
61 }
62 
64 
65 }
66 
67 _DummyGrabberImpl::_DummyGrabberImpl(const std::string & _deviceName,
68  const std::string & _deviceSource,
69  zmq::context_t & _ctx,
70  const std::string & _brokerEndpoint) :
71  RocServer(_ctx, _brokerEndpoint, _deviceName),
72  ctx(_ctx),
73  deviceName(_deviceName),
74  deviceSource(_deviceSource),
75  bufferUsers(0),
76  openBuffers(0),
77  totalBufferUsers(0),
78  shutterClosed(false),
79  fps(20.),
80  tint(10.),
81  rocsHeader(SSTR("ROCS\x01", _deviceName, ".controls")),
82  rocsSocket(_ctx, ZMQ_PUB),
83  synchronizer(DummyGrabberSynchronizer::getInstance()){
84  // load dummy file
85  dummyFile.openFile(deviceSource);
86  /* dummyFile will be closed and unmapped by its destructor */
87  if(deviceName == "vnir") {
88  maxValue = (2<<11)-1;
89  } else if(deviceName == "swir") {
90  maxValue = (2<<13)-1;
91  } else if(deviceName == "lwir") {
92  maxValue = (2<<13)-1;
93  } else {
94  maxValue = (2<<7)-1;
95  }
96 }
97 
98 _DummyGrabberImpl::~_DummyGrabberImpl() {
99 
100 }
101 
102 int _DummyGrabberImpl::getWidth() {
103  return dummyFile.getWidth();
104 }
105 
106 int _DummyGrabberImpl::getHeight() {
107  return dummyFile.getHeight();
108 }
109 
110 int _DummyGrabberImpl::getBytesPerPixel() {
111  return dummyFile.getBytesPerPixel();
112 }
113 
114 void _DummyGrabberImpl::startServing() {
115  enableAcquisition = true;
116  acquisitionThread = std::thread(&_DummyGrabberImpl::acquisition, this);
117 }
118 
119 void _DummyGrabberImpl::stopServing() {
120  enableAcquisition = false;
121  if(acquisitionThread.joinable()) {
122  std::cout << "wait for grabber thread to join" << std::endl;
123  acquisitionThread.join();
124  }
125 }
126 
127 void _DummyGrabberImpl::acquisition() {
128  RocLogger logger(ctx, "inproc://log", "camserver", SSTR("grabber.", deviceName));
129  rocsSocket.connect("inproc://log");
130 
131  zmq::socket_t pubSocket(ctx, ZMQ_PUB);
132  for(const auto & ep: endpoints) {
133  pubSocket.bind(ep.c_str());
134  }
135  int width = getWidth();
136  int height = getHeight();
137  int bytesPerPixel = getBytesPerPixel();
138  std::cout << "width: " << width << " height: " << height << " bpp: " << bytesPerPixel << std::endl;
139  int bufferCount = 256;
140  char * bufferList[bufferCount];
141  bufferUsers.resize(bufferCount);
142 
143  GrabberStatsPublisher statsPublisher(ctx, deviceName);
144 
145  for(int i = 0; i < bufferCount; ++i) {
146  bufferList[i] = new char[width * height * bytesPerPixel];
147  assert(bufferList[i] != nullptr);
148  bufferUsers[i] = new std::atomic<int>(0);
149  assert(bufferUsers[i] != nullptr);
150  }
151 #ifdef DEBUG
152  std::cout << "grab initialized" << std::endl;
153 #endif
154  logger.debug(SSTR("grab initialized"));
155 
156  int numFrames = dummyFile.getNumFrames();
157  double originalFps = dummyFile.getFps();
158  auto t1 = std::chrono::high_resolution_clock::now();
159  auto t2 = std::chrono::high_resolution_clock::now();
160  /* do work here */
161  while(enableAcquisition) {
162  statsPublisher.publishBufferLoad(openBuffers, bufferCount);
163  sendStatus();
164 
165  int currentFrame = synchronizer.getCurrentFrame(originalFps, numFrames);
166  int currentBufferIndex = 0;
167  /* find a free buffer */
168  while(currentBufferIndex < bufferCount && *bufferUsers[currentBufferIndex] > 0) {
169  ++currentBufferIndex;
170  }
171  if(currentBufferIndex >= bufferCount) {
172  throw DummyException("out of buffers!");
173  }
174  t2 = std::chrono::high_resolution_clock::now();
175  auto dt = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1);
176  // adjust framerate
177  std::this_thread::sleep_for(std::chrono::microseconds(int(1e6/fps)) - dt);
178 #ifdef DEBUG
179  std::cout << "dt: " << dt.count() << " ms" << std::endl;
180 #endif
181  auto grabTime = std::chrono::system_clock::now(); /* wall time */
182  t1 = std::chrono::high_resolution_clock::now();
183  /* times are serialized as microseconds since the epoch */
184  uint64_t grabHighResTime = std::chrono::duration_cast<std::chrono::microseconds>(t1.time_since_epoch()).count();
185  uint64_t grabWallTime = std::chrono::duration_cast<std::chrono::microseconds>(grabTime.time_since_epoch()).count();
186 
187  acquireBuffer(currentBufferIndex); /* will be released by zmq-callback */
188 
189  double dfTint = dummyFile.getTint();
190  double tintFactor = 1.0;
191  if(dfTint != 0.) {
192  tintFactor = tint / dfTint;
193  }
194  if(shutterClosed) {
195  tintFactor = 0.;
196  }
197 
198  switch(bytesPerPixel) {
199  case 1:
200  recalculateExposure<uint8_t>((uint8_t*) bufferList[currentBufferIndex],
201  (uint8_t*) dummyFile.getDataByFrame(currentFrame),
202  width,
203  height,
204  tintFactor,
205  20,
206  maxValue);
207  break;
208  case 2:
209  recalculateExposure<uint16_t>((uint16_t*) bufferList[currentBufferIndex],
210  (uint16_t*) dummyFile.getDataByFrame(currentFrame),
211  width,
212  height,
213  tintFactor,
214  270,
215  maxValue);
216  break;
217  case 4:
218  recalculateExposure<uint32_t>((uint32_t*) bufferList[currentBufferIndex],
219  (uint32_t*) dummyFile.getDataByFrame(currentFrame),
220  width,
221  height,
222  tintFactor,
223  270,
224  maxValue);
225  break;
226  }
227  /* compose multipart message to extractor workers */
228  zmq::message_t header(deviceName.length());
229  memcpy(header.data(), deviceName.c_str(), deviceName.length());
230  pubSocket.send(header, ZMQ_SNDMORE);
231  zmq::message_t dimensions(3 * 4 + 2 * 8);
232  insertNetworkUInt32(((char*)dimensions.data()) + 0, width);
233  insertNetworkUInt32(((char*)dimensions.data()) + 4, height);
234  insertNetworkUInt32(((char*)dimensions.data()) + 8, bytesPerPixel);
235  insertNetworkUInt64(((char*)dimensions.data()) + 12, grabHighResTime);
236  insertNetworkUInt64(((char*)dimensions.data()) + 20, grabWallTime);
237  pubSocket.send(dimensions, ZMQ_SNDMORE);
238  zmq::message_t data((char*)(bufferList[currentBufferIndex]),
239  width * height * bytesPerPixel,
240  cbWrapper,
241  new std::function<void()>(
242  std::bind(&_DummyGrabberImpl::releaseBuffer,
243  this,
244  currentBufferIndex)
245  ));
246  pubSocket.send(data);
247  }
248  std::cout << "aquisition stopped" << std::endl;
249  /* cleanup */
250  while(totalBufferUsers > 0) {
251  std::mutex m;
252  std::unique_lock<std::mutex> lock(m);
253  bufferFreeNotifier.wait(lock);
254  }
255  for(int i = 0; i < bufferCount; ++i) {
256  delete[] bufferList[i];
257  delete bufferUsers[i];
258  }
259  bufferUsers.clear();
260  logger.debug(SSTR("grabber worker is done"));
261 }
262 
263 void _DummyGrabberImpl::acquireBuffer(int bufferId) {
264  ++totalBufferUsers;
265  if((*bufferUsers[bufferId])++ == 0) {
266 #ifdef DEBUG
267  cout << deviceName << " acquired buffer " << bufferId << endl;
268 #endif
269  ++openBuffers;
270  }
271 }
272 
273 void _DummyGrabberImpl::releaseBuffer(int bufferId) {
274  if(--(*bufferUsers[bufferId]) == 0) {
275 #ifdef DEBUG
276  cout << deviceName << " released buffer " << bufferId << endl;
277 #endif
278  --openBuffers;
279  --totalBufferUsers;
280  bufferFreeNotifier.notify_all();
281  } else {
282  --totalBufferUsers;
283  }
284 }
285 
286 void _DummyGrabberImpl::addEndpoint(const std::string & _endpoint) {
287  endpoints.push_back(_endpoint);
288 }
289 
290 /* RocServer stuff */
291 
292 Json::Value _DummyGrabberImpl::call(const std::string & name, const Json::Value & arguments) {
293  if(name == "openShutter") {
294  shutterClosed = false;
295  return Json::Value::null;
296  } else if(name == "closeShutter") {
297  shutterClosed = true;
298  return Json::Value::null;
299  }
300  throw RocNotImplementedError();
301 }
302 
303 Json::Value _DummyGrabberImpl::getattr(const std::string & name) {
304  if (name == "fps") {
305  return fps;
306  } else if (name == "exposureTime") {
307  return tint;
308  }
309  throw RocIsCallable();
310 }
311 
312 void _DummyGrabberImpl::setattr(const std::string & name, const Json::Value & value) {
313  if (name == "fps") {
314  fps = value.asFloat();
315  return;
316  } else if (name == "exposureTime") {
317  tint = value.asFloat();
318  return;
319  }
320  throw RocNotImplementedError();
321 }
322 
323 /* ROCS stuff */
324 
325 void _DummyGrabberImpl::sendStatus() {
326  zmq::message_t msgHdr(rocsHeader.length());
327  memcpy((char*)msgHdr.data(), rocsHeader.c_str(), rocsHeader.length());
328  rocsSocket.send(msgHdr, ZMQ_SNDMORE);
329 
330  Json::FastWriter writer;
331  Json::Value root;
332  if(shutterClosed) {
333  root["shutter"] = "close";
334  } else {
335  root["shutter"] = "open";
336  }
337  root["exposureTime"] = tint;
338  root["fps"] = fps;
339  std::string data = writer.write(root);
340  zmq::message_t msgData(data.length());
341  memcpy((char *)msgData.data(), data.c_str(), data.length());
342  rocsSocket.send(msgData);
343 }
344 
345 template <typename T> void recalculateExposure(T * target,
346  T * source,
347  int width,
348  int height,
349  double factor,
350  T dark,
351  T max) {
352  int size = width * height;
353  double temp;
354  for(int i = 0; i < size; ++i) {
355  temp = source[i] - dark;
356  if(temp < 0.) {
357  temp = 0.;
358  }
359  temp = (temp * factor) + dark;
360  if(temp > max) {
361  target[i] = max;
362  } else {
363  target[i] = temp;
364  }
365  }
366 }
double getTint() const
Definition: Envi.cpp:34
void insertNetworkUInt32(void *buffer, uint32_t value)
Definition: utils.cpp:40
void insertNetworkUInt64(void *buffer, uint64_t value)
Definition: utils.cpp:45
unsigned long long getNumFrames() const
Definition: Envi.cpp:32
STL namespace.
std::string SSTR(Args &&...components)
Creates a temporary string stream for string concatenation.
Definition: stringtricks.h:21
int getCurrentFrame(double originalFrameRate, int originalNumberOfFrames)
void recalculateExposure(T *target, T *source, int width, int height, double factor, T dark, T max)
DummyException(const std::string &_message)
unsigned long long getHeight() const
Definition: Envi.cpp:31
static DummyGrabberSynchronizer & getInstance()
unsigned long long getWidth() const
Definition: Envi.cpp:30
void cbWrapper(void *data, void *hint)
Definition: utils.cpp:33
unsigned int getBytesPerPixel() const
Definition: Envi.cpp:33
double getFps() const
Definition: Envi.cpp:35
virtual const char * what() const