runMACS
 All Data Structures Files Functions Variables Enumerations Enumerator Macros
RocServer.cpp
Go to the documentation of this file.
1 #include <RocServer.h>
2 
3 #include <iostream>
4 #include <chrono>
5 
6 using namespace std;
7 
8 using std::chrono::high_resolution_clock;
9 using std::chrono::milliseconds;
10 
11 #define MSG_REQUEST (2)
12 #define MSG_REPLY (3)
13 #define MSG_HEARTBEAT (4)
14 #define MSG_DISCONNECT (5)
15 
16 #define MAX_LIVENESS (3)
17 
18 #define HB_INTERVALL (1000)
19 
20 RocServer::RocServer(zmq::context_t & _ctx,
21  const std::string & _brokerEndpoint,
22  const std::string & _serviceName):
23  ctx(_ctx),
24  brokerEndpoint(_brokerEndpoint),
25  serviceName(_serviceName),
26  timeout(HB_INTERVALL),
27  keepGoing(true),
28  liveness(MAX_LIVENESS) {
29  serverThread = thread(&RocServer::serve, this);
30 }
31 
33  cout << "stopping rocServer" << endl;
34  keepGoing = false;
35  if(serverThread.joinable()) {
36  serverThread.join();
37  }
38  cout << "stopped rocServer" << endl;
39 }
40 
42 
43 }
44 
46 
47 }
48 
49 Json::Value RocServer::call(const std::string & name, const Json::Value & arguments) {
50  (void) name;
51  (void) arguments;
52  throw RocNotImplementedError();
53 }
54 
55 Json::Value RocServer::getattr(const std::string & name) {
56  (void) name;
57  throw RocIsCallable();
58 }
59 
60 void RocServer::setattr(const std::string & name, const Json::Value & value) {
61  (void) name;
62  (void) value;
63  throw RocNotImplementedError();
64 }
65 
66 Json::Value RocServer::rocCall(const std::string & name, const Json::Value & arguments) {
67  if(name == "getattr") {
68  Json::Value ret;
69  try {
70  Json::Value attr = getattr(arguments[0].asString());
71  ret["type"] = "value";
72  ret["value"] = attr;
73  } catch(RocIsCallable) {
74  ret["type"] = "callable";
75  }
76  return ret;
77  } else if (name == "setattr") {
78  setattr(arguments[0].asString(), arguments[1]);
79  /* if setattr throws, the next part is not reached. */
80  Json::Value ret;
81  ret["type"] = "ok";
82  return ret;
83  }
84  throw RocNotImplementedError();
85 }
86 
87 void RocServer::onRequest(std::list<zmq::message_t*>* envelope, std::list<zmq::message_t*>* data) {
88  /* roc request should be in the first data element */
89  auto & req = *(data->begin());
90  Json::Value root;
91  Json::Reader reader;
92  bool parseOk = reader.parse((const char *)req->data(), ((const char *)req->data()) + req->size(), root, false);
93  for(auto & arg: *data) {
94  /* cout << (char*)(arg->data()) << ", "; */
95  delete arg;
96  }
97  delete data;
98  if(!parseOk) {
100  return;
101  }
102  if(root["jsonrpc"].asString() != "2.0") {
104  return;
105  }
106  std::string id = root["id"].asString();
107  std::string method = root["method"].asString();
108 
109  try {
110  Json::Value ret;
111  if(method.compare(0, 5, "_roc.") == 0) {
112  ret = rocCall(method.substr(5), root["params"]);
113  } else {
114  ret = call(method, root["params"]);
115  }
116  /* cout << "return:" << ret << endl; */
117  replyValue(envelope, ret, id);
118  return;
119  } catch (RocNotImplementedError) {
120  replyError(envelope, ROC_JSON_METHOD_NOT_FOUND, id);
121  return;
122  } catch (exception &exc) {
123  cout << "exception:" << exc.what() << endl;
124  replyError(envelope, ROC_JSON_INTERNAL_ERROR, id);
125  return;
126  }
127 }
128 
129 void RocServer::serve() {
130  int linger = 100;
131  sock = new zmq::socket_t(ctx, ZMQ_DEALER);
132  sock->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
133  sock->connect(brokerEndpoint.c_str());
134 
135  zmq::socket_t controlSock(ctx, ZMQ_SUB);
136  controlSock.connect("inproc://control");
137  controlSock.setsockopt(ZMQ_SUBSCRIBE, "control", 7);
138 
139  /* send ready */
140  send(1, serviceName);
141 
142  startup();
143 
144  auto next_heartbeat = chrono::high_resolution_clock::now() + chrono::milliseconds(HB_INTERVALL);
145 
146  zmq::pollitem_t items [] = {
147  { (void*)*sock, 0, ZMQ_POLLIN, 0},
148  { (void*)controlSock, 0, ZMQ_POLLIN, 0}
149  };
150 
151  int64_t more = 0;
152  size_t more_size = sizeof(more);
153 
154  while(keepGoing) {
155  zmq::poll(items, 2, timeout);
156  if(items[0].revents & ZMQ_POLLIN) {
157  /* receive data here */
158  zmq::message_t empty;
159  sock->recv(&empty);
160  zmq::message_t header;
161  sock->recv(&header);
162  if(strncmp((char*)header.data(), "MDPW01", 6) == 0) {
163  liveness = MAX_LIVENESS;
164  zmq::message_t command;
165  sock->recv(&command);
166  switch(((char*)command.data())[0]) {
167  case MSG_REQUEST: /* request */
168  /* cout << "request received!" << endl; */
169  _onRequest();
170  break;
171  case MSG_HEARTBEAT: /* heartbeat */
172  break;
173  case MSG_DISCONNECT: /* disconnect */
174  cout << "disconnect received!" << endl;
175  break;
176  default:
177  cout << "message " << (int)((char*)command.data())[0] << " received" << endl;
178  }
179  }
180  sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
181  while(more == 1) {
182  cout << "recvmore" << endl;
183  zmq::message_t dummy;
184  sock->recv(&dummy);
185  sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
186  }
187  }
188  if(items[1].revents & ZMQ_POLLIN) {
189  zmq::message_t header;
190  controlSock.recv(&header);
191  zmq::message_t command;
192  controlSock.recv(&command);
193  if(strncmp((char*)command.data(), "stop", 4) == 0) {
194  cout << "stoping rocServer" << endl;
195  keepGoing = false;
196  }
197  }
198  if(next_heartbeat < chrono::high_resolution_clock::now()) {
199  --liveness;
200  send(MSG_HEARTBEAT);
201  next_heartbeat = chrono::high_resolution_clock::now() + chrono::milliseconds(HB_INTERVALL);
202  }
203  }
204  teardown();
205  cout << "rocServer loop end" << endl;
206  delete sock;
207 }
208 
209 void RocServer::send(char _command, const std::string & _option) {
210  zmq::message_t empty(0);
211  sock->send(empty, ZMQ_SNDMORE);
212  zmq::message_t header(6);
213  memcpy(header.data(), "MDPW01", 6);
214  sock->send(header, ZMQ_SNDMORE);
215  zmq::message_t command(1);
216  ((char*)command.data())[0] = _command;
217  if(_option.length() == 0) {
218  sock->send(command);
219  } else {
220  sock->send(command, ZMQ_SNDMORE);
221  zmq::message_t data(_option.length());
222  memcpy(data.data(), _option.c_str(), _option.length());
223  sock->send(data);
224  }
225 }
226 
227 void RocServer::_onRequest() {
228  auto requestEnvelope = new list<zmq::message_t *>;
229  auto requestData = new list<zmq::message_t *>;
230 
231  int64_t more = 0;
232  size_t more_size = sizeof(more);
233  sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
234  bool inEnvelope = true;
235  while(more == 1) {
236  auto msg = new zmq::message_t;
237  sock->recv(msg);
238  if(inEnvelope) {
239  if(msg->size() > 0) {
240  requestEnvelope->push_back(msg);
241  } else {
242  delete msg;
243  inEnvelope = false;
244  }
245  } else {
246  requestData->push_back(msg);
247  }
248  sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
249  }
250  onRequest(requestEnvelope, requestData);
251 }
252 
253 void RocServer::reply(std::list<zmq::message_t*>* envelope, std::list<zmq::message_t*>* data) {
254  zmq::message_t empty(0);
255  sock->send(empty, ZMQ_SNDMORE);
256  zmq::message_t header(6);
257  memcpy(header.data(), "MDPW01", 6);
258  sock->send(header, ZMQ_SNDMORE);
259  zmq::message_t command(1);
260  ((char*)command.data())[0] = MSG_REPLY;
261  sock->send(command, ZMQ_SNDMORE);
262  while(!envelope->empty()) {
263  zmq::message_t * msg = envelope->front();
264  envelope->pop_front();
265  sock->send(*msg, ZMQ_SNDMORE);
266  delete msg;
267  }
268  delete envelope;
269  zmq::message_t delimiter(0);
270  if(data == nullptr || data->empty()) {
271  sock->send(delimiter);
272  } else {
273  sock->send(delimiter, ZMQ_SNDMORE);
274  while(!data->empty()) {
275  zmq::message_t * msg = data->front();
276  data->pop_front();
277  if(data->empty()) {
278  sock->send(*msg);
279  } else {
280  sock->send(*msg, ZMQ_SNDMORE);
281  }
282  delete msg;
283  }
284  }
285  delete data;
286 }
287 
288 void RocServer::replyValue(std::list<zmq::message_t*>* envelope, const Json::Value & value, const std::string & id) {
289  Json::Value root;
290  root["jsonrpc"] = "2.0";
291  root["result"] = value;
292  root["id"] = id;
293 
294  std::string encodedResponse = root.toStyledString();
295  auto data = new list<zmq::message_t *>;
296  auto msg = new zmq::message_t(encodedResponse.length());
297  memcpy(msg->data(), encodedResponse.c_str(), encodedResponse.length());
298  data->push_back(msg);
299  reply(envelope, data);
300 }
301 
302 void RocServer::replyError(std::list<zmq::message_t*>* envelope, int errorNo, const std::string & id) {
303  Json::Value root;
304  root["jsonrpc"] = "2.0";
305  Json::Value error;
306  error["code"] = errorNo;
307  switch (errorNo) {
309  error["message"] = "Parse error";
310  break;
312  error["message"] = "Invalid Request";
313  break;
315  error["message"] = "Method not found";
316  break;
318  error["message"] = "Invalid params";
319  break;
321  error["message"] = "Internal error";
322  break;
323  }
324  error["data"] = "";
325  root["error"] = error;
326  if(id.length() > 0) {
327  root["id"] = id;
328  } else {
329  root["id"] = Json::Value::null;
330  }
331  std::string encodedResponse = root.toStyledString();
332  auto data = new list<zmq::message_t *>;
333  auto msg = new zmq::message_t(encodedResponse.length());
334  memcpy(msg->data(), encodedResponse.c_str(), encodedResponse.length());
335  data->push_back(msg);
336  reply(envelope, data);
337 }
#define MSG_DISCONNECT
Definition: RocServer.cpp:14
virtual void onRequest(std::list< zmq::message_t * > *envelope, std::list< zmq::message_t * > *data)
The (not anymore) "to-be-overloaded" request handler.
Definition: RocServer.cpp:87
#define ROC_JSON_PARSE_ERROR
Definition: RocServer.h:12
virtual ~RocServer()
Definition: RocServer.cpp:32
#define ROC_JSON_INVALID_REQUEST
Definition: RocServer.h:13
STL namespace.
#define ROC_JSON_METHOD_NOT_FOUND
Definition: RocServer.h:14
#define MSG_REQUEST
Definition: RocServer.cpp:11
#define MAX_LIVENESS
Definition: RocServer.cpp:16
zmq::context_t & ctx
Definition: RocServer.h:77
RocServer(zmq::context_t &_ctx, const std::string &_brokerEndpoint, const std::string &_serviceName)
Definition: RocServer.cpp:20
#define HB_INTERVALL
Definition: RocServer.cpp:18
virtual Json::Value call(const std::string &name, const Json::Value &arguments)
The "to-be-overloaded" call handler.
Definition: RocServer.cpp:49
#define MSG_HEARTBEAT
Definition: RocServer.cpp:13
void replyValue(std::list< zmq::message_t * > *envelope, const Json::Value &value, const std::string &id="")
Definition: RocServer.cpp:288
void reply(std::list< zmq::message_t * > *envelope, std::list< zmq::message_t * > *data=nullptr)
Definition: RocServer.cpp:253
#define MSG_REPLY
Definition: RocServer.cpp:12
virtual void setattr(const std::string &name, const Json::Value &value)
The "to-be-overloaded" setattr handler.
Definition: RocServer.cpp:60
virtual void teardown()
called in handler thread on shutdown
Definition: RocServer.cpp:45
#define ROC_JSON_INVALID_PARAMS
Definition: RocServer.h:15
void replyError(std::list< zmq::message_t * > *envelope, int errorNo, const std::string &id="")
Definition: RocServer.cpp:302
virtual Json::Value getattr(const std::string &name)
The "to-be-overloaded" getattr handler.
Definition: RocServer.cpp:55
virtual void startup()
called in handler thread on startup
Definition: RocServer.cpp:41
#define ROC_JSON_INTERNAL_ERROR
Definition: RocServer.h:16