grpcclient.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. #include "grpcclient.h"
  2. grpcclient * ggrpcclient;
  3. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  4. {
  5. (void)index;
  6. (void)dt;
  7. ggrpcclient->UpdateData(strdata,nSize,strmemname);
  8. }
  9. grpcclient::grpcclient(std::string stryamlpath)
  10. {
  11. ggrpcclient = this;
  12. dec_yaml(stryamlpath.data());
  13. int i;
  14. for(i=0;i<(int)mvectormsgunit.size();i++)
  15. {
  16. mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
  17. }
  18. for(i=0;i<(int)mvectorctrlmsgunit.size();i++)
  19. {
  20. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
  21. mvectorctrlmsgunit[i].mnBufferCount);
  22. }
  23. }
  24. qint64 grpcclient::calclatency(qint64 nnewlatency)
  25. {
  26. mvectorlatency.push_back(nnewlatency);
  27. while(mvectorlatency.size()>5)mvectorlatency.erase(mvectorlatency.begin());
  28. qint64 nlatencytotal =0;
  29. int nsize = mvectorlatency.size();
  30. int i;
  31. for(i=0;i<nsize;i++)
  32. {
  33. nlatencytotal = nlatencytotal + mvectorlatency.at(i);
  34. }
  35. if(nsize > 0)
  36. {
  37. nlatencytotal = nlatencytotal/nsize;
  38. }
  39. mnlatency = nlatencytotal;
  40. return nlatencytotal;
  41. }
  42. void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer)
  43. {
  44. static int64_t nmsgindex = 0;
  45. nmsgindex++;
  46. int nbytesize = xmsg.ByteSize();
  47. char * strbuf = new char[nbytesize];
  48. std::shared_ptr<char> pstrbuf;
  49. pstrbuf.reset(strbuf);
  50. if(xmsg.SerializeToArray(strbuf,nbytesize))
  51. {
  52. iv::CloudSwapRequestStream request;
  53. request.set_nmsgindex(nmsgindex);
  54. request.set_nmsgtime(std::chrono::system_clock::now().time_since_epoch().count());
  55. request.set_pingavg(0);
  56. request.set_pingdev(0);
  57. request.set_pingmax(0);
  58. request.set_pingmin(0);
  59. request.set_strnodeid(mstrnodeid);
  60. request.set_strobjnodeid(mstrobjnodeid);
  61. request.set_xdata(strbuf,nbytesize);
  62. request.set_nunitcount(xmsg.xclouddata_size());
  63. QTime xt;
  64. xt.start();
  65. ::grpc::WriteOptions wo;
  66. // wo.set_write_through();
  67. // wo.clear_buffer_hint();
  68. // writer->Write(request,(void * )2);
  69. // bool bsend = true;
  70. bool bsend = writer->Write(request,wo);
  71. if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
  72. }
  73. }
  74. void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
  75. {
  76. (void)nlastreftime;
  77. std::cout<<"threadsend start "<<std::endl;
  78. int ninterval = 1000;
  79. int nheartbeatinterval = 1000; //when 1000ms no data, send hartbeat interval, no message data.
  80. mninterval = ninterval;
  81. QTime xTime;
  82. xTime.start();
  83. int nlastsend = xTime.elapsed();
  84. while(*pbrun)
  85. {
  86. iv::cloud::cloudmsg xmsg;
  87. if(mcloudemsg.xclouddata_size() == 0)
  88. {
  89. if(abs(xTime.elapsed()-nlastsend)>= nheartbeatinterval)
  90. {
  91. sendcloudmsg(xmsg,writer); //send data.
  92. nlastsend = xTime.elapsed();
  93. }
  94. else
  95. {
  96. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  97. }
  98. continue;
  99. }
  100. mmutexmsg.lock();
  101. xmsg.CopyFrom(mcloudemsg);
  102. mcloudemsg.clear_xclouddata();
  103. mmutexmsg.unlock();
  104. sendcloudmsg(xmsg,writer); //send data.
  105. nlastsend = xTime.elapsed();
  106. }
  107. std::cout<<"thread send end."<<std::endl;
  108. }
  109. void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
  110. {
  111. (void)nlastreftime;
  112. std::cout<<"threadrecv start"<<std::endl;
  113. iv::CloudSwapReplyStream reply;
  114. while (writer->Read(&reply)) {
  115. std::cout<<" reply nres: "<<reply.xdata_size()<<std::endl;
  116. // nfail = 0;
  117. // std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
  118. if(reply.nres() > 0)
  119. {
  120. int i;
  121. for(i=0;i<reply.xdata_size();i++)
  122. {
  123. iv::cloud::cloudmsg xmsg;
  124. if(xmsg.ParseFromArray(reply.mutable_xdata(i)->data(),reply.mutable_xdata(i)->size()))
  125. {
  126. shareswapmsg(&xmsg);
  127. }
  128. }
  129. }
  130. // std::cout<<"read data from server."<<std::endl;
  131. }
  132. std::cout<<"threadrecv end."<<std::endl;
  133. *pbrun = false;
  134. }
  135. void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun)
  136. {
  137. std::cout<<"threadrpc start."<<std::endl;
  138. std::string target_str = gstrserverip+":";
  139. target_str = target_str + gstrserverport ;//std::to_string()
  140. auto cargs = grpc::ChannelArguments();
  141. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  142. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  143. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  144. target_str, grpc::InsecureChannelCredentials(),cargs);
  145. std::unique_ptr<iv::CloudSwapStream::Stub> stub_ = iv::CloudSwapStream::NewStub(channel);
  146. int nfail = 0;
  147. while(!QThread::isInterruptionRequested())
  148. {
  149. ClientContext context ;
  150. // std::shared_ptr<ClientContext> pcontext(new ClientContext);
  151. std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writerRead(stub_->swap(&context));
  152. // std::shared_ptr<bool> pbRun(new bool);
  153. *pbRun = true;
  154. std::shared_ptr<qint64> pntime = pnrpctime ;
  155. *pntime = QDateTime::currentMSecsSinceEpoch();
  156. std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime);
  157. (void )pthread;
  158. std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime);
  159. (void)precvthread;
  160. pthread->join();
  161. precvthread->join();
  162. // std::cout<<"threadRPC end"<<std::endl;
  163. // *pbRun = false;
  164. // while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<3000)&&(*pbRun))
  165. // {
  166. // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  167. // }
  168. // *pbRun = false;
  169. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  170. channel = grpc::CreateCustomChannel(
  171. target_str, grpc::InsecureChannelCredentials(),cargs);
  172. stub_ = iv::CloudSwapStream::NewStub(channel);
  173. nfail++;
  174. // if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  175. // else std::this_thread::sleep_for(std::chrono::milliseconds(100));
  176. std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
  177. }
  178. }
  179. void grpcclient::run()
  180. {
  181. std::shared_ptr<bool> pbRun(new bool);
  182. *pbRun = true;
  183. std::shared_ptr<qint64> pntime(new qint64);
  184. *pntime = QDateTime::currentMSecsSinceEpoch();
  185. std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
  186. (void)pthread;
  187. return;
  188. }
  189. void grpcclient::dec_yaml(const char * stryamlpath)
  190. {
  191. YAML::Node config;
  192. try
  193. {
  194. config = YAML::LoadFile(stryamlpath);
  195. }
  196. catch(YAML::BadFile e)
  197. {
  198. qDebug("load error.");
  199. (void)e;
  200. return;
  201. }
  202. if(config["server"])
  203. {
  204. gstrserverip = config["server"].as<std::string>();
  205. }
  206. if(config["port"])
  207. {
  208. gstrserverport = config["port"].as<std::string>();
  209. }
  210. if(config["nodeid"])
  211. {
  212. mstrnodeid = config["nodeid"].as<std::string>();
  213. }
  214. if(config["objnodeid"])
  215. {
  216. mstrobjnodeid = config["objnodeid"].as<std::string>();
  217. }
  218. std::string strmsgname;
  219. if(config["upmessage"])
  220. {
  221. for(YAML::const_iterator it= config["upmessage"].begin(); it != config["upmessage"].end();++it)
  222. {
  223. std::string strtitle = it->first.as<std::string>();
  224. std::cout<<strtitle<<std::endl;
  225. if(config["upmessage"][strtitle]["msgname"]&&config["upmessage"][strtitle]["buffersize"]&&config["upmessage"][strtitle]["buffercount"])
  226. {
  227. iv::msgunit xmu;
  228. strmsgname = config["upmessage"][strtitle]["msgname"].as<std::string>();
  229. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  230. xmu.mnBufferSize = config["upmessage"][strtitle]["buffersize"].as<int>();
  231. xmu.mnBufferCount = config["upmessage"][strtitle]["buffercount"].as<int>();
  232. if(config["upmessage"][strtitle]["bimportant"])
  233. {
  234. std::string strimportant = config["upmessage"][strtitle]["bimportant"].as<std::string>();
  235. if(strimportant == "true")
  236. {
  237. xmu.mbImportant = true;
  238. }
  239. }
  240. if(config["upmessage"][strtitle]["keeptime"])
  241. {
  242. std::string strkeep = config["upmessage"][strtitle]["keeptime"].as<std::string>();
  243. xmu.mnkeeptime = atoi(strkeep.data());
  244. }
  245. mvectormsgunit.push_back(xmu);
  246. }
  247. }
  248. }
  249. else
  250. {
  251. }
  252. if(config["downmessage"])
  253. {
  254. std::string strnodename = "downmessage";
  255. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  256. {
  257. std::string strtitle = it->first.as<std::string>();
  258. std::cout<<strtitle<<std::endl;
  259. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  260. {
  261. iv::msgunit xmu;
  262. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  263. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  264. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  265. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  266. mvectorctrlmsgunit.push_back(xmu);
  267. }
  268. }
  269. }
  270. else
  271. {
  272. }
  273. return;
  274. }
  275. void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg)
  276. {
  277. int i;
  278. int nsize = pxmsg->xclouddata_size();
  279. for(i=0;i<nsize;i++)
  280. {
  281. int j;
  282. int nquerysize = mvectorctrlmsgunit.size();
  283. for(j=0;j<nquerysize;j++)
  284. {
  285. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
  286. {
  287. // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  288. iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  289. break;
  290. }
  291. }
  292. }
  293. }
  294. void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
  295. {
  296. mmutexmsg.lock();
  297. iv::cloud::cloudunit * pcu = mcloudemsg.add_xclouddata();
  298. pcu->set_msgname(strmemname);
  299. pcu->set_data(strdata,nSize);
  300. if(mcloudemsg.xclouddata_size() == 1)
  301. {
  302. mcloudemsg.set_xtime(std::chrono::system_clock::now().time_since_epoch().count());
  303. }
  304. mmutexmsg.unlock();
  305. }