runMACS
 All Data Structures Files Functions Variables Enumerations Enumerator Macros
ImaqGrabber.cpp
Go to the documentation of this file.
#include <ImaqGrabber.h>

#include <utils.h>
#include <stringtricks.h>
#include <RocLogger.h>

/* c++11 std lib additions */
#include <unordered_set>
#include <functional>
#include <chrono>
#include <exception>

/* for development */
#include <iostream>
15 
16 //#define DEBUG
17 
18 using namespace std;
19 
20 using std::chrono::high_resolution_clock;
21 using std::chrono::milliseconds;
22 
23 ImaqException::ImaqException(const std::string & _message) {
24  message = _message;
25 }
26 const char* ImaqException::what() const throw() {
27  return message.c_str();
28 }
29 
31 
32 }
33 
34 static Int32 handleImaqError(Int32 err) {
35  if(err != 0) {
36  char temp[256];
37  imgShowError(err, temp);
38  throw ImaqException(temp);
39  }
40  return err;
41 }
42 
43 _ImaqGrabberImpl::_ImaqGrabberImpl(const std::string & _deviceName, const std::string & _deviceSource, zmq::context_t & _ctx):
44  deviceName(_deviceName),
45  deviceSource(_deviceSource),
46  ifid(0),
47  sid(0),
48  interfaceOpen(false),
49  sessionOpen(false),
50  openBuffers(0),
51  totalBufferUsers(0),
52  ctx(_ctx) {
53  handleImaqError(imgInterfaceOpen(deviceSource.c_str(), &ifid));
54  interfaceOpen = true;
55  handleImaqError(imgInterfaceReset(ifid));
56  handleImaqError(imgSessionOpen(ifid, &sid));
57  sessionOpen = true;
58  (void) getWidth(); /* dummy request to ensure that communication is working */
59 }
60 _ImaqGrabberImpl::~_ImaqGrabberImpl() {
61  if(sessionOpen) {
62  imgClose(sid, 1);
63  sessionOpen = false;
64  }
65  if(interfaceOpen) {
66  imgClose(ifid, 1);
67  interfaceOpen = false;
68  }
69 }
70 int _ImaqGrabberImpl::getInteger(uInt32 attr) {
71  Int32 temp;
72  handleImaqError(imgGetAttribute(sid, attr, &temp));
73  return temp;
74 }
75 int _ImaqGrabberImpl::getWidth() {
76  return getInteger(IMG_ATTR_ROI_WIDTH);
77 }
78 int _ImaqGrabberImpl::getHeight() {
79  return getInteger(IMG_ATTR_ROI_HEIGHT);
80 }
81 int _ImaqGrabberImpl::getBytesPerPixel() {
82  return getInteger(IMG_ATTR_BYTESPERPIXEL);
83 }
84 
85 void _ImaqGrabberImpl::startServing() {
86  enableAcquisition = true;
87  acquisitionThread = thread(&_ImaqGrabberImpl::acquisition, this);
88  setThreadPriority(acquisitionThread, REALTIME);
89 }
90 
91 void _ImaqGrabberImpl::stopServing() {
92  enableAcquisition = false;
93  if(acquisitionThread.joinable()) {
94  cout << "wait for grabber thread to join" << endl;
95  acquisitionThread.join();
96  }
97 }
98 
99 void _ImaqGrabberImpl::addEndpoint(const string & _endpoint) {
100  endpoints.push_back(_endpoint);
101 }
102 
103 
104 /***** INTERNAL *****/
105 
106 #define IMAQ_BUFFER_COUNT (256)
107 
108 void _ImaqGrabberImpl::acquisition() {
109  RocLogger logger(ctx, "inproc://log", "camserver", SSTR("grabber.", deviceName));
110 
111  zmq::socket_t pubSocket(ctx, ZMQ_PUB);
112  for(const auto & ep: endpoints) {
113  pubSocket.bind(ep.c_str());
114  }
115  int width = getWidth();
116  int height = getHeight();
117  int bytesPerPixel = getBytesPerPixel();
118  char * bufferList[IMAQ_BUFFER_COUNT];
119  bufferUsers.resize(IMAQ_BUFFER_COUNT);
120 
121  GrabberStatsPublisher statsPublisher(ctx, deviceName);
122 
123  for(int i = 0; i < IMAQ_BUFFER_COUNT; ++i) {
124  bufferList[i] = new char[width * height * bytesPerPixel];
125  assert(bufferList[i] != nullptr);
126  bufferUsers[i] = new atomic<int>(0);
127  assert(bufferUsers[i] != nullptr);
128  }
129  try {
130  handleImaqError(imgGrabSetup(sid, 0 /*startNow*/));
131 #ifdef DEBUG
132  cout << "grab initialized" << endl;
133 #endif
134  logger.debug(SSTR("grab initialized"));
135  handleImaqError(imgSessionStartAcquisition(sid));
136 
137  int currentBuffer = 0;
138  auto t1 = chrono::high_resolution_clock::now();
139  auto t2 = chrono::high_resolution_clock::now();
140  /* do work here */
141  while(enableAcquisition) {
142  statsPublisher.publishBufferLoad(openBuffers, IMAQ_BUFFER_COUNT);
143  int currentBufferIndex = 0;
144  /* find a free buffer */
145  while(currentBufferIndex < IMAQ_BUFFER_COUNT && *bufferUsers[currentBufferIndex] > 0) {
146  ++currentBufferIndex;
147  }
148  if(currentBufferIndex >= IMAQ_BUFFER_COUNT) {
149  throw ImaqException("out of buffers!");
150  }
151 #ifdef DEBUG
152  cout << "use buffer: " << currentBufferIndex << endl;
153 #endif
154  Int32 res;
155  res = imgGrab(sid, (void**)&bufferList[currentBufferIndex], 1/* waitForNext */);
156  t2 = chrono::high_resolution_clock::now();
157  auto grabTime = chrono::system_clock::now(); /* wall time */
158 #ifdef DEBUG
159  milliseconds dt = chrono::duration_cast<milliseconds>(t2 - t1);
160  cout << "dt: " << dt.count() << " ms" << endl;
161 #endif
162  t1 = t2;
163  if(res == 0) {
164  int lostFrames;
165 
166  /* times are serialized as microseconds since the epoch */
167  uint64_t grabHighResTime = chrono::duration_cast<chrono::microseconds>(t1.time_since_epoch()).count();
168  uint64_t grabWallTime = chrono::duration_cast<chrono::microseconds>(grabTime.time_since_epoch()).count();
169 #ifdef DEBUG
170  cout << "current buffer: " << currentBuffer << " idx: " << currentBufferIndex << endl;
171 #endif
172  lostFrames = getInteger(IMG_ATTR_LOST_FRAMES);
173  if(lostFrames > totalLostFrames) {
174  cout << "buffer overrun! " << (lostFrames - totalLostFrames) << " lost frames!! (" << lostFrames << " in session)" << endl;
175  totalLostFrames = lostFrames;
176  }
177  acquireBuffer(currentBufferIndex); /* will be released by zmq-callback */
178  /* compose multipart message to extractor workers */
179  zmq::message_t header(deviceName.length());
180  memcpy(header.data(), deviceName.c_str(), deviceName.length());
181  pubSocket.send(header, ZMQ_SNDMORE);
182  zmq::message_t dimensions(3 * 4 + 2 * 8);
183  insertNetworkUInt32(((char*)dimensions.data()) + 0, width);
184  insertNetworkUInt32(((char*)dimensions.data()) + 4, height);
185  insertNetworkUInt32(((char*)dimensions.data()) + 8, bytesPerPixel);
186  insertNetworkUInt64(((char*)dimensions.data()) + 12, grabHighResTime);
187  insertNetworkUInt64(((char*)dimensions.data()) + 20, grabWallTime);
188  pubSocket.send(dimensions, ZMQ_SNDMORE);
189  zmq::message_t data(bufferList[currentBufferIndex],
190  width * height * bytesPerPixel,
191  cbWrapper,
192  new std::function<void()>(
193  bind(&_ImaqGrabberImpl::releaseBuffer,
194  this,
195  currentBufferIndex)
196  ));
197  pubSocket.send(data);
198  /* message is complete */
199  } else if(res == (Int32)(IMG_ERR_TIMEOUT)) {
200  continue;
201  } else {
202  handleImaqError(res);
203  }
204  ++currentBuffer;
205  }
206  cout << "aquisition stopped" << endl;
207 
208  } catch(ImaqException &e) {
209  logger.error(SSTR("grabbing service canceled! (", e.what(), ")"));
210  }
211  /* cleanup */
212  while(totalBufferUsers > 0) {
213  mutex m;
214  unique_lock<mutex> lock(m);
215  bufferFreeNotifier.wait(lock);
216  }
217  handleImaqError(imgSessionStopAcquisition(sid));
218  for(int i = 0; i < IMAQ_BUFFER_COUNT; ++i) {
219  delete[] bufferList[i];
220  delete bufferUsers[i];
221  }
222  bufferUsers.clear();
223  logger.debug(SSTR("grabber worker is done"));
224 }
225 
226 void _ImaqGrabberImpl::acquireBuffer(int bufferId) {
227  ++totalBufferUsers;
228  if((*bufferUsers[bufferId])++ == 0) {
229 #ifdef DEBUG
230  cout << deviceName << " acquired buffer " << bufferId << endl;
231 #endif
232  ++openBuffers;
233  }
234 }
235 
236 void _ImaqGrabberImpl::releaseBuffer(int bufferId) {
237  if(--(*bufferUsers[bufferId]) == 0) {
238 #ifdef DEBUG
239  cout << deviceName << " released buffer " << bufferId << endl;
240 #endif
241  --openBuffers;
242  --totalBufferUsers;
243  bufferFreeNotifier.notify_all();
244  } else {
245  --totalBufferUsers;
246  }
247 }
248 
#define IMAQ_BUFFER_COUNT
Base for all Imaq related exceptions.
Definition: ImaqGrabber.h:29
void insertNetworkUInt32(void *buffer, uint32_t value)
Definition: utils.cpp:40
void publishBufferLoad(const int usedBuffers, const int totalBuffers)
void insertNetworkUInt64(void *buffer, uint64_t value)
Definition: utils.cpp:45
void error(const std::string &_msg)
Definition: RocLogger.cpp:36
void setThreadPriority(std::thread &th, thread_priority_t prio)
Definition: utils.cpp:62
STL namespace.
void debug(const std::string &_msg)
Definition: RocLogger.cpp:24
std::string SSTR(Args &&...components)
Creates a temporary string stream for string concatenation.
Definition: stringtricks.h:21
ImaqException(const std::string &_message)
Definition: ImaqGrabber.cpp:23
Definition: utils.h:28
void cbWrapper(void *data, void *hint)
Definition: utils.cpp:33
virtual const char * what() const
Definition: ImaqGrabber.cpp:26