8 using std::chrono::high_resolution_clock;
9 using std::chrono::milliseconds;
11 #define MSG_REQUEST (2)
13 #define MSG_HEARTBEAT (4)
14 #define MSG_DISCONNECT (5)
16 #define MAX_LIVENESS (3)
18 #define HB_INTERVALL (1000)
21 const std::string & _brokerEndpoint,
22 const std::string & _serviceName):
24 brokerEndpoint(_brokerEndpoint),
25 serviceName(_serviceName),
29 serverThread = thread(&RocServer::serve,
this);
33 cout <<
"stopping rocServer" << endl;
35 if(serverThread.joinable()) {
38 cout <<
"stopped rocServer" << endl;
49 Json::Value
RocServer::call(
const std::string & name,
const Json::Value & arguments) {
66 Json::Value RocServer::rocCall(
const std::string & name,
const Json::Value & arguments) {
67 if(name ==
"getattr") {
70 Json::Value attr =
getattr(arguments[0].asString());
71 ret[
"type"] =
"value";
74 ret[
"type"] =
"callable";
77 }
else if (name ==
"setattr") {
78 setattr(arguments[0].asString(), arguments[1]);
89 auto & req = *(data->begin());
92 bool parseOk = reader.parse((
const char *)req->data(), ((
const char *)req->data()) + req->size(), root,
false);
93 for(
auto & arg: *data) {
102 if(root[
"jsonrpc"].asString() !=
"2.0") {
106 std::string
id = root[
"id"].asString();
107 std::string method = root[
"method"].asString();
111 if(method.compare(0, 5,
"_roc.") == 0) {
112 ret = rocCall(method.substr(5), root[
"params"]);
114 ret =
call(method, root[
"params"]);
122 }
catch (exception &exc) {
123 cout <<
"exception:" << exc.what() << endl;
129 void RocServer::serve() {
131 sock =
new zmq::socket_t(
ctx, ZMQ_DEALER);
132 sock->setsockopt(ZMQ_LINGER, &linger,
sizeof(linger));
133 sock->connect(brokerEndpoint.c_str());
135 zmq::socket_t controlSock(
ctx, ZMQ_SUB);
136 controlSock.connect(
"inproc://control");
137 controlSock.setsockopt(ZMQ_SUBSCRIBE,
"control", 7);
140 send(1, serviceName);
144 auto next_heartbeat = chrono::high_resolution_clock::now() + chrono::milliseconds(
HB_INTERVALL);
146 zmq::pollitem_t items [] = {
147 { (
void*)*sock, 0, ZMQ_POLLIN, 0},
148 { (
void*)controlSock, 0, ZMQ_POLLIN, 0}
152 size_t more_size =
sizeof(more);
155 zmq::poll(items, 2, timeout);
156 if(items[0].revents & ZMQ_POLLIN) {
158 zmq::message_t empty;
160 zmq::message_t header;
162 if(strncmp((
char*)header.data(),
"MDPW01", 6) == 0) {
164 zmq::message_t command;
165 sock->recv(&command);
166 switch(((
char*)command.data())[0]) {
174 cout <<
"disconnect received!" << endl;
177 cout <<
"message " << (int)((
char*)command.data())[0] << " received" << endl;
180 sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
182 cout <<
"recvmore" << endl;
183 zmq::message_t dummy;
185 sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
188 if(items[1].revents & ZMQ_POLLIN) {
189 zmq::message_t header;
190 controlSock.recv(&header);
191 zmq::message_t command;
192 controlSock.recv(&command);
193 if(strncmp((
char*)command.data(),
"stop", 4) == 0) {
194 cout <<
"stoping rocServer" << endl;
198 if(next_heartbeat < chrono::high_resolution_clock::now()) {
201 next_heartbeat = chrono::high_resolution_clock::now() + chrono::milliseconds(
HB_INTERVALL);
205 cout <<
"rocServer loop end" << endl;
209 void RocServer::send(
char _command,
const std::string & _option) {
210 zmq::message_t empty(0);
211 sock->send(empty, ZMQ_SNDMORE);
212 zmq::message_t header(6);
213 memcpy(header.data(),
"MDPW01", 6);
214 sock->send(header, ZMQ_SNDMORE);
215 zmq::message_t command(1);
216 ((
char*)command.data())[0] = _command;
217 if(_option.length() == 0) {
220 sock->send(command, ZMQ_SNDMORE);
221 zmq::message_t data(_option.length());
222 memcpy(data.data(), _option.c_str(), _option.length());
227 void RocServer::_onRequest() {
228 auto requestEnvelope =
new list<zmq::message_t *>;
229 auto requestData =
new list<zmq::message_t *>;
232 size_t more_size =
sizeof(more);
233 sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
234 bool inEnvelope =
true;
236 auto msg =
new zmq::message_t;
239 if(msg->size() > 0) {
240 requestEnvelope->push_back(msg);
246 requestData->push_back(msg);
248 sock->getsockopt(ZMQ_RCVMORE, &more, &more_size);
253 void RocServer::reply(std::list<zmq::message_t*>* envelope, std::list<zmq::message_t*>* data) {
254 zmq::message_t empty(0);
255 sock->send(empty, ZMQ_SNDMORE);
256 zmq::message_t header(6);
257 memcpy(header.data(),
"MDPW01", 6);
258 sock->send(header, ZMQ_SNDMORE);
259 zmq::message_t command(1);
261 sock->send(command, ZMQ_SNDMORE);
262 while(!envelope->empty()) {
263 zmq::message_t * msg = envelope->front();
264 envelope->pop_front();
265 sock->send(*msg, ZMQ_SNDMORE);
269 zmq::message_t delimiter(0);
270 if(data ==
nullptr || data->empty()) {
271 sock->send(delimiter);
273 sock->send(delimiter, ZMQ_SNDMORE);
274 while(!data->empty()) {
275 zmq::message_t * msg = data->front();
280 sock->send(*msg, ZMQ_SNDMORE);
288 void RocServer::replyValue(std::list<zmq::message_t*>* envelope,
const Json::Value & value,
const std::string &
id) {
290 root[
"jsonrpc"] =
"2.0";
291 root[
"result"] = value;
294 std::string encodedResponse = root.toStyledString();
295 auto data =
new list<zmq::message_t *>;
296 auto msg =
new zmq::message_t(encodedResponse.length());
297 memcpy(msg->data(), encodedResponse.c_str(), encodedResponse.length());
298 data->push_back(msg);
299 reply(envelope, data);
304 root[
"jsonrpc"] =
"2.0";
306 error[
"code"] = errorNo;
309 error[
"message"] =
"Parse error";
312 error[
"message"] =
"Invalid Request";
315 error[
"message"] =
"Method not found";
318 error[
"message"] =
"Invalid params";
321 error[
"message"] =
"Internal error";
325 root[
"error"] = error;
326 if(
id.length() > 0) {
329 root[
"id"] = Json::Value::null;
331 std::string encodedResponse = root.toStyledString();
332 auto data =
new list<zmq::message_t *>;
333 auto msg =
new zmq::message_t(encodedResponse.length());
334 memcpy(msg->data(), encodedResponse.c_str(), encodedResponse.length());
335 data->push_back(msg);
336 reply(envelope, data);
virtual void onRequest(std::list< zmq::message_t * > *envelope, std::list< zmq::message_t * > *data)
The (not anymore) "to-be-overloaded" request handler.
#define ROC_JSON_PARSE_ERROR
#define ROC_JSON_INVALID_REQUEST
#define ROC_JSON_METHOD_NOT_FOUND
RocServer(zmq::context_t &_ctx, const std::string &_brokerEndpoint, const std::string &_serviceName)
virtual Json::Value call(const std::string &name, const Json::Value &arguments)
The "to-be-overloaded" call handler.
void replyValue(std::list< zmq::message_t * > *envelope, const Json::Value &value, const std::string &id="")
void reply(std::list< zmq::message_t * > *envelope, std::list< zmq::message_t * > *data=nullptr)
virtual void setattr(const std::string &name, const Json::Value &value)
The "to-be-overloaded" setattr handler.
virtual void teardown()
called in handler thread on shutdown
#define ROC_JSON_INVALID_PARAMS
void replyError(std::list< zmq::message_t * > *envelope, int errorNo, const std::string &id="")
virtual Json::Value getattr(const std::string &name)
The "to-be-overloaded" getattr handler.
virtual void startup()
called in handler thread on startup
#define ROC_JSON_INTERNAL_ERROR