grpcclient.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707
  1. #include "grpcclient.h"
  2. grpcclient * ggrpcclient;
  3. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  4. {
  5. ggrpcclient->UpdateData(strdata,nSize,strmemname);
  6. }
  7. //#define TESTUP
  8. void ListenPicData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  9. {
  10. #ifndef TESTUP
  11. ggrpcclient->UpdatePicData(strdata,nSize,strmemname);
  12. #endif
  13. #ifdef TESTUP
  14. ggrpcclient->UpdatePicData(strdata,nSize,"h264front");
  15. ggrpcclient->UpdatePicData(strdata,nSize,"h264rear");
  16. ggrpcclient->UpdatePicData(strdata,nSize,"h264left");
  17. ggrpcclient->UpdatePicData(strdata,nSize,"h264right");
  18. #endif
  19. }
  20. grpcclient::grpcclient(std::string stryamlpath)
  21. {
  22. ggrpcclient = this;
  23. dec_yaml(stryamlpath.data());
  24. mstrpicmsgname[0] = "h264front";
  25. mstrpicmsgname[1] = "h264rear";
  26. mstrpicmsgname[2] = "h264left";
  27. mstrpicmsgname[3] = "h264right";
  28. unsigned int i;
  29. if(mbFrameUpdate)
  30. {
  31. for(i=0;i<mvectormsgunit.size();i++)
  32. {
  33. mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
  34. }
  35. }
  36. for(i=0;i<mvectorctrlmsgunit.size();i++)
  37. {
  38. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
  39. mvectorctrlmsgunit[i].mnBufferCount);
  40. }
  41. for(i=0;i<NUM_CAM;i++)
  42. {
  43. mpaPic[i] = iv::modulecomm::RegisterRecv(mstrpicmsgname[i].data(),ListenPicData);
  44. }
  45. if(mbFrameUpdate)
  46. {
  47. for(i=0;i<NUM_CAM;i++)
  48. {
  49. unsigned int j;
  50. for(j=0;j<NUM_THREAD_PERCAM;j++)
  51. {
  52. mpThread[i*NUM_THREAD_PERCAM + j] = new std::thread(&grpcclient::threadpicupload,this,i);
  53. }
  54. }
  55. }
  56. for(i=0;i<NUM_CAM;i++)
  57. {
  58. mpicbuf[i].mnSkipBase = mnskip;
  59. }
  60. }
  61. grpcclient::~grpcclient()
  62. {
  63. std::cout<<" enter ~grpcclient"<<std::endl;
  64. mbPicUpload = false;
  65. requestInterruption();
  66. while(this->isFinished() == false)
  67. {
  68. }
  69. std::cout<<"now join grpcclient thread"<<std::endl;
  70. unsigned int i;
  71. for(i=0;i<NUM_CAM;i++)
  72. {
  73. unsigned int j;
  74. for(j=0;j<NUM_THREAD_PERCAM;j++)
  75. {
  76. mpThread[i*NUM_THREAD_PERCAM + j]->join();
  77. }
  78. }
  79. for(i=0;i<mvectorctrlmsgunit.size();i++)
  80. {
  81. iv::modulecomm::Unregister(mvectorctrlmsgunit[i].mpa);
  82. }
  83. for(i=0;i<mvectormsgunit.size();i++)
  84. {
  85. iv::modulecomm::Unregister(mvectormsgunit[i].mpa);
  86. }
  87. std::cout<<"complete ~grpcclient"<<std::endl;
  88. }
  89. void grpcclient::run()
  90. {
  91. int nsize = mvectormsgunit.size();
  92. int i;
  93. int ninterval = atoi(gstruploadinterval.data());
  94. if(ninterval<=0)ninterval = 100;
  95. QTime xTime;
  96. xTime.start();
  97. int nlastsend = xTime.elapsed();
  98. std::string target_str = gstrserverip+":";
  99. target_str = target_str + gstrserverport ;//std::to_string()
  100. std::cout<<" server : "<<target_str<<std::endl;
  101. auto cargs = grpc::ChannelArguments();
  102. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  103. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  104. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  105. target_str, grpc::InsecureChannelCredentials(),cargs);
  106. std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
  107. iv::UploadRequestThread request;
  108. int nid = 0;
  109. // Container for the data we expect from the server.
  110. iv::UploadReplyThread reply;
  111. gpr_timespec timespec;
  112. timespec.tv_sec = 30;//设置阻塞时间为2秒
  113. timespec.tv_nsec = 0;
  114. timespec.clock_type = GPR_TIMESPAN;
  115. // ClientContext context;
  116. std::vector<qint64> xvectorlatency;
  117. while(!QThread::isInterruptionRequested())
  118. {
  119. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  120. if((xTime.elapsed()-nlastsend)<ninterval)
  121. {
  122. continue;
  123. }
  124. bool bImportant = false;
  125. int nkeeptime = 0;
  126. iv::cloud::cloudmsg xmsg;
  127. xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
  128. gMutexMsg.lock();
  129. for(i=0;i<nsize;i++)
  130. {
  131. if(mvectormsgunit[i].mbRefresh)
  132. {
  133. mvectormsgunit[i].mbRefresh = false;
  134. if(mvectormsgunit[i].mbImportant)
  135. {
  136. bImportant = true;
  137. }
  138. if(mvectormsgunit[i].mnkeeptime > nkeeptime)
  139. {
  140. nkeeptime = mvectormsgunit[i].mnkeeptime;
  141. }
  142. iv::cloud::cloudunit xcloudunit;
  143. xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
  144. xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
  145. iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
  146. pcu->CopyFrom(xcloudunit);
  147. }
  148. }
  149. gMutexMsg.unlock();
  150. int nbytesize = xmsg.ByteSize();
  151. char * strbuf = new char[nbytesize];
  152. std::shared_ptr<char> pstrbuf;
  153. pstrbuf.reset(strbuf);
  154. if(xmsg.SerializeToArray(strbuf,nbytesize))
  155. {
  156. ClientContext context ;
  157. context.set_deadline(timespec);
  158. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  159. request.set_id(nid);
  160. request.set_ntime(time1);
  161. request.set_strquerymd5(gstrqueryMD5);
  162. request.set_strctrlmd5(gstrctrlMD5);
  163. request.set_strvin(gstrVIN);
  164. request.set_xdata(strbuf,nbytesize);
  165. request.set_kepptime(nkeeptime);
  166. request.set_bimportant(bImportant);
  167. request.set_nsendtime(QDateTime::currentMSecsSinceEpoch());
  168. request.set_nlatency(CalcLateny(xvectorlatency));
  169. nid++;
  170. nlastsend = xTime.elapsed();
  171. // The actual RPC.
  172. Status status = stub_->uploaddata(&context, request, &reply);
  173. if (status.ok()) {
  174. // std::cout<<" data size is "<<nbytesize<<std::endl;
  175. // std::cout<<nid<<" upload successfully"<<std::endl;
  176. xvectorlatency.push_back((QDateTime::currentMSecsSinceEpoch() - reply.nreqsendtime()));
  177. while(xvectorlatency.size()>10)xvectorlatency.erase(xvectorlatency.begin());
  178. if(reply.nres() == 1)
  179. {
  180. iv::cloud::cloudmsg xmsg;
  181. if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  182. {
  183. sharectrlmsg(&xmsg);
  184. }
  185. }
  186. } else {
  187. std::cout << status.error_code() << ": " << status.error_message()
  188. << std::endl;
  189. std::cout<<"RPC failed"<<std::endl;
  190. if(status.error_code() == 4)
  191. {
  192. std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
  193. channel = grpc::CreateCustomChannel(
  194. target_str, grpc::InsecureChannelCredentials(),cargs);
  195. stub_ = iv::UploadThread::NewStub(channel);
  196. }
  197. std::this_thread::sleep_for(std::chrono::milliseconds(300));
  198. }
  199. }
  200. }
  201. std::cout<<" grpcclient:run complete."<<std::endl;
  202. }
  203. void grpcclient::dec_yaml(const char * stryamlpath)
  204. {
  205. YAML::Node config;
  206. try
  207. {
  208. config = YAML::LoadFile(stryamlpath);
  209. }
  210. catch(YAML::BadFile e)
  211. {
  212. qDebug("load error.");
  213. return;
  214. }
  215. std::vector<std::string> vecmodulename;
  216. if(config["server"])
  217. {
  218. gstrserverip = config["server"].as<std::string>();
  219. }
  220. std::cout<<" server ip: "<<gstrserverip<<std::endl;
  221. if(config["port"])
  222. {
  223. gstrserverport = config["port"].as<std::string>();
  224. }
  225. if(config["uploadinterval"])
  226. {
  227. gstruploadinterval = config["uploadinterval"].as<std::string>();
  228. }
  229. if(config["skip"])
  230. {
  231. std::string strskip = config["skip"].as<std::string>();
  232. mnskip = atoi(strskip.data());
  233. if(mnskip<1)mnskip = 1;
  234. }
  235. if(config["VIN"])
  236. {
  237. gstrVIN = config["VIN"].as<std::string>();
  238. }
  239. if(config["queryMD5"])
  240. {
  241. gstrqueryMD5 = config["queryMD5"].as<std::string>();
  242. }
  243. else
  244. {
  245. return;
  246. }
  247. if(config["ctrlMD5"])
  248. {
  249. gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
  250. }
  251. std::string strmsgname;
  252. if(config["uploadmessage"])
  253. {
  254. for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
  255. {
  256. std::string strtitle = it->first.as<std::string>();
  257. std::cout<<strtitle<<std::endl;
  258. if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
  259. {
  260. iv::msgunit xmu;
  261. strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
  262. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  263. xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
  264. xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
  265. if(config["uploadmessage"][strtitle]["bimportant"])
  266. {
  267. std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
  268. if(strimportant == "true")
  269. {
  270. xmu.mbImportant = true;
  271. }
  272. }
  273. if(config["uploadmessage"][strtitle]["keeptime"])
  274. {
  275. std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
  276. xmu.mnkeeptime = atoi(strkeep.data());
  277. }
  278. mvectormsgunit.push_back(xmu);
  279. }
  280. }
  281. }
  282. else
  283. {
  284. }
  285. if(!config["ctrlMD5"])
  286. {
  287. return;
  288. }
  289. if(config["ctrlmessage"])
  290. {
  291. std::string strnodename = "ctrlmessage";
  292. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  293. {
  294. std::string strtitle = it->first.as<std::string>();
  295. std::cout<<strtitle<<std::endl;
  296. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  297. {
  298. iv::msgunit xmu;
  299. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  300. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  301. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  302. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  303. mvectorctrlmsgunit.push_back(xmu);
  304. }
  305. }
  306. }
  307. else
  308. {
  309. }
  310. return;
  311. }
  312. void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
  313. {
  314. int i;
  315. int nsize = pxmsg->xclouddata_size();
  316. for(i=0;i<nsize;i++)
  317. {
  318. int j;
  319. int nquerysize = mvectorctrlmsgunit.size();
  320. for(j=0;j<nquerysize;j++)
  321. {
  322. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
  323. {
  324. // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  325. iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  326. break;
  327. }
  328. }
  329. }
  330. }
  331. void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
  332. {
  333. int nsize = mvectormsgunit.size();
  334. int i;
  335. for(i=0;i<nsize;i++)
  336. {
  337. if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
  338. {
  339. gMutexMsg.lock();
  340. char * strtem = new char[nSize];
  341. memcpy(strtem,strdata,nSize);
  342. mvectormsgunit[i].mpstrmsgdata.reset(strtem);
  343. mvectormsgunit[i].mndatasize = nSize;
  344. mvectormsgunit[i].mbRefresh = true;
  345. gMutexMsg.unlock();
  346. break;
  347. }
  348. }
  349. }
  350. void grpcclient::UpdatePicData(const char *strdata, const unsigned int nSize, const char *strmemname)
  351. {
  352. int npos = -1;
  353. unsigned int i;
  354. for(i=0;i<NUM_CAM;i++)
  355. {
  356. if(strncmp(strmemname,mstrpicmsgname[i].data(),255) == 0)
  357. {
  358. npos = i;
  359. break;
  360. }
  361. }
  362. if(npos<0)
  363. {
  364. std::cout<<"grpcclient::UpdatePicData not found pic. msg name is "<<strmemname<<std::endl;
  365. return;
  366. }
  367. if(npos>= NUM_CAM)
  368. {
  369. std::cout<<"Camera count is "<<NUM_CAM<<" NOW camear is "<<npos<<std::endl;
  370. return;
  371. }
  372. if(nSize<5)return;
  373. iv::h264frame xframe;
  374. if((strdata[4] == 0x27)||(strdata[4] == 0x47)||(strdata[4] == 0x67))
  375. {
  376. xframe.mbIframe = true;
  377. }
  378. else
  379. {
  380. xframe.mbIframe = false;
  381. }
  382. if(mpicbuf[npos].mbRecvIFrame == false)
  383. {
  384. if(xframe.mbIframe == false)
  385. {
  386. return;
  387. }
  388. else
  389. {
  390. mpicbuf[npos].mbRecvIFrame = true;
  391. }
  392. }
  393. xframe.mpstrframedata = std::shared_ptr<char>(new char[nSize]);
  394. xframe.mDataSize = nSize;
  395. memcpy(xframe.mpstrframedata.get(),strdata,nSize);
  396. iv::threadpicunit * ppicbuf = &mpicbuf[npos];
  397. ppicbuf->mMutex.lock();
  398. ppicbuf->mnMsgTime = QDateTime::currentMSecsSinceEpoch();
  399. ppicbuf->mbRefresh = true;
  400. if(ppicbuf->mvectorframe.size()>=NUM_FRAMEBUFFSIZE)
  401. {
  402. while((ppicbuf->mvectorframe.size()>=NUM_FRAMEBUFFSIZE)||((ppicbuf->mvectorframe.size()>0)&&(ppicbuf->mvectorframe.size()<NUM_FRAMEBUFFSIZE)&&(ppicbuf->mvectorframe[0].mbIframe == false)))
  403. {
  404. ppicbuf->mvectorframe.erase(ppicbuf->mvectorframe.begin()+0);
  405. }
  406. if(ppicbuf->mvectorframe.size() == 0)
  407. {
  408. std::cout<<" Reset SPS Iframe Mark."<<std::endl;
  409. ppicbuf->mbRecvIFrame = false;
  410. }
  411. }
  412. ppicbuf->mvectorframe.push_back(xframe);
  413. // mpicbuf[npos].mpstrmsgdata = std::shared_ptr<char>(new char[nSize]);
  414. // mpicbuf[npos].mDataSize = nSize;
  415. // memcpy(mpicbuf[npos].mpstrmsgdata.get(),strdata,nSize);
  416. ppicbuf->mMutex.unlock();
  417. ppicbuf->mwc.wakeAll();
  418. }
  419. void grpcclient::threadpicupload(int nCamPos)
  420. {
  421. std::cout<<"thread cam "<<nCamPos<<"run"<<std::endl;
  422. int nsize = mvectormsgunit.size();
  423. int i;
  424. int ninterval = atoi(gstruploadinterval.data());
  425. if(ninterval<=0)ninterval = 100;
  426. QTime xTime;
  427. xTime.start();
  428. int nlastsend = xTime.elapsed();
  429. std::string target_str = gstrserverip+":";
  430. target_str = target_str + gstrserverport ;//std::to_string()
  431. auto cargs = grpc::ChannelArguments();
  432. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  433. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  434. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  435. target_str, grpc::InsecureChannelCredentials(),cargs);
  436. std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
  437. iv::PicUpRequestThread request;
  438. int nid = 0;
  439. // Container for the data we expect from the server.
  440. iv::PicUpReplyThread reply;
  441. gpr_timespec timespec;
  442. timespec.tv_sec = 30;//设置阻塞时间为2秒
  443. timespec.tv_nsec = 0;
  444. timespec.clock_type = GPR_TIMESPAN;
  445. // ClientContext context;
  446. while(mbPicUpload)
  447. {
  448. std::shared_ptr<char> pstr_ptr;
  449. if((nCamPos<0)||(nCamPos >= NUM_CAM))
  450. {
  451. std::cout<<"Cam Pos Error. "<<"Pos: "<<nCamPos<<" TOTAL:"<<NUM_CAM<<std::endl;
  452. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  453. continue;
  454. }
  455. bool bUpdate = false;
  456. qint64 nMsgTime = 0;
  457. int nSize = 0;
  458. qint64 npiclatency;
  459. int nSkipBase = 1;
  460. int nCount;
  461. if(mpicbuf[nCamPos].mvectorframe.size() == 0)
  462. {
  463. mpicbuf[nCamPos].mWaitMutex.lock();
  464. mpicbuf[nCamPos].mwc.wait(&mpicbuf[nCamPos].mWaitMutex,100);
  465. mpicbuf[nCamPos].mWaitMutex.unlock();
  466. }
  467. mpicbuf[nCamPos].mMutex.lock();
  468. // bUpdate = mpicbuf[nCamPos].mbRefresh;
  469. if(mpicbuf[nCamPos].mvectorframe.size() > 0)bUpdate = true;
  470. if(bUpdate == true)
  471. {
  472. nMsgTime = mpicbuf[nCamPos].mnMsgTime;
  473. mpicbuf[nCamPos].mbRefresh = false;
  474. const int npacmax = 10;
  475. int npaccount = mpicbuf[nCamPos].mvectorframe.size();
  476. if(npaccount > npacmax)npaccount = npacmax;
  477. int nalldatasize = 0;
  478. int j;
  479. for(j=0;j<npaccount;j++)
  480. {
  481. nalldatasize = nalldatasize + mpicbuf[nCamPos].mvectorframe[j].mDataSize;
  482. }
  483. int nsendpacsize = (npaccount)*sizeof(int) + nalldatasize;
  484. pstr_ptr = std::shared_ptr<char>(new char[nsendpacsize]);
  485. char * pstrvalue = (char * )pstr_ptr.get();
  486. int npos = 0;
  487. for(j=0;j<npaccount;j++)
  488. {
  489. int * psize = (int * )(pstrvalue + npos);
  490. *psize = mpicbuf[nCamPos].mvectorframe[j].mDataSize;
  491. npos = npos + sizeof(int);
  492. memcpy(pstrvalue+ npos,mpicbuf[nCamPos].mvectorframe[j].mpstrframedata.get(),mpicbuf[nCamPos].mvectorframe[j].mDataSize);
  493. npos = npos + mpicbuf[nCamPos].mvectorframe[j].mDataSize;
  494. }
  495. std::cout<<"pac count "<<npaccount<<std::endl;
  496. for(j=0;j<npaccount;j++)mpicbuf[nCamPos].mvectorframe.erase(mpicbuf[nCamPos].mvectorframe.begin());
  497. nSize = nsendpacsize;
  498. // pstr_ptr = mpicbuf[nCamPos].mvectorframe[0].mpstrframedata;
  499. // nSize = mpicbuf[nCamPos].mvectorframe[0].mDataSize;
  500. // mpicbuf[nCamPos].mvectorframe.erase(mpicbuf[nCamPos].mvectorframe.begin());
  501. // pstr_ptr = mpicbuf[nCamPos].mpstrmsgdata;
  502. // nSize = mpicbuf[nCamPos].mDataSize;
  503. npiclatency = CalcLateny(mpicbuf[nCamPos].mvectorlatency);
  504. nSkipBase = mpicbuf[nCamPos].mnSkipBase;
  505. nCount = mpicbuf[nCamPos].mnCount;
  506. mpicbuf[nCamPos].mnCount++;
  507. // if(npiclatency > 500)
  508. // {
  509. // if(mpicbuf[nCamPos].mnSkipBase<30)mpicbuf[nCamPos].mnSkipBase++;
  510. // }
  511. // else
  512. // {
  513. // if(npiclatency<300)
  514. // if(mpicbuf[nCamPos].mnSkipBase > mpicbuf[nCamPos].mnDefSkipBase)mpicbuf[nCamPos].mnSkipBase--;
  515. // }
  516. std::cout<<"upload "<<nMsgTime<<" latency: "<<npiclatency<<" skip param: "<<nSkipBase<<std::endl;
  517. }
  518. mpicbuf[nCamPos].mMutex.unlock();
  519. // if(bUpdate == false)
  520. // {
  521. // // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  522. // continue;
  523. // }
  524. // if(nCount%nSkipBase != 0)
  525. // {
  526. // continue;
  527. // }
  528. ClientContext context ;
  529. context.set_deadline(timespec);
  530. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  531. request.set_npictime(nMsgTime);
  532. request.set_ncampos(nCamPos);
  533. request.set_strvin(gstrVIN);
  534. request.set_xdata(pstr_ptr.get(),nSize);
  535. request.set_nlatency(npiclatency);
  536. nid++;
  537. nlastsend = xTime.elapsed();
  538. // The actual RPC.
  539. Status status = stub_->uploadpic(&context, request, &reply);
  540. if (status.ok()) {
  541. qint64 nlaten = QDateTime::currentMSecsSinceEpoch() - time1;
  542. mpicbuf[nCamPos].mMutex.lock();
  543. mpicbuf[nCamPos].mvectorlatency.push_back(nlaten);
  544. while(mpicbuf[nCamPos].mvectorlatency.size()>10)mpicbuf[nCamPos].mvectorlatency.erase(mpicbuf[nCamPos].mvectorlatency.begin());
  545. mpicbuf[nCamPos].mMutex.unlock();
  546. if(reply.nres() == 1)
  547. {
  548. // iv::cloud::cloudmsg xmsg;
  549. // if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  550. // {
  551. // sharectrlmsg(&xmsg);
  552. // }
  553. }
  554. } else {
  555. std::cout << status.error_code() << ": " << status.error_message()
  556. << std::endl;
  557. std::cout<<"RPC failed"<<std::endl;
  558. if(status.error_code() == 4)
  559. {
  560. std::cout<<nCamPos<<" RPC Exceed Time, Create New stub_"<<std::endl;
  561. channel = grpc::CreateCustomChannel(
  562. target_str, grpc::InsecureChannelCredentials(),cargs);
  563. stub_ = iv::UploadThread::NewStub(channel);
  564. }
  565. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  566. }
  567. }
  568. std::cout<<"threadpicupload cam pos: "<<nCamPos<<" exit."<<std::endl;
  569. }
  570. qint64 grpcclient::CalcLateny(std::vector<qint64> &xvectorlatency)
  571. {
  572. if(xvectorlatency.size() == 0)return 1000;
  573. unsigned int i;
  574. qint64 nLatencyTotal = 0;
  575. for(i=0;i<xvectorlatency.size();i++)
  576. {
  577. nLatencyTotal = nLatencyTotal + xvectorlatency[i];
  578. }
  579. qint64 nLatencyAvg = nLatencyTotal/xvectorlatency.size();
  580. return nLatencyAvg;
  581. }