// grpcclient implementation: bridges local iv::modulecomm messages and a gRPC
// bidirectional stream (iv::CloudSwapStream).
//
// NOTE(review): this copy of the file looks damaged. Line breaks have been
// collapsed and text between '<' and '>' appears to be missing (for example
// "std::shared_ptr pstrbuf;" has no template argument and the averaging loop
// in calclatency reads "for(i=0;i 0)"). Several function headers/bodies are
// therefore only partly visible. Restore the file from version control before
// building or changing logic; the notes below describe only what is visible.
//
// ggrpcclient
//   File-level pointer set to `this` by the constructor so that the free
//   callback ListenData can reach the instance.
//
// ListenData(strdata, nSize, index, dt, strmemname)
//   Receive callback handed to iv::modulecomm::RegisterRecv. `index` and `dt`
//   are ignored; the payload, its size and the name are forwarded to
//   ggrpcclient->UpdateData.
//
// grpcclient::grpcclient(stryamlpath)
//   Stores `this` in ggrpcclient, parses the YAML file via dec_yaml, then
//   calls RegisterRecv (callback ListenData) for every entry of
//   mvectormsgunit and RegisterSend (with mnBufferSize / mnBufferCount) for
//   every entry of mvectorctrlmsgunit, keeping each returned handle in `mpa`.
//
// grpcclient::calclatency(nnewlatency)
//   Appends the sample to mvectorlatency, erases from the front while more
//   than 5 samples are held, divides the accumulated total by the sample
//   count, stores the result in mnlatency and returns it. (The summation loop
//   itself is not intact in this copy.)
//
// grpcclient::sendcloudmsg(xmsg, writer)
//   Serializes xmsg into a buffer allocated with new char[ByteSize()], fills
//   an iv::CloudSwapRequestStream (incrementing static message index,
//   system_clock timestamp, ping fields set to 0, node ids, payload, unit
//   count) and sends it with writer->Write(request, wo). A false return from
//   Write is reported on std::cout.
//   NOTE(review): the buffer is handed to a std::shared_ptr through
//   reset(strbuf); the template argument is not visible here. If it is a
//   non-array shared_ptr with the default deleter, the array would be released
//   with delete instead of delete[] - confirm against the original.
//
// grpcclient::threadsend(writer, pbrun, nlastreftime)
//   Sender loop (only partly visible). Visible behavior: when the time since
//   the last send reaches nheartbeatinterval the current xmsg is sent,
//   otherwise the thread sleeps 1 ms; on the other path mcloudemsg is copied
//   into xmsg and its xclouddata cleared while mmutexmsg is held, then xmsg
//   is sent. nlastreftime is unused.
//
// grpcclient::threadrecv(writer, pbrun, nlastreftime)
//   Receiver loop (only partly visible). Repeats writer->Read(&reply), prints
//   the reply result code, and passes successfully parsed payload entries to
//   shareswapmsg. nlastreftime is unused.
//
// grpcclient::threadRPC(pnrpctime, pbRun)
//   Creates a channel with grpc::CreateCustomChannel using
//   grpc::InsecureChannelCredentials and a stub via
//   iv::CloudSwapStream::NewStub. Until QThread::isInterruptionRequested()
//   is true it opens a stream with stub_->swap(&context), sets *pbRun to true,
//   records the current time in *pnrpctime, starts threadsend and threadrecv
//   on std::thread objects and joins both.
//   NOTE(review): the std::thread objects are created with `new`; no matching
//   delete is visible in this copy - verify against the original.
//
// Thread start-up function (header not visible in this copy)
//   Allocates the shared run flag and time value, starts threadRPC on a
//   std::thread created with `new`, and returns without joining it.
//
// grpcclient::dec_yaml(stryamlpath)
//   Loads the YAML file; on YAML::BadFile it logs "load error." and returns.
//   Reads the optional keys "server", "port", "nodeid" and "objnodeid". For
//   each child of "upmessage" it reads msgname / buffersize / buffercount and
//   the optional "bimportant" (compared with the string "true") and
//   "keeptime" (converted with atoi), then appends to mvectormsgunit. For each
//   child of "downmessage" it reads msgname / buffersize / buffercount and
//   appends to mvectorctrlmsgunit.
//   NOTE(review): names are copied with strncpy(..., 255), which does not
//   write a terminating NUL when the source has 255 or more characters; the
//   size and initialization of mstrmsgname are not visible here - confirm.
//
// grpcclient::shareswapmsg(pxmsg)
//   For every xclouddata entry of *pxmsg, looks for the first
//   mvectorctrlmsgunit entry whose mstrmsgname compares equal to the entry's
//   msgname (comparison limited to 255 characters) and forwards the entry's
//   data through iv::modulecomm::ModuleSendMsg using that entry's `mpa`.
//
// grpcclient::UpdateData(strdata, nSize, strmemname)
//   While holding mmutexmsg, appends a cloudunit (name + data) to mcloudemsg;
//   when this is the first unit in the message, xtime is set from
//   std::chrono::system_clock::now().
#include "grpcclient.h" grpcclient * ggrpcclient; void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname) { (void)index; (void)dt; ggrpcclient->UpdateData(strdata,nSize,strmemname); } grpcclient::grpcclient(std::string stryamlpath) { ggrpcclient = this; dec_yaml(stryamlpath.data()); int i; for(i=0;i<(int)mvectormsgunit.size();i++) { mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData); } for(i=0;i<(int)mvectorctrlmsgunit.size();i++) { mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize, mvectorctrlmsgunit[i].mnBufferCount); } } qint64 grpcclient::calclatency(qint64 nnewlatency) { mvectorlatency.push_back(nnewlatency); while(mvectorlatency.size()>5)mvectorlatency.erase(mvectorlatency.begin()); qint64 nlatencytotal =0; int nsize = mvectorlatency.size(); int i; for(i=0;i 0) { nlatencytotal = nlatencytotal/nsize; } mnlatency = nlatencytotal; return nlatencytotal; } void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::ClientReaderWriter > writer) { static int64_t nmsgindex = 0; nmsgindex++; int nbytesize = xmsg.ByteSize(); char * strbuf = new char[nbytesize]; std::shared_ptr pstrbuf; pstrbuf.reset(strbuf); if(xmsg.SerializeToArray(strbuf,nbytesize)) { iv::CloudSwapRequestStream request; request.set_nmsgindex(nmsgindex); request.set_nmsgtime(std::chrono::system_clock::now().time_since_epoch().count()); request.set_pingavg(0); request.set_pingdev(0); request.set_pingmax(0); request.set_pingmin(0); request.set_strnodeid(mstrnodeid); request.set_strobjnodeid(mstrobjnodeid); request.set_xdata(strbuf,nbytesize); request.set_nunitcount(xmsg.xclouddata_size()); QTime xt; xt.start(); ::grpc::WriteOptions wo; // wo.set_write_through(); // wo.clear_buffer_hint(); // writer->Write(request,(void * )2); // bool bsend = true; bool bsend = writer->Write(request,wo); 
if(bsend == false)std::cout<<"send msg. rtn is "< > writer,std::shared_ptr pbrun,std::shared_ptr nlastreftime) { (void)nlastreftime; std::cout<<"threadsend start "<= nheartbeatinterval) { sendcloudmsg(xmsg,writer); //send data. nlastsend = xTime.elapsed(); } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } continue; } mmutexmsg.lock(); xmsg.CopyFrom(mcloudemsg); mcloudemsg.clear_xclouddata(); mmutexmsg.unlock(); sendcloudmsg(xmsg,writer); //send data. nlastsend = xTime.elapsed(); } std::cout<<"thread send end."< > writer, std::shared_ptr pbrun,std::shared_ptr nlastreftime) { (void)nlastreftime; std::cout<<"threadrecv start"<Read(&reply)) { std::cout<<" reply nres: "< 0) { int i; for(i=0;idata(),reply.mutable_xdata(i)->size())) { shareswapmsg(&xmsg); } } } // std::cout<<"read data from server."< pnrpctime,std::shared_ptr pbRun) { std::cout<<"threadrpc start."< channel = grpc::CreateCustomChannel( target_str, grpc::InsecureChannelCredentials(),cargs); std::unique_ptr stub_ = iv::CloudSwapStream::NewStub(channel); int nfail = 0; while(!QThread::isInterruptionRequested()) { ClientContext context ; // std::shared_ptr pcontext(new ClientContext); std::shared_ptr<::grpc::ClientReaderWriter > writerRead(stub_->swap(&context)); // std::shared_ptr pbRun(new bool); *pbRun = true; std::shared_ptr pntime = pnrpctime ; *pntime = QDateTime::currentMSecsSinceEpoch(); std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime); (void )pthread; std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime); (void)precvthread; pthread->join(); precvthread->join(); // std::cout<<"threadRPC end"< 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000)); // else std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::cout<<"reconnnect to server. 
nfail is "< pbRun(new bool); *pbRun = true; std::shared_ptr pntime(new qint64); *pntime = QDateTime::currentMSecsSinceEpoch(); std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun); (void)pthread; return; } void grpcclient::dec_yaml(const char * stryamlpath) { YAML::Node config; try { config = YAML::LoadFile(stryamlpath); } catch(YAML::BadFile e) { qDebug("load error."); (void)e; return; } if(config["server"]) { gstrserverip = config["server"].as(); } if(config["port"]) { gstrserverport = config["port"].as(); } if(config["nodeid"]) { mstrnodeid = config["nodeid"].as(); } if(config["objnodeid"]) { mstrobjnodeid = config["objnodeid"].as(); } std::string strmsgname; if(config["upmessage"]) { for(YAML::const_iterator it= config["upmessage"].begin(); it != config["upmessage"].end();++it) { std::string strtitle = it->first.as(); std::cout<(); strncpy(xmu.mstrmsgname,strmsgname.data(),255); xmu.mnBufferSize = config["upmessage"][strtitle]["buffersize"].as(); xmu.mnBufferCount = config["upmessage"][strtitle]["buffercount"].as(); if(config["upmessage"][strtitle]["bimportant"]) { std::string strimportant = config["upmessage"][strtitle]["bimportant"].as(); if(strimportant == "true") { xmu.mbImportant = true; } } if(config["upmessage"][strtitle]["keeptime"]) { std::string strkeep = config["upmessage"][strtitle]["keeptime"].as(); xmu.mnkeeptime = atoi(strkeep.data()); } mvectormsgunit.push_back(xmu); } } } else { } if(config["downmessage"]) { std::string strnodename = "downmessage"; for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it) { std::string strtitle = it->first.as(); std::cout<(); strncpy(xmu.mstrmsgname,strmsgname.data(),255); xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as(); xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as(); mvectorctrlmsgunit.push_back(xmu); } } } else { } return; } void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg) { int i; int nsize 
= pxmsg->xclouddata_size(); for(i=0;ixclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0) { // qDebug("size is %d ",pxmsg->xclouddata(i).data().size()); iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size()); break; } } } } void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname) { mmutexmsg.lock(); iv::cloud::cloudunit * pcu = mcloudemsg.add_xclouddata(); pcu->set_msgname(strmemname); pcu->set_data(strdata,nSize); if(mcloudemsg.xclouddata_size() == 1) { mcloudemsg.set_xtime(std::chrono::system_clock::now().time_since_epoch().count()); } mmutexmsg.unlock(); }