8 ctx(_ctx), source(_source), keepGoing(false) {
12 wantedFrameTime = (uint64_t)(1e6/_wantedFPS);
22 extractorThread = thread(&Extractor::extract,
this);
26 cout <<
"stop serving" << endl;
28 if(extractorThread.joinable()) {
29 extractorThread.join();
31 cout <<
"extractor thread joined" << endl;
34 void Extractor::extractSetup() {
38 void Extractor::extract() {
41 zmq::socket_t sourceSocket(
ctx, ZMQ_SUB);
43 sourceSocket.setsockopt(ZMQ_LINGER, &linger,
sizeof(linger));
45 sourceSocket.connect(
"inproc://control");
47 sourceSocket.setsockopt(ZMQ_SUBSCRIBE,
"control", 7);
50 size_t more_size =
sizeof(more);
51 cout <<
"starting extractor" << endl;
53 zmq::pollitem_t items [] = {
54 {(
void*)sourceSocket, 0, ZMQ_POLLIN, 0}
57 zmq::poll(&items[0], 1, 100);
58 if(!(items[0].revents & ZMQ_POLLIN)) {
61 zmq::message_t sourceTopic;
62 sourceSocket.recv(&sourceTopic);
65 sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
66 if(!(more & 0xffffffff)) {
67 cout <<
"invalid message from topic " <<
source.
topic <<
"!!!" << endl;
70 zmq::message_t sourceDimensions;
71 sourceSocket.recv(&sourceDimensions);
75 uint64_t grabHighResTime =
readNetworkUInt64(((
char*)sourceDimensions.data()) + 12);
78 sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
79 if(!(more & 0xffffffff)) {
80 cout <<
"invalid message from topic " <<
source.
topic <<
"!!!" << endl;
83 zmq::message_t * sourceImage =
new zmq::message_t;
84 sourceSocket.recv(sourceImage);
86 if (grabHighResTime >= nextFrameTime) {
87 nextFrameTime += wantedFrameTime;
88 if (grabHighResTime > nextFrameTime) {
89 nextFrameTime = grabHighResTime + wantedFrameTime;
91 extractImpl(width, height, bytesPerPixel, grabHighResTime, grabWallTime, sourceImage);
96 }
else if(strncmp((
char*)sourceTopic.data(),
"control", 7) == 0) {
97 sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
98 if(!(more & 0xffffffff)) {
99 cout <<
"invalid message from topic " <<
source.
topic <<
"!!!" << endl;
102 zmq::message_t controlMessage;
103 sourceSocket.recv(&controlMessage);
104 if(strncmp((
char*)controlMessage.data(),
"stop", 4) == 0) {
105 cout <<
"stoping extractor" << endl;
112 sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
113 while((more & 0xffffffff)) {
115 zmq::message_t dummy;
116 sourceSocket.recv(&dummy);
117 sourceSocket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
124 void Extractor::extractTeardown() {
uint64_t readNetworkUInt64(void *buffer)
uint32_t readNetworkUInt32(void *buffer)