123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- #include "grpcclient.h"
- grpcclient * ggrpcclient;
- void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
- {
- (void)index;
- (void)dt;
- ggrpcclient->UpdateData(strdata,nSize,strmemname);
- }
- grpcclient::grpcclient(std::string stryamlpath)
- {
- ggrpcclient = this;
- dec_yaml(stryamlpath.data());
- int i;
- for(i=0;i<(int)mvectormsgunit.size();i++)
- {
- mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
- }
- for(i=0;i<(int)mvectorctrlmsgunit.size();i++)
- {
- mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
- mvectorctrlmsgunit[i].mnBufferCount);
- }
- }
- qint64 grpcclient::calclatency(qint64 nnewlatency)
- {
- mvectorlatency.push_back(nnewlatency);
- while(mvectorlatency.size()>5)mvectorlatency.erase(mvectorlatency.begin());
- qint64 nlatencytotal =0;
- int nsize = mvectorlatency.size();
- int i;
- for(i=0;i<nsize;i++)
- {
- nlatencytotal = nlatencytotal + mvectorlatency.at(i);
- }
- if(nsize > 0)
- {
- nlatencytotal = nlatencytotal/nsize;
- }
- mnlatency = nlatencytotal;
- return nlatencytotal;
- }
- void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer)
- {
- static int64_t nmsgindex = 0;
- nmsgindex++;
- int nbytesize = xmsg.ByteSize();
- char * strbuf = new char[nbytesize];
- std::shared_ptr<char> pstrbuf;
- pstrbuf.reset(strbuf);
- if(xmsg.SerializeToArray(strbuf,nbytesize))
- {
- iv::CloudSwapRequestStream request;
- request.set_nmsgindex(nmsgindex);
- request.set_nmsgtime(std::chrono::system_clock::now().time_since_epoch().count());
- request.set_pingavg(0);
- request.set_pingdev(0);
- request.set_pingmax(0);
- request.set_pingmin(0);
- request.set_strnodeid(mstrnodeid);
- request.set_strobjnodeid(mstrobjnodeid);
- request.set_xdata(strbuf,nbytesize);
- request.set_nunitcount(xmsg.xclouddata_size());
- QTime xt;
- xt.start();
- ::grpc::WriteOptions wo;
- // wo.set_write_through();
- // wo.clear_buffer_hint();
- // writer->Write(request,(void * )2);
- // bool bsend = true;
- bool bsend = writer->Write(request,wo);
- if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
- }
- }
- void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
- {
- (void)nlastreftime;
- std::cout<<"threadsend start "<<std::endl;
- int ninterval = 1000;
- int nheartbeatinterval = 1000; //when 1000ms no data, send hartbeat interval, no message data.
- mninterval = ninterval;
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- while(*pbrun)
- {
- iv::cloud::cloudmsg xmsg;
- if(mcloudemsg.xclouddata_size() == 0)
- {
- if(abs(xTime.elapsed()-nlastsend)>= nheartbeatinterval)
- {
- sendcloudmsg(xmsg,writer); //send data.
- nlastsend = xTime.elapsed();
- }
- else
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- continue;
- }
- mmutexmsg.lock();
- xmsg.CopyFrom(mcloudemsg);
- mcloudemsg.clear_xclouddata();
- mmutexmsg.unlock();
- sendcloudmsg(xmsg,writer); //send data.
- nlastsend = xTime.elapsed();
- }
- std::cout<<"thread send end."<<std::endl;
- }
- void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
- {
- (void)nlastreftime;
- std::cout<<"threadrecv start"<<std::endl;
- iv::CloudSwapReplyStream reply;
- while (writer->Read(&reply)) {
- std::cout<<" reply nres: "<<reply.xdata_size()<<std::endl;
- // nfail = 0;
- // std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
- if(reply.nres() > 0)
- {
- int i;
- for(i=0;i<reply.xdata_size();i++)
- {
- iv::cloud::cloudmsg xmsg;
- if(xmsg.ParseFromArray(reply.mutable_xdata(i)->data(),reply.mutable_xdata(i)->size()))
- {
- shareswapmsg(&xmsg);
- }
- }
- }
- // std::cout<<"read data from server."<<std::endl;
- }
- std::cout<<"threadrecv end."<<std::endl;
- *pbrun = false;
- }
- void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun)
- {
- std::cout<<"threadrpc start."<<std::endl;
- std::string target_str = gstrserverip+":";
- target_str = target_str + gstrserverport ;//std::to_string()
- auto cargs = grpc::ChannelArguments();
- cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
- cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
- std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- std::unique_ptr<iv::CloudSwapStream::Stub> stub_ = iv::CloudSwapStream::NewStub(channel);
- int nfail = 0;
- while(!QThread::isInterruptionRequested())
- {
- ClientContext context ;
- // std::shared_ptr<ClientContext> pcontext(new ClientContext);
- std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writerRead(stub_->swap(&context));
- // std::shared_ptr<bool> pbRun(new bool);
- *pbRun = true;
- std::shared_ptr<qint64> pntime = pnrpctime ;
- *pntime = QDateTime::currentMSecsSinceEpoch();
- std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime);
- (void )pthread;
- std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime);
- (void)precvthread;
- pthread->join();
- precvthread->join();
- // std::cout<<"threadRPC end"<<std::endl;
- // *pbRun = false;
- // while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<3000)&&(*pbRun))
- // {
- // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- // }
- // *pbRun = false;
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- stub_ = iv::CloudSwapStream::NewStub(channel);
- nfail++;
- // if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
- // else std::this_thread::sleep_for(std::chrono::milliseconds(100));
- std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
- }
- }
- void grpcclient::run()
- {
- std::shared_ptr<bool> pbRun(new bool);
- *pbRun = true;
- std::shared_ptr<qint64> pntime(new qint64);
- *pntime = QDateTime::currentMSecsSinceEpoch();
- std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
- (void)pthread;
- return;
- }
- void grpcclient::dec_yaml(const char * stryamlpath)
- {
- YAML::Node config;
- try
- {
- config = YAML::LoadFile(stryamlpath);
- }
- catch(YAML::BadFile e)
- {
- qDebug("load error.");
- (void)e;
- return;
- }
- if(config["server"])
- {
- gstrserverip = config["server"].as<std::string>();
- }
- if(config["port"])
- {
- gstrserverport = config["port"].as<std::string>();
- }
- if(config["nodeid"])
- {
- mstrnodeid = config["nodeid"].as<std::string>();
- }
- if(config["objnodeid"])
- {
- mstrobjnodeid = config["objnodeid"].as<std::string>();
- }
- std::string strmsgname;
- if(config["upmessage"])
- {
- for(YAML::const_iterator it= config["upmessage"].begin(); it != config["upmessage"].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config["upmessage"][strtitle]["msgname"]&&config["upmessage"][strtitle]["buffersize"]&&config["upmessage"][strtitle]["buffercount"])
- {
- iv::msgunit xmu;
- strmsgname = config["upmessage"][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config["upmessage"][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config["upmessage"][strtitle]["buffercount"].as<int>();
- if(config["upmessage"][strtitle]["bimportant"])
- {
- std::string strimportant = config["upmessage"][strtitle]["bimportant"].as<std::string>();
- if(strimportant == "true")
- {
- xmu.mbImportant = true;
- }
- }
- if(config["upmessage"][strtitle]["keeptime"])
- {
- std::string strkeep = config["upmessage"][strtitle]["keeptime"].as<std::string>();
- xmu.mnkeeptime = atoi(strkeep.data());
- }
- mvectormsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- if(config["downmessage"])
- {
- std::string strnodename = "downmessage";
- for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
- {
- iv::msgunit xmu;
- strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
- mvectorctrlmsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- return;
- }
- void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg)
- {
- int i;
- int nsize = pxmsg->xclouddata_size();
- for(i=0;i<nsize;i++)
- {
- int j;
- int nquerysize = mvectorctrlmsgunit.size();
- for(j=0;j<nquerysize;j++)
- {
- if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
- {
- // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
- iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
- break;
- }
- }
- }
- }
- void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
- {
- mmutexmsg.lock();
- iv::cloud::cloudunit * pcu = mcloudemsg.add_xclouddata();
- pcu->set_msgname(strmemname);
- pcu->set_data(strdata,nSize);
- if(mcloudemsg.xclouddata_size() == 1)
- {
- mcloudemsg.set_xtime(std::chrono::system_clock::now().time_since_epoch().count());
- }
- mmutexmsg.unlock();
- }
|