runMACS
 All Data Structures Files Functions Variables Enumerations Enumerator Macros
Extractor.cpp
Go to the documentation of this file.
1 #include <Extractor.h>
2 
3 #include <iostream>
4 
5 using namespace std;
6 
7 Extractor::Extractor(zmq::context_t & _ctx, const PubSubEndpoint & _source, double _wantedFPS):
8  ctx(_ctx), source(_source), keepGoing(false) {
9  if (_wantedFPS <= 0) {
10  wantedFrameTime = 0;
11  } else {
12  wantedFrameTime = (uint64_t)(1e6/_wantedFPS);
13  }
14 }
15 
17 
18 }
19 
21  keepGoing = true;
22  extractorThread = thread(&Extractor::extract, this);
23 }
24 
26  cout << "stop serving" << endl;
27  keepGoing = false;
28  if(extractorThread.joinable()) {
29  extractorThread.join();
30  }
31  cout << "extractor thread joined" << endl;
32 }
33 
34 void Extractor::extractSetup() {
35 
36 }
37 
38 void Extractor::extract() {
39  extractSetup();
40  {
41  zmq::socket_t sourceSocket(ctx, ZMQ_SUB);
42  int linger = 0;
43  sourceSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
44  sourceSocket.connect(source.endpoint.c_str());
45  sourceSocket.connect("inproc://control");
46  sourceSocket.setsockopt(ZMQ_SUBSCRIBE, source.topic.c_str(), source.topic.length());
47  sourceSocket.setsockopt(ZMQ_SUBSCRIBE, "control", 7);
48 
49  int64_t more;
50  size_t more_size = sizeof(more);
51  cout << "starting extractor" << endl;
52 
53  zmq::pollitem_t items [] = {
54  {(void*)sourceSocket, 0, ZMQ_POLLIN, 0}
55  };
56  while(keepGoing) {
57  zmq::poll(&items[0], 1, 100);
58  if(!(items[0].revents & ZMQ_POLLIN)) {
59  continue;
60  }
61  zmq::message_t sourceTopic;
62  sourceSocket.recv(&sourceTopic);
63  if(strncmp((char*)sourceTopic.data(), source.topic.c_str(), source.topic.length()) == 0) {
64  // got right data
65  sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
66  if(!(more & 0xffffffff)) {
67  cout << "invalid message from topic " << source.topic << "!!!" << endl;
68  continue;
69  }
70  zmq::message_t sourceDimensions;
71  sourceSocket.recv(&sourceDimensions);
72  uint32_t width = readNetworkUInt32(((char*)sourceDimensions.data()) + 0);
73  uint32_t height = readNetworkUInt32(((char*)sourceDimensions.data()) + 4);
74  uint32_t bytesPerPixel = readNetworkUInt32(((char*)sourceDimensions.data()) + 8);
75  uint64_t grabHighResTime = readNetworkUInt64(((char*)sourceDimensions.data()) + 12);
76  uint64_t grabWallTime = readNetworkUInt64(((char*)sourceDimensions.data()) + 20);
77 
78  sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
79  if(!(more & 0xffffffff)) {
80  cout << "invalid message from topic " << source.topic << "!!!" << endl;
81  continue;
82  }
83  zmq::message_t * sourceImage = new zmq::message_t;
84  sourceSocket.recv(sourceImage);
85 
86  if (grabHighResTime >= nextFrameTime) {
87  nextFrameTime += wantedFrameTime;
88  if (grabHighResTime > nextFrameTime) {
89  nextFrameTime = grabHighResTime + wantedFrameTime;
90  }
91  extractImpl(width, height, bytesPerPixel, grabHighResTime, grabWallTime, sourceImage);
92  } else {
93  delete sourceImage;
94  }
95 
96  } else if(strncmp((char*)sourceTopic.data(), "control", 7) == 0) {
97  sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
98  if(!(more & 0xffffffff)) {
99  cout << "invalid message from topic " << source.topic << "!!!" << endl;
100  continue;
101  }
102  zmq::message_t controlMessage;
103  sourceSocket.recv(&controlMessage);
104  if(strncmp((char*)controlMessage.data(), "stop", 4) == 0) {
105  cout << "stoping extractor" << endl;
106  break;
107  }
108  } else {
109  // got some unexpected data
110  }
111  // flush remaining multipart data if any
112  sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
113  while((more & 0xffffffff)) { /* this is a hack, ZMQ should either return 0 or 1 but more_size gets strangely
114  reset to 4 which results in garbage in the upper 4 bytes */
115  zmq::message_t dummy;
116  sourceSocket.recv(&dummy);
117  sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
118  }
119  }
120  }
121  extractTeardown();
122 }
123 
124 void Extractor::extractTeardown() {
125 
126 }
uint64_t readNetworkUInt64(void *buffer)
Definition: utils.cpp:56
STL namespace.
PubSubEndpoint source
Definition: Extractor.h:28
zmq::context_t & ctx
Definition: Extractor.h:27
virtual ~Extractor()=0
Definition: Extractor.cpp:16
Extractor(zmq::context_t &_ctx, const PubSubEndpoint &_source, double _wantedFPS=0)
Definition: Extractor.cpp:7
std::string endpoint
Definition: utils.h:15
void stopServing()
Definition: Extractor.cpp:25
uint32_t readNetworkUInt32(void *buffer)
Definition: utils.cpp:50
std::string topic
Definition: utils.h:16
void startServing()
Definition: Extractor.cpp:20
volatile bool keepGoing
Definition: Extractor.h:29