grpcclient.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  #include "grpcclient.h"

  #include <mutex>
  2. grpcclient * ggrpcclient;
  3. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  4. {
  5. ggrpcclient->UpdateData(strdata,nSize,strmemname);
  6. }
  7. grpcclient::grpcclient(std::string stryamlpath)
  8. {
  9. ggrpcclient = this;
  10. dec_yaml(stryamlpath.data());
  11. int i;
  12. for(i=0;i<mvectormsgunit.size();i++)
  13. {
  14. mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
  15. }
  16. for(i=0;i<mvectorctrlmsgunit.size();i++)
  17. {
  18. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
  19. mvectorctrlmsgunit[i].mnBufferCount);
  20. }
  21. }
  22. qint64 grpcclient::calclatency(qint64 nnewlatency)
  23. {
  24. mvectorlatency.push_back(nnewlatency);
  25. while(mvectorlatency.size()>5)mvectorlatency.erase(mvectorlatency.begin());
  26. qint64 nlatencytotal =0;
  27. int nsize = mvectorlatency.size();
  28. int i;
  29. for(i=0;i<nsize;i++)
  30. {
  31. nlatencytotal = nlatencytotal + mvectorlatency.at(i);
  32. }
  33. if(nsize > 0)
  34. {
  35. nlatencytotal = nlatencytotal/nsize;
  36. }
  37. mnlatency = nlatencytotal;
  38. return nlatencytotal;
  39. }
  40. void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer)
  41. {
  42. static int64_t nmsgindex = 0;
  43. nmsgindex++;
  44. int nbytesize = xmsg.ByteSize();
  45. char * strbuf = new char[nbytesize];
  46. std::shared_ptr<char> pstrbuf;
  47. pstrbuf.reset(strbuf);
  48. if(xmsg.SerializeToArray(strbuf,nbytesize))
  49. {
  50. iv::CloudSwapRequestStream request;
  51. request.set_nmsgindex(nmsgindex);
  52. request.set_nmsgtime(std::chrono::system_clock::now().time_since_epoch().count());
  53. request.set_pingavg(0);
  54. request.set_pingdev(0);
  55. request.set_pingmax(0);
  56. request.set_pingmin(0);
  57. request.set_strnodeid("111");
  58. request.set_strobjnodeid("222");
  59. request.set_xdata(strbuf,nbytesize);
  60. QTime xt;
  61. xt.start();
  62. ::grpc::WriteOptions wo;
  63. // wo.set_write_through();
  64. // wo.clear_buffer_hint();
  65. // writer->Write(request,(void * )2);
  66. // bool bsend = true;
  67. bool bsend = writer->Write(request,wo);
  68. if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
  69. }
  70. }
  71. void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
  72. {
  73. std::cout<<"threadsend start "<<std::endl;
  74. int ninterval = 1000;
  75. int nheartbeatinterval = 1000; //when 1000ms no data, send hartbeat interval, no message data.
  76. mninterval = ninterval;
  77. QTime xTime;
  78. xTime.start();
  79. int nlastsend = xTime.elapsed();
  80. while(*pbrun)
  81. {
  82. iv::cloud::cloudmsg xmsg;
  83. if(mcloudemsg.xclouddata_size() == 0)
  84. {
  85. if(abs(xTime.elapsed()-nlastsend)>= nheartbeatinterval)
  86. {
  87. sendcloudmsg(xmsg,writer); //send data.
  88. nlastsend = xTime.elapsed();
  89. }
  90. else
  91. {
  92. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  93. }
  94. continue;
  95. }
  96. mmutexmsg.lock();
  97. xmsg.CopyFrom(mcloudemsg);
  98. mcloudemsg.clear_xclouddata();
  99. mmutexmsg.unlock();
  100. sendcloudmsg(xmsg,writer); //send data.
  101. nlastsend = xTime.elapsed();
  102. }
  103. std::cout<<"thread send end."<<std::endl;
  104. }
  105. void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
  106. {
  107. std::cout<<"threadrecv start"<<std::endl;
  108. iv::CloudSwapReplyStream reply;
  109. while (writer->Read(&reply)) {
  110. // nfail = 0;
  111. // std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
  112. if(reply.nres() > 0)
  113. {
  114. int i;
  115. for(i=0;i<reply.xdata_size();i++)
  116. {
  117. iv::cloud::cloudmsg xmsg;
  118. if(xmsg.ParseFromArray(reply.mutable_xdata(i)->data(),reply.mutable_xdata(i)->size()))
  119. {
  120. shareswapmsg(&xmsg);
  121. }
  122. }
  123. }
  124. // std::cout<<"read data from server."<<std::endl;
  125. }
  126. std::cout<<"threadrecv end."<<std::endl;
  127. *pbrun = false;
  128. }
  129. void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun)
  130. {
  131. std::cout<<"threadrpc start."<<std::endl;
  132. std::string target_str = gstrserverip+":";
  133. target_str = target_str + gstrserverport ;//std::to_string()
  134. auto cargs = grpc::ChannelArguments();
  135. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  136. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  137. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  138. target_str, grpc::InsecureChannelCredentials(),cargs);
  139. std::unique_ptr<iv::CloudSwapStream::Stub> stub_ = iv::CloudSwapStream::NewStub(channel);
  140. int nfail = 0;
  141. while(!QThread::isInterruptionRequested())
  142. {
  143. ClientContext context ;
  144. // std::shared_ptr<ClientContext> pcontext(new ClientContext);
  145. std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writerRead(stub_->swap(&context));
  146. // std::shared_ptr<bool> pbRun(new bool);
  147. *pbRun = true;
  148. std::shared_ptr<qint64> pntime = pnrpctime ;
  149. *pntime = QDateTime::currentMSecsSinceEpoch();
  150. std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime);
  151. (void )pthread;
  152. std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime);
  153. (void)precvthread;
  154. pthread->join();
  155. precvthread->join();
  156. // std::cout<<"threadRPC end"<<std::endl;
  157. // *pbRun = false;
  158. // while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<3000)&&(*pbRun))
  159. // {
  160. // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  161. // }
  162. // *pbRun = false;
  163. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  164. channel = grpc::CreateCustomChannel(
  165. target_str, grpc::InsecureChannelCredentials(),cargs);
  166. stub_ = iv::CloudSwapStream::NewStub(channel);
  167. nfail++;
  168. // if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  169. // else std::this_thread::sleep_for(std::chrono::milliseconds(100));
  170. std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
  171. }
  172. }
  173. void grpcclient::run()
  174. {
  175. std::shared_ptr<bool> pbRun(new bool);
  176. *pbRun = true;
  177. std::shared_ptr<qint64> pntime(new qint64);
  178. *pntime = QDateTime::currentMSecsSinceEpoch();
  179. std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
  180. return;
  181. // while(!QThread::isInterruptionRequested())
  182. // {
  183. // std::shared_ptr<bool> pbRun(new bool);
  184. // *pbRun = true;
  185. // std::shared_ptr<qint64> pntime(new qint64);
  186. // *pntime = QDateTime::currentMSecsSinceEpoch();
  187. // std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
  188. // (void )pthread;
  189. // while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<10000)&&(*pbRun))
  190. // {
  191. // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  192. // }
  193. // std::cout<<" reconect."<<std::endl;
  194. // *pbRun = false;
  195. // }
  196. }
  197. void grpcclient::dec_yaml(const char * stryamlpath)
  198. {
  199. YAML::Node config;
  200. try
  201. {
  202. config = YAML::LoadFile(stryamlpath);
  203. }
  204. catch(YAML::BadFile e)
  205. {
  206. qDebug("load error.");
  207. return;
  208. }
  209. std::vector<std::string> vecmodulename;
  210. if(config["server"])
  211. {
  212. gstrserverip = config["server"].as<std::string>();
  213. }
  214. if(config["port"])
  215. {
  216. gstrserverport = config["port"].as<std::string>();
  217. }
  218. if(config["uploadinterval"])
  219. {
  220. gstruploadinterval = config["uploadinterval"].as<std::string>();
  221. }
  222. if(config["VIN"])
  223. {
  224. gstrVIN = config["VIN"].as<std::string>();
  225. }
  226. if(config["queryMD5"])
  227. {
  228. gstrqueryMD5 = config["queryMD5"].as<std::string>();
  229. }
  230. else
  231. {
  232. return;
  233. }
  234. if(config["ctrlMD5"])
  235. {
  236. gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
  237. }
  238. std::string strmsgname;
  239. if(config["uploadmessage"])
  240. {
  241. for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
  242. {
  243. std::string strtitle = it->first.as<std::string>();
  244. std::cout<<strtitle<<std::endl;
  245. if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
  246. {
  247. iv::msgunit xmu;
  248. strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
  249. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  250. xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
  251. xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
  252. if(config["uploadmessage"][strtitle]["bimportant"])
  253. {
  254. std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
  255. if(strimportant == "true")
  256. {
  257. xmu.mbImportant = true;
  258. }
  259. }
  260. if(config["uploadmessage"][strtitle]["keeptime"])
  261. {
  262. std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
  263. xmu.mnkeeptime = atoi(strkeep.data());
  264. }
  265. mvectormsgunit.push_back(xmu);
  266. }
  267. }
  268. }
  269. else
  270. {
  271. }
  272. if(!config["ctrlMD5"])
  273. {
  274. return;
  275. }
  276. if(config["ctrlmessage"])
  277. {
  278. std::string strnodename = "ctrlmessage";
  279. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  280. {
  281. std::string strtitle = it->first.as<std::string>();
  282. std::cout<<strtitle<<std::endl;
  283. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  284. {
  285. iv::msgunit xmu;
  286. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  287. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  288. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  289. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  290. mvectorctrlmsgunit.push_back(xmu);
  291. }
  292. }
  293. }
  294. else
  295. {
  296. }
  297. return;
  298. }
  299. void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg)
  300. {
  301. int i;
  302. int nsize = pxmsg->xclouddata_size();
  303. for(i=0;i<nsize;i++)
  304. {
  305. int j;
  306. int nquerysize = mvectorctrlmsgunit.size();
  307. for(j=0;j<nquerysize;j++)
  308. {
  309. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
  310. {
  311. // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  312. iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  313. break;
  314. }
  315. }
  316. }
  317. }
  318. void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
  319. {
  320. mmutexmsg.lock();
  321. iv::cloud::cloudunit * pcu = mcloudemsg.add_xclouddata();
  322. pcu->set_msgname(strmemname);
  323. pcu->set_data(strdata,nSize);
  324. if(mcloudemsg.xclouddata_size() == 1)
  325. {
  326. mcloudemsg.set_xtime(std::chrono::system_clock::now().time_since_epoch().count());
  327. }
  328. mmutexmsg.unlock();
  329. }