#include "grpcclient.h" grpcclient * ggrpcclient; void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname) { ggrpcclient->UpdateData(strdata,nSize,strmemname); } //#define TESTUP void ListenPicData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname) { #ifndef TESTUP ggrpcclient->UpdatePicData(strdata,nSize,strmemname); #endif #ifdef TESTUP ggrpcclient->UpdatePicData(strdata,nSize,"h264front"); ggrpcclient->UpdatePicData(strdata,nSize,"h264rear"); ggrpcclient->UpdatePicData(strdata,nSize,"h264left"); ggrpcclient->UpdatePicData(strdata,nSize,"h264right"); #endif } grpcclient::grpcclient(std::string stryamlpath) { ggrpcclient = this; dec_yaml(stryamlpath.data()); mstrpicmsgname[0] = "h264front"; mstrpicmsgname[1] = "h264rear"; mstrpicmsgname[2] = "h264left"; mstrpicmsgname[3] = "h264right"; unsigned int i; if(mbFrameUpdate) { for(i=0;iisFinished() == false) { } std::cout<<"now join grpcclient thread"<join(); } } for(i=0;i channel = grpc::CreateCustomChannel( target_str, grpc::InsecureChannelCredentials(),cargs); std::unique_ptr stub_ = iv::UploadThread::NewStub(channel); iv::UploadRequestThread request; int nid = 0; // Container for the data we expect from the server. iv::UploadReplyThread reply; gpr_timespec timespec; timespec.tv_sec = 30;//设置阻塞时间为2秒 timespec.tv_nsec = 0; timespec.clock_type = GPR_TIMESPAN; // ClientContext context; std::vector xvectorlatency; while(!QThread::isInterruptionRequested()) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); if((xTime.elapsed()-nlastsend) nkeeptime) { nkeeptime = mvectormsgunit[i].mnkeeptime; } iv::cloud::cloudunit xcloudunit; xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname); xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize); iv::cloud::cloudunit * pcu = xmsg.add_xclouddata(); pcu->CopyFrom(xcloudunit); } } gMutexMsg.unlock(); int nbytesize = xmsg.ByteSize(); char * strbuf = new char[nbytesize]; std::shared_ptr pstrbuf; pstrbuf.reset(strbuf); if(xmsg.SerializeToArray(strbuf,nbytesize)) { ClientContext context ; context.set_deadline(timespec); qint64 time1 = QDateTime::currentMSecsSinceEpoch(); request.set_id(nid); request.set_ntime(time1); request.set_strquerymd5(gstrqueryMD5); request.set_strctrlmd5(gstrctrlMD5); request.set_strvin(gstrVIN); request.set_xdata(strbuf,nbytesize); request.set_kepptime(nkeeptime); request.set_bimportant(bImportant); request.set_nsendtime(QDateTime::currentMSecsSinceEpoch()); request.set_nlatency(CalcLateny(xvectorlatency)); nid++; nlastsend = xTime.elapsed(); // The actual RPC. Status status = stub_->uploaddata(&context, request, &reply); if (status.ok()) { // std::cout<<" data size is "<10)xvectorlatency.erase(xvectorlatency.begin()); if(reply.nres() == 1) { iv::cloud::cloudmsg xmsg; if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size())) { sharectrlmsg(&xmsg); } } } else { std::cout << status.error_code() << ": " << status.error_message() << std::endl; std::cout<<"RPC failed"< vecmodulename; if(config["server"]) { gstrserverip = config["server"].as(); } std::cout<<" server ip: "<(); } if(config["uploadinterval"]) { gstruploadinterval = config["uploadinterval"].as(); } if(config["skip"]) { std::string strskip = config["skip"].as(); mnskip = atoi(strskip.data()); if(mnskip<1)mnskip = 1; } if(config["VIN"]) { gstrVIN = config["VIN"].as(); } if(config["queryMD5"]) { gstrqueryMD5 = config["queryMD5"].as(); } else { return; } if(config["ctrlMD5"]) { gstrctrlMD5 = config["ctrlMD5"].as(); } std::string strmsgname; if(config["uploadmessage"]) { for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it) { std::string strtitle = it->first.as(); std::cout<(); strncpy(xmu.mstrmsgname,strmsgname.data(),255); xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as(); xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as(); if(config["uploadmessage"][strtitle]["bimportant"]) { std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as(); if(strimportant == "true") { xmu.mbImportant = true; } } if(config["uploadmessage"][strtitle]["keeptime"]) { std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as(); xmu.mnkeeptime = atoi(strkeep.data()); } mvectormsgunit.push_back(xmu); } } } else { } if(!config["ctrlMD5"]) { return; } if(config["ctrlmessage"]) { std::string strnodename = "ctrlmessage"; for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it) { std::string strtitle = it->first.as(); std::cout<(); strncpy(xmu.mstrmsgname,strmsgname.data(),255); xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as(); xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as(); mvectorctrlmsgunit.push_back(xmu); } } } else { } return; } void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg) { int i; int nsize = pxmsg->xclouddata_size(); for(i=0;ixclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0) { // qDebug("size is %d ",pxmsg->xclouddata(i).data().size()); iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size()); break; } } } } void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname) { int nsize = mvectormsgunit.size(); int i; for(i=0;i= NUM_CAM) { std::cout<<"Camera count is "<(new char[nSize]); xframe.mDataSize = nSize; memcpy(xframe.mpstrframedata.get(),strdata,nSize); iv::threadpicunit * ppicbuf = &mpicbuf[npos]; ppicbuf->mMutex.lock(); ppicbuf->mnMsgTime = QDateTime::currentMSecsSinceEpoch(); ppicbuf->mbRefresh = true; if(ppicbuf->mvectorframe.size()>=NUM_FRAMEBUFFSIZE) { while((ppicbuf->mvectorframe.size()>=NUM_FRAMEBUFFSIZE)||((ppicbuf->mvectorframe.size()>0)&&(ppicbuf->mvectorframe.size()mvectorframe[0].mbIframe == false))) { ppicbuf->mvectorframe.erase(ppicbuf->mvectorframe.begin()+0); } if(ppicbuf->mvectorframe.size() == 0) { std::cout<<" Reset SPS Iframe Mark."<mbRecvIFrame = false; } } ppicbuf->mvectorframe.push_back(xframe); // mpicbuf[npos].mpstrmsgdata = std::shared_ptr(new char[nSize]); // mpicbuf[npos].mDataSize = nSize; // memcpy(mpicbuf[npos].mpstrmsgdata.get(),strdata,nSize); ppicbuf->mMutex.unlock(); ppicbuf->mwc.wakeAll(); } void grpcclient::threadpicupload(int nCamPos) { std::cout<<"thread cam "< channel = grpc::CreateCustomChannel( target_str, grpc::InsecureChannelCredentials(),cargs); std::unique_ptr stub_ = iv::UploadThread::NewStub(channel); iv::PicUpRequestThread request; int nid = 0; // Container for the data we expect from the server. iv::PicUpReplyThread reply; gpr_timespec timespec; timespec.tv_sec = 30;//设置阻塞时间为2秒 timespec.tv_nsec = 0; timespec.clock_type = GPR_TIMESPAN; // ClientContext context; while(mbPicUpload) { std::shared_ptr pstr_ptr; if((nCamPos<0)||(nCamPos >= NUM_CAM)) { std::cout<<"Cam Pos Error. "<<"Pos: "< 0)bUpdate = true; if(bUpdate == true) { nMsgTime = mpicbuf[nCamPos].mnMsgTime; mpicbuf[nCamPos].mbRefresh = false; const int npacmax = 10; int npaccount = mpicbuf[nCamPos].mvectorframe.size(); if(npaccount > npacmax)npaccount = npacmax; int nalldatasize = 0; int j; for(j=0;j(new char[nsendpacsize]); char * pstrvalue = (char * )pstr_ptr.get(); int npos = 0; for(j=0;j 500) // { // if(mpicbuf[nCamPos].mnSkipBase<30)mpicbuf[nCamPos].mnSkipBase++; // } // else // { // if(npiclatency<300) // if(mpicbuf[nCamPos].mnSkipBase > mpicbuf[nCamPos].mnDefSkipBase)mpicbuf[nCamPos].mnSkipBase--; // } std::cout<<"upload "<uploadpic(&context, request, &reply); if (status.ok()) { qint64 nlaten = QDateTime::currentMSecsSinceEpoch() - time1; mpicbuf[nCamPos].mMutex.lock(); mpicbuf[nCamPos].mvectorlatency.push_back(nlaten); while(mpicbuf[nCamPos].mvectorlatency.size()>10)mpicbuf[nCamPos].mvectorlatency.erase(mpicbuf[nCamPos].mvectorlatency.begin()); mpicbuf[nCamPos].mMutex.unlock(); if(reply.nres() == 1) { // iv::cloud::cloudmsg xmsg; // if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size())) // { // sharectrlmsg(&xmsg); // } } } else { std::cout << status.error_code() << ": " << status.error_message() << std::endl; std::cout<<"RPC failed"< &xvectorlatency) { if(xvectorlatency.size() == 0)return 1000; unsigned int i; qint64 nLatencyTotal = 0; for(i=0;i