grpcclient.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. #include "grpcclient.h"
  2. grpcclient * ggrpcclient;
  3. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  4. {
  5. ggrpcclient->UpdateData(strdata,nSize,strmemname);
  6. }
  7. grpcclient::grpcclient(std::string stryamlpath)
  8. {
  9. ggrpcclient = this;
  10. dec_yaml(stryamlpath.data());
  11. int i;
  12. for(i=0;i<mvectormsgunit.size();i++)
  13. {
  14. mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
  15. }
  16. for(i=0;i<mvectorctrlmsgunit.size();i++)
  17. {
  18. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
  19. mvectorctrlmsgunit[i].mnBufferCount);
  20. }
  21. }
  22. qint64 grpcclient::calclatency(qint64 nnewlatency)
  23. {
  24. mvectorlatency.push_back(nnewlatency);
  25. while(mvectorlatency.size()>5)mvectorlatency.erase(mvectorlatency.begin());
  26. qint64 nlatencytotal =0;
  27. int nsize = mvectorlatency.size();
  28. int i;
  29. for(i=0;i<nsize;i++)
  30. {
  31. nlatencytotal = nlatencytotal + mvectorlatency.at(i);
  32. }
  33. if(nsize > 0)
  34. {
  35. nlatencytotal = nlatencytotal/nsize;
  36. }
  37. mnlatency = nlatencytotal;
  38. return nlatencytotal;
  39. }
  40. void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
  41. {
  42. std::cout<<"threadsend start "<<std::endl;
  43. int nsize = mvectormsgunit.size();
  44. int i;
  45. int ninterval = atoi(gstruploadinterval.data());
  46. if(ninterval<=0)ninterval = 100;
  47. mninterval = ninterval;
  48. float ffraterate = 1.0f/((float)mninterval);
  49. int nrawinterval = ninterval;
  50. int nok = 0;
  51. QTime xTime;
  52. xTime.start();
  53. int nlastsend = xTime.elapsed();
  54. int nid= 0;
  55. while(*pbrun)
  56. {
  57. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  58. if((xTime.elapsed()-nlastsend)<mninterval)
  59. {
  60. continue;
  61. }
  62. bool bImportant = false;
  63. int nkeeptime = 0;
  64. iv::cloud::cloudmsg xmsg;
  65. xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
  66. gMutexMsg.lock();
  67. for(i=0;i<nsize;i++)
  68. {
  69. if(mvectormsgunit[i].mbRefresh)
  70. {
  71. mvectormsgunit[i].mbRefresh = false;
  72. if(mvectormsgunit[i].mbImportant)
  73. {
  74. bImportant = true;
  75. }
  76. if(mvectormsgunit[i].mnkeeptime > nkeeptime)
  77. {
  78. nkeeptime = mvectormsgunit[i].mnkeeptime;
  79. }
  80. iv::cloud::cloudunit xcloudunit;
  81. xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
  82. xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
  83. iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
  84. pcu->CopyFrom(xcloudunit);
  85. }
  86. }
  87. gMutexMsg.unlock();
  88. int nbytesize = xmsg.ByteSize();
  89. char * strbuf = new char[nbytesize];
  90. std::shared_ptr<char> pstrbuf;
  91. pstrbuf.reset(strbuf);
  92. if(xmsg.SerializeToArray(strbuf,nbytesize))
  93. {
  94. iv::UploadRequestStream request;
  95. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  96. request.set_id(nid);
  97. request.set_ntime(time1);
  98. request.set_strquerymd5(gstrqueryMD5);
  99. request.set_strctrlmd5(gstrctrlMD5);
  100. request.set_strvin(gstrVIN);
  101. request.set_xdata(strbuf,nbytesize);
  102. request.set_kepptime(nkeeptime);
  103. request.set_bimportant(bImportant);
  104. request.set_nlatency(mnlatency);
  105. ffraterate = 1000.0f/((float)mninterval);
  106. request.set_fframerate(ffraterate);
  107. request.set_nsendtime(time1);
  108. nid++;
  109. nlastsend = xTime.elapsed();
  110. QTime xt;
  111. xt.start();
  112. ::grpc::WriteOptions wo;
  113. // wo.set_write_through();
  114. // wo.clear_buffer_hint();
  115. // writer->Write(request,(void * )2);
  116. // bool bsend = true;
  117. bool bsend = writer->Write(request,wo);
  118. *nlastreftime = QDateTime::currentMSecsSinceEpoch();
  119. // if(xt.elapsed()>10)
  120. // {
  121. // nok = 0;
  122. // if(ninterval < 1000)ninterval = ninterval * 11/10;
  123. // mninterval = ninterval;
  124. // ffraterate = 1.0f/((float)mninterval);
  125. // qDebug("send ela is %d ninterval is %d",xt.elapsed(),ninterval);
  126. // }
  127. // else
  128. // {
  129. // nok++;
  130. // if((ninterval > nrawinterval)&&(nok>10))
  131. // {
  132. // nok = 0;
  133. // ninterval = ninterval*10/11;
  134. // mninterval = ninterval;
  135. // ffraterate = 1.0f/((float)mninterval);
  136. // std::cout<<"ninterval is "<<ninterval<<std::endl;
  137. // }
  138. // }
  139. if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
  140. }
  141. }
  142. std::cout<<"thread send end."<<std::endl;
  143. }
  144. void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
  145. {
  146. std::cout<<"threadrecv start"<<std::endl;
  147. iv::UploadReplyStream reply;
  148. while (writer->Read(&reply)) {
  149. *nlastreftime = QDateTime::currentMSecsSinceEpoch();
  150. qint64 nlaten = QDateTime::currentMSecsSinceEpoch() - reply.nreqsendtime();
  151. if(reply.nreqsendtime() == 0)nlaten = 0;
  152. else
  153. {
  154. nlaten = nlaten - reply.npausetime();
  155. }
  156. calclatency(nlaten);
  157. if(reply.framerate() >0.001)
  158. {
  159. mninterval = 1000.0/reply.framerate();
  160. }
  161. qDebug("latency is %ld",nlaten);
  162. // nfail = 0;
  163. // std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
  164. if(reply.nres() == 1)
  165. {
  166. iv::cloud::cloudmsg xmsg;
  167. if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  168. {
  169. sharectrlmsg(&xmsg);
  170. }
  171. }
  172. // std::cout<<"read data from server."<<std::endl;
  173. }
  174. std::cout<<"threadrecv end."<<std::endl;
  175. *pbrun = false;
  176. }
  177. void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun)
  178. {
  179. std::cout<<"threadrpc start."<<std::endl;
  180. std::string target_str = gstrserverip+":";
  181. target_str = target_str + gstrserverport ;//std::to_string()
  182. auto cargs = grpc::ChannelArguments();
  183. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  184. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  185. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  186. target_str, grpc::InsecureChannelCredentials(),cargs);
  187. std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
  188. int nfail = 0;
  189. while(!QThread::isInterruptionRequested())
  190. {
  191. ClientContext context ;
  192. // std::shared_ptr<ClientContext> pcontext(new ClientContext);
  193. std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writerRead(stub_->upload(&context));
  194. // std::shared_ptr<bool> pbRun(new bool);
  195. *pbRun = true;
  196. std::shared_ptr<qint64> pntime = pnrpctime ;
  197. *pntime = QDateTime::currentMSecsSinceEpoch();
  198. std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime);
  199. (void )pthread;
  200. std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime);
  201. (void)precvthread;
  202. pthread->join();
  203. precvthread->join();
  204. // std::cout<<"threadRPC end"<<std::endl;
  205. // *pbRun = false;
  206. // while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<3000)&&(*pbRun))
  207. // {
  208. // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  209. // }
  210. // *pbRun = false;
  211. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  212. channel = grpc::CreateCustomChannel(
  213. target_str, grpc::InsecureChannelCredentials(),cargs);
  214. stub_ = iv::UploadStream::NewStub(channel);
  215. nfail++;
  216. // if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  217. // else std::this_thread::sleep_for(std::chrono::milliseconds(100));
  218. std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
  219. }
  220. }
  221. void grpcclient::run()
  222. {
  223. std::shared_ptr<bool> pbRun(new bool);
  224. *pbRun = true;
  225. std::shared_ptr<qint64> pntime(new qint64);
  226. *pntime = QDateTime::currentMSecsSinceEpoch();
  227. std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
  228. return;
  229. // while(!QThread::isInterruptionRequested())
  230. // {
  231. // std::shared_ptr<bool> pbRun(new bool);
  232. // *pbRun = true;
  233. // std::shared_ptr<qint64> pntime(new qint64);
  234. // *pntime = QDateTime::currentMSecsSinceEpoch();
  235. // std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
  236. // (void )pthread;
  237. // while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<10000)&&(*pbRun))
  238. // {
  239. // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  240. // }
  241. // std::cout<<" reconect."<<std::endl;
  242. // *pbRun = false;
  243. // }
  244. }
  245. void grpcclient::dec_yaml(const char * stryamlpath)
  246. {
  247. YAML::Node config;
  248. try
  249. {
  250. config = YAML::LoadFile(stryamlpath);
  251. }
  252. catch(YAML::BadFile e)
  253. {
  254. qDebug("load error.");
  255. return;
  256. }
  257. std::vector<std::string> vecmodulename;
  258. if(config["server"])
  259. {
  260. gstrserverip = config["server"].as<std::string>();
  261. }
  262. if(config["port"])
  263. {
  264. gstrserverport = config["port"].as<std::string>();
  265. }
  266. if(config["uploadinterval"])
  267. {
  268. gstruploadinterval = config["uploadinterval"].as<std::string>();
  269. }
  270. if(config["VIN"])
  271. {
  272. gstrVIN = config["VIN"].as<std::string>();
  273. }
  274. if(config["queryMD5"])
  275. {
  276. gstrqueryMD5 = config["queryMD5"].as<std::string>();
  277. }
  278. else
  279. {
  280. return;
  281. }
  282. if(config["ctrlMD5"])
  283. {
  284. gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
  285. }
  286. std::string strmsgname;
  287. if(config["uploadmessage"])
  288. {
  289. for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
  290. {
  291. std::string strtitle = it->first.as<std::string>();
  292. std::cout<<strtitle<<std::endl;
  293. if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
  294. {
  295. iv::msgunit xmu;
  296. strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
  297. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  298. xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
  299. xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
  300. if(config["uploadmessage"][strtitle]["bimportant"])
  301. {
  302. std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
  303. if(strimportant == "true")
  304. {
  305. xmu.mbImportant = true;
  306. }
  307. }
  308. if(config["uploadmessage"][strtitle]["keeptime"])
  309. {
  310. std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
  311. xmu.mnkeeptime = atoi(strkeep.data());
  312. }
  313. mvectormsgunit.push_back(xmu);
  314. }
  315. }
  316. }
  317. else
  318. {
  319. }
  320. if(!config["ctrlMD5"])
  321. {
  322. return;
  323. }
  324. if(config["ctrlmessage"])
  325. {
  326. std::string strnodename = "ctrlmessage";
  327. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  328. {
  329. std::string strtitle = it->first.as<std::string>();
  330. std::cout<<strtitle<<std::endl;
  331. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  332. {
  333. iv::msgunit xmu;
  334. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  335. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  336. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  337. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  338. mvectorctrlmsgunit.push_back(xmu);
  339. }
  340. }
  341. }
  342. else
  343. {
  344. }
  345. return;
  346. }
  347. void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
  348. {
  349. int i;
  350. int nsize = pxmsg->xclouddata_size();
  351. for(i=0;i<nsize;i++)
  352. {
  353. int j;
  354. int nquerysize = mvectorctrlmsgunit.size();
  355. for(j=0;j<nquerysize;j++)
  356. {
  357. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
  358. {
  359. // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  360. iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  361. break;
  362. }
  363. }
  364. }
  365. }
  366. void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
  367. {
  368. int nsize = mvectormsgunit.size();
  369. int i;
  370. for(i=0;i<nsize;i++)
  371. {
  372. if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
  373. {
  374. gMutexMsg.lock();
  375. char * strtem = new char[nSize];
  376. memcpy(strtem,strdata,nSize);
  377. mvectormsgunit[i].mpstrmsgdata.reset(strtem);
  378. mvectormsgunit[i].mndatasize = nSize;
  379. mvectormsgunit[i].mbRefresh = true;
  380. gMutexMsg.unlock();
  381. break;
  382. }
  383. }
  384. }