123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- #include "swapunit.h"
- #include <QDateTime>
- swapunit::swapunit(std::string strnodeid,std::string strobjnodeid,::grpc::ServerReaderWriter<iv::CloudSwapReplyStream, iv::CloudSwapRequestStream>* stream)
- {
- mstrnodeid = strnodeid;
- mstrobjnodeid = strobjnodeid;
- mpthread = new std::thread(&swapunit::threadsend,this,stream);
- }
- swapunit::~swapunit()
- {
- }
- int swapunit::sendmsg(std::string strobjid, std::shared_ptr<char> pdata_ptr, int ndatasize,double fpingavg,double flatency_nodestore)
- {
- if(mbrun == false)
- {
- return 0;
- }
- if(strobjid != mstrnodeid)
- {
- return 0;
- }
- iv::swapmsg xmsg;
- xmsg.ndatasize = ndatasize;
- xmsg.nRecvTime = std::chrono::system_clock::now().time_since_epoch().count();
- xmsg.pdata_ptr = pdata_ptr;
- xmsg.pingavg = fpingavg;
- xmsg.flatency_nodestore = flatency_nodestore;
- if(mvectorsendmsgbuf.size()>30000)
- {
- std::cout<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz:").toStdString()<<"may connection down, buffer now full."<<std::endl;
- mmutex.lock();
- mvectorsendmsgbuf.clear();
- mmutex.unlock();
- }
- mmutex.lock();
- mvectorsendmsgbuf.push_back(xmsg);
- mmutex.unlock();
- // std::cout<<" notify send. "<<std::endl;
- mcv.notify_all(); //notify
- return 0;
- }
- void swapunit::threadsend(::grpc::ServerReaderWriter<iv::CloudSwapReplyStream, iv::CloudSwapRequestStream> *stream)
- {
- int i;
- int64_t xLastSend = std::chrono::system_clock::now().time_since_epoch().count();
- while(mbrun)
- {
- std::vector<iv::swapmsg> xvectorsendmsgbuf;
- xvectorsendmsgbuf.clear();
- mmutex.lock();
- int nsize = mvectorsendmsgbuf.size();
- for(i=0;i<nsize;i++)
- {
- xvectorsendmsgbuf.push_back(mvectorsendmsgbuf[i]);
- }
- mvectorsendmsgbuf.clear();
- mmutex.unlock();
- int64_t xNow = std::chrono::system_clock::now().time_since_epoch().count();
- if((abs((xNow - xLastSend)/1000000) <1000 ) &&(xvectorsendmsgbuf.size() == 0))
- {
- std::unique_lock<std::mutex> lk(mmutexcv);
- if(mcv.wait_for(lk, std::chrono::milliseconds(100)) == std::cv_status::timeout)
- {
- lk.unlock();
- continue;
- }
- else
- {
- lk.unlock();
- continue;
- }
- }
- iv::CloudSwapReplyStream xreply;
- xreply.set_nres(xvectorsendmsgbuf.size());
- double fpinginobj;
- double flatencyinnodestore = 0.0;
- if(xvectorsendmsgbuf.size() == 0)
- {
- xreply.set_flatency_inserver(0.0);
- }
- else
- {
- int64_t nlatinserver = (xNow - xvectorsendmsgbuf[0].nRecvTime);
- double flatinserver = nlatinserver;
- flatinserver = flatinserver/1000000.0; //ms
- xreply.set_flatency_inserver(flatinserver);
- fpinginobj = xvectorsendmsgbuf[0].pingavg;
- flatencyinnodestore = xvectorsendmsgbuf[0].flatency_nodestore;
- }
- for(i=0;i<nsize;i++)
- {
- std::cout<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz:").toStdString()<<" add reply data. "<<std::endl;
- xreply.add_xdata(xvectorsendmsgbuf[i].pdata_ptr.get(),xvectorsendmsgbuf[i].ndatasize);
- }
- xreply.set_flatencyinstore_obj(flatencyinnodestore);
- xreply.set_flatency_innode(mfpingavg);
- xreply.set_flatency_inobjnode(fpinginobj);
- xreply.set_nmsgservertime(xNow);
- // xreply.set_nres(1);
- std::cout<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz:").toStdString()<<mstrnodeid<<" reply data size : "<<xreply.xdata_size()<<std::endl;
- stream->Write(xreply);
- xLastSend = xNow;
- }
- mbdel = true;
- }
- bool swapunit::CanDel()
- {
- return mbdel;
- }
- void swapunit::stopswap()
- {
- mbrun = false;
- mpthread->join();
- }
- void swapunit::SetPingAvg(double fpingavg)
- {
- mfpingavg = fpingavg;
- }
|