runMACS
 All Data Structures Files Functions Variables Enumerations Enumerator Macros
camserver.cpp
Go to the documentation of this file.
1 #include <iostream>
2 #include <atomic>
3 #include <thread>
4 #include <condition_variable>
5 #include <mutex>
6 #include <chrono>
7 #include <vector>
8 #include <zmq.hpp>
9 
10 #include <tclap/CmdLine.h>
11 #include <uuid.h>
12 
13 #include <ImageGrabber.h>
14 #include <ZmqPubExtractor.h>
15 #include <StatsExtractor.h>
16 #include <FFMpegExtractor.h>
17 #include <RocLogger.h>
18 #include <stringtricks.h>
19 #include <CamManager.h>
20 #include <InfoServer.h>
21 #include <NetworkHelper.h>
22 #include <ConfigManager.h>
23 #include <StorageManager.h>
24 
25 #include <json/json.h>
26 
27 #include <config.h>
28 
29 #ifdef HAVE_WINDOWS
30 #include <windows.h>
31 #endif
32 #ifdef HAVE_LINUX
33 #include <signal.h>
34 #include <unistd.h>
35 #include <cstdlib>
36 #endif
37 #include <cstdio>
38 
39 using namespace std;
40 
41 static volatile bool keepRunning;
42 
43 #ifdef HAVE_WINDOWS
44 BOOL CtrlHandler(DWORD fdwCtrlType) {
45  switch(fdwCtrlType) {
46  case CTRL_C_EVENT:
47  if(keepRunning) {
48  cout << "stopping server, please wait ..." << endl;
49  keepRunning = false;
50  return TRUE;
51  } else {
52  cout << "received ctrl+c twice: terminating!" << endl;
53  return FALSE;
54  }
55  case CTRL_BREAK_EVENT:
56  if(keepRunning) {
57  cout << "stopping server, please wait ..." << endl;
58  keepRunning = false;
59  return TRUE;
60  } else {
61  cout << "received ctrl+c twice: terminating!" << endl;
62  return FALSE;
63  }
64  default:
65  return FALSE;
66  }
67 }
68 #endif
69 
70 #ifdef HAVE_LINUX
71 void SignalHandler(int s) {
72  switch(s) {
73  case SIGINT:
74  if(keepRunning) {
75  cout << "stopping server, please wait ..." << endl;
76  keepRunning = false;
77  return;
78  } else {
79  cout << "received ctrl+c twice: terminating!" << endl;
80  abort();
81  return;
82  }
83  default:
84  return;
85  }
86 }
87 #endif
88 
89 void pubsubproxy(zmq::context_t * ctx,
90  const string & from,
91  const string & to,
92  bool connectTo = false,
93  condition_variable * isUp = nullptr,
94  int linger = 100) {
95  /* a positive linger value ensures that the proxy will not hang forever on network errors */
96  zmq::socket_t in(*ctx, ZMQ_XSUB);
97  zmq::socket_t out(*ctx, ZMQ_XPUB);
98  in.bind(from.c_str());
99  out.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
100  if(connectTo) {
101  out.connect(to.c_str());
102  } else {
103  out.bind(to.c_str());
104  }
105  if(isUp != nullptr) {
106  isUp->notify_all();
107  }
108  try {
109  zmq::proxy((void*)in, (void*)out, NULL);
110  } catch(zmq::error_t &e) {
111  cout << e.what() << endl;
112  }
113  cout << "closing proxy" << endl;
114 }
115 
116 void sendControlMessage(zmq::socket_t & socket, const string & msg) {
117  zmq::message_t topic(7);
118  memcpy(topic.data(), "control", 7);
119  socket.send(topic, ZMQ_SNDMORE);
120  zmq::message_t data(msg.length());
121  memcpy(data.data(), msg.c_str(), msg.length());
122  socket.send(data);
123 }
124 
125 int main(int argc, char** argv) {
126  try {
127  TCLAP::CmdLine cmd("specMACS camserver", ' ', "0.1");
128  TCLAP::MultiArg<string> extraConfigDirs("C","configdirs","Extra folder to look for config files",false,"dirname",cmd);
129 
130  cmd.parse(argc, argv);
131 
132  for(auto & dirname: extraConfigDirs.getValue()) {
133  cout << "found extra config dir: " << dirname << endl;
134  CONFIGMANAGER.addConfigDir(dirname);
135  }
136 
137  } catch (TCLAP::ArgException &e) {
138  std::cerr << "error: " << e.error() << " for arg " << e.argId() << std::endl;
139  return -1;
140  }
141 
142  string address;
143  {
144  NetworkHelper netHelper;
145  auto addresses = netHelper.getIpAddresses();
146  if(addresses.size() < 1) {
147  cerr << "WARNING: found no public IP address! The program will most certainly not work correctly!" << endl;
148  } else {
149  if(CONFIGMANAGER["publish_address"] != Json::Value::null) {
150  string wantedAddress = CONFIGMANAGER["publish_address"].asString();
151  for (auto addr : addresses) {
152  if (addr == wantedAddress) {
153  address = addr;
154  }
155  }
156  if (address.empty()) {
157  cerr << wantedAddress << " not available!" << endl;
158  return -1;
159  }
160  } else {
161  address = *(addresses.begin());
162  if(addresses.size() > 1) {
163  cerr << "WARNING: found more than one IP address! Maybe you will experience connection problems!" << endl;
164  }
165  }
166  cout << "INFO: using IP address " << address << endl;
167  }
168  }
169  string rocBroker(SSTR("tcp://", CONFIGMANAGER["broker"].asString(), ":5555"));
170  string rocLogBroker(SSTR("tcp://", CONFIGMANAGER["broker"].asString(), ":5556"));
171  StorageManager storageManager;
172  for (auto & location: CONFIGMANAGER["storageLocations"]) {
173  storageManager.addStorageLocation(location.asString());
174  }
175  keepRunning = true;
176 #ifdef HAVE_WINDOWS
177  SetConsoleCtrlHandler( (PHANDLER_ROUTINE) CtrlHandler, TRUE );
178 #endif
179 #ifdef HAVE_LINUX
180  struct sigaction sigIntHandler;
181  sigIntHandler.sa_handler = SignalHandler;
182  sigemptyset(&sigIntHandler.sa_mask);
183  sigIntHandler.sa_flags = 0;
184  sigaction(SIGINT, &sigIntHandler, NULL);
185 #endif
186  cout << "console handler has been set!" << endl;
187  thread outputThread;
188  thread logoutputThread;
189 
190  {
191  zmq::context_t ctx(1);
192  cout << "zmq context initialized" << endl;
193  {
194  mutex logUpMutex;
195  condition_variable logUpCondition;
196  logoutputThread = thread(bind(pubsubproxy,
197  &ctx,
198  "inproc://log",
199  rocLogBroker,
200  true,
201  &logUpCondition,
202  100));
203  unique_lock<mutex> logUpLock(logUpMutex);
204  cout << "waiting for log output to come up!" << endl;
205  logUpCondition.wait(logUpLock);
206  }
207  cout << "initializing main logger" << endl;
208  RocLogger logger(ctx, "inproc://log", "camserver", "main");
209  cout << "sleep a little bit..." << endl;
210  this_thread::sleep_for(chrono::milliseconds(200)); /* wait a little bit for logging to set up */
211  cout << "yawn ... already that late? ... but now I'm awake." << endl;
212  logger.debug("camserver starting...");
213  cout << "camserver starting..." << endl;
214 
215  outputThread = thread(bind(pubsubproxy, &ctx, "inproc://output", "tcp://*:6666", false, nullptr, 100));
216 
217  zmq::socket_t control(ctx, ZMQ_PUB);
218  int linger = 0;
219  control.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
220  control.bind("inproc://control");
221 
222  const Json::Value cams = CONFIGMANAGER["cameras"];
223  cout << "found cams: " << cams << endl;
224  vector<ImageGrabber*> grabbers;
225  vector<CamManager*> camManagers;
226  vector<Extractor*> extractors;
227 
228  string sAutocaptureUUID = uuid4();
229 
230  for(unsigned int i=0; i<cams.size(); i++) {
231  string name = cams[i]["name"].asString();
232  string type = cams[i]["type"].asString();
233  string source = cams[i]["source"].asString();
234 
235  string local_endpoint = SSTR("inproc://", name, "_raw");
236 
237  grabbers.push_back(new ImageGrabber(name, type, source, ctx, rocBroker));
238 
239  grabbers.back()->addEndpoint(local_endpoint);
240 
241  auto publish_to = cams[i]["publish_to"];
242  if (!publish_to.empty()) {
243  cout << name << " will also be published to " << publish_to << endl;
244  grabbers.back()->addEndpoint(publish_to.asString());
245  }
246 
247  camManagers.push_back(new CamManager(ctx,
248  rocBroker,
249  SSTR(name, "CamManager"),
250  {local_endpoint, name},
251  storageManager));
252 
253  extractors.push_back(new StatsExtractor(ctx,
254  {local_endpoint, name},
255  {"inproc://log", name},
256  name));
257 
258  logger.debug(SSTR(name, " size: ", grabbers.back()->getWidth(), "x",
259  grabbers.back()->getHeight(), ", ", grabbers.back()->getBytesPerPixel()));
260 
261  grabbers.back()->startServing();
262  extractors.back()->startServing();
263 
264 #ifdef HAVE_LINUX
265  auto encode_to = cams[i]["encode_to"];
266  if (!encode_to.empty()) {
267  cout << name << " will also be encoded to " << encode_to << endl;
268  camManagers.back()->addExtractor("FFMpeg", {"FFMpeg", encode_to});
269  }
270 #endif
271 
272  /* auto start capture */
273  const Json::Value autocapture = cams[i]["autocapture"];
274  if(autocapture != Json::Value::null) {
275  const Json::Value enabled = autocapture["enabled"];
276  if(enabled != Json::Value::null && enabled.asBool() == true) {
277  cout << "autocapture is on for " << name << endl;
278  cout << autocapture << endl;
279  Json::Value metaData(Json::objectValue);
280  if(autocapture["metaData"] != Json::Value::null) {
281  metaData = autocapture["metaData"];
282  }
283  metaData["captureId"] = sAutocaptureUUID;
284  int fileCycling = -1;
285  if(autocapture["fileCycling"] != Json::Value::null) {
286  fileCycling = autocapture["fileCycling"].asInt();
287  }
288  bool binning = false;
289  if(autocapture["binning"] != Json::Value::null) {
290  binning = autocapture["binning"].asBool();
291  }
292  double wantedFPS = 0;
293  if(autocapture["wantedFPS"] != Json::Value::null) {
294  wantedFPS = autocapture["wantedFPS"].asDouble();
295  }
296  cout << "metaData: " << metaData << endl;
297  cout << "fileCycling: " << fileCycling << endl;
298  cout << "binning: " << binning << endl;
299  cout << "wantedFPS: " << wantedFPS << endl;
300  camManagers.back()->addExtractor("fileStorage", {"guiFileStorage",
301  -1,
302  Json::FastWriter().write(metaData),
303  fileCycling,
304  false,
305  binning,
306  wantedFPS});
307  }
308  }
309  }
310 
311  list<InfoServer> infoServers;
312  for(auto camera: cams) {
313  infoServers.emplace_back(ctx, rocBroker, SSTR("camInfo_", camera["name"].asString()), address);
314  }
315  /*
316  ZmqPubExtractor exVnir400(ctx,
317  {"inproc://vnir_raw", VNIR_NAME},
318  {"inproc://output", "vnir400"},
319  {0, 400, vnir.getWidth(), 1});
320  ZmqPubExtractor exVnir200(ctx,
321  {"inproc://vnir_raw", VNIR_NAME},
322  {"inproc://output", "vnir200"},
323  {0, 200, vnir.getWidth(), 1});
324 
325  ZmqPubExtractor exSwir100(ctx,
326  {"inproc://swir_raw", SWIR_NAME},
327  {"inproc://output", "swir100"},
328  {0, 100, swir.getWidth(), 1});
329  */
330 
331 
332  /*
333  exVnir400.startServing();
334  exVnir200.startServing();
335  exSwir100.startServing();
336  */
337 
338  while(keepRunning) {
339  this_thread::sleep_for(chrono::milliseconds(200));
340  sendControlMessage(control, "ping");
341  }
342 
343  sendControlMessage(control, "stop");
344 
345  /*
346  exVnir400.stopServing();
347  exVnir200.stopServing();
348  exSwir100.stopServing();
349  */
350 
351  for(auto & extractor: extractors) {
352  extractor->stopServing();
353  delete extractor;
354  }
355  for(auto & grabber: grabbers) {
356  grabber->stopServing();
357  delete grabber;
358  }
359  for(auto & camManager: camManagers) {
360  delete camManager;
361  }
362 
363  logger.debug("camserver finished.");
364  }
365  cout << "join output threads" << endl;
366  outputThread.join();
367  cout << "join logoutput threads" << endl;
368  logoutputThread.join();
369  cout << "join done" << endl;
370  return 0;
371 }
void pubsubproxy(zmq::context_t *ctx, const string &from, const string &to, bool connectTo=false, condition_variable *isUp=nullptr, int linger=100)
Definition: camserver.cpp:89
#define CONFIGMANAGER
Definition: ConfigManager.h:9
STL namespace.
std::list< std::string > getIpAddresses(bool allowIPv6=false)
void debug(const std::string &_msg)
Definition: RocLogger.cpp:24
void sendControlMessage(zmq::socket_t &socket, const string &msg)
Definition: camserver.cpp:116
Handles storage locations.
std::string SSTR(Args &&...components)
Creates a temporary string stream for string concatenation.
Definition: stringtricks.h:21
int main(int argc, char **argv)
Definition: camserver.cpp:125
Public interface to an ImageGrabber grabber.
Definition: ImageGrabber.h:45
void addStorageLocation(const std::string &path)
Add a new storage location.