// swapunit: implementation of a per-node sender that queues messages and pushes
// them to a gRPC ServerReaderWriter stream from a dedicated worker thread.
//
// NOTE(review): this text appears to have lost everything that was written
// between angle brackets (the second #include target, template arguments, the
// end of sendmsg(), the signature of threadsend() and parts of its loops).
// The notes below describe only what is still visible. TODO: restore the
// missing pieces from the original file before building.
//
// swapunit(strnodeid, strobjnodeid, stream)
//   Stores both ids in mstrnodeid / mstrobjnodeid and starts a std::thread
//   running swapunit::threadsend(stream); the thread object is allocated with
//   `new` and kept in mpthread.
//
// ~swapunit()
//   Empty. NOTE(review): mpthread is not deleted anywhere in the visible text.
//
// sendmsg(strobjid, pdata_ptr, ndatasize, fpingavg, flatency_nodestore)
//   Returns 0 without queuing when mbrun is false or when strobjid differs from
//   mstrnodeid. Otherwise fills an iv::swapmsg with the size, the payload
//   pointer, both latency figures and a receive timestamp taken from
//   std::chrono::system_clock (raw tick count), then checks whether
//   mvectorsendmsgbuf holds more than 30000 entries. The handling of that case
//   and the rest of the function are missing from this text.
//
// threadsend(... *stream)   (signature partly missing)
//   Loops while mbrun is true. Each pass locks mmutex and reads
//   mvectorsendmsgbuf.size() into a local batch vector loop (loop body missing).
//   In a branch whose condition is not visible it waits on mcv (guarded by
//   mmutexcv) for up to 100 milliseconds and restarts the loop; both the
//   timeout and the non-timeout branch do the same thing (unlock, continue).
//   It then builds an iv::CloudSwapReplyStream: nres is the batch size, and
//   flatency_inserver is 0.0 for an empty batch, otherwise
//   (xNow - first message's nRecvTime) / 1000000.0. The source labels that
//   value "ms"; that holds only if system_clock ticks are nanoseconds - confirm.
//   NOTE(review): fpinginobj is left uninitialised when the batch is empty;
//   its later use is not visible here. The reply is sent with Write(xreply) and
//   xLastSend is updated. After the loop ends mbdel is set to true.
//   NOTE(review): in this one-line form the `//ms` comment also comments out
//   everything that follows it on the line.
//
// CanDel()      Returns mbdel, which threadsend() sets once its loop has exited.
// stopswap()    Sets mbrun to false and joins the worker thread.
// SetPingAvg()  Stores fpingavg in mfpingavg.
#include "swapunit.h" #include swapunit::swapunit(std::string strnodeid,std::string strobjnodeid,::grpc::ServerReaderWriter* stream) { mstrnodeid = strnodeid; mstrobjnodeid = strobjnodeid; mpthread = new std::thread(&swapunit::threadsend,this,stream); } swapunit::~swapunit() { } int swapunit::sendmsg(std::string strobjid, std::shared_ptr pdata_ptr, int ndatasize,double fpingavg,double flatency_nodestore) { if(mbrun == false) { return 0; } if(strobjid != mstrnodeid) { return 0; } iv::swapmsg xmsg; xmsg.ndatasize = ndatasize; xmsg.nRecvTime = std::chrono::system_clock::now().time_since_epoch().count(); xmsg.pdata_ptr = pdata_ptr; xmsg.pingavg = fpingavg; xmsg.flatency_nodestore = flatency_nodestore; if(mvectorsendmsgbuf.size()>30000) { std::cout< *stream) { int i; int64_t xLastSend = std::chrono::system_clock::now().time_since_epoch().count(); while(mbrun) { std::vector xvectorsendmsgbuf; xvectorsendmsgbuf.clear(); mmutex.lock(); int nsize = mvectorsendmsgbuf.size(); for(i=0;i lk(mmutexcv); if(mcv.wait_for(lk, std::chrono::milliseconds(100)) == std::cv_status::timeout) { lk.unlock(); continue; } else { lk.unlock(); continue; } } iv::CloudSwapReplyStream xreply; xreply.set_nres(xvectorsendmsgbuf.size()); double fpinginobj; double flatencyinnodestore = 0.0; if(xvectorsendmsgbuf.size() == 0) { xreply.set_flatency_inserver(0.0); } else { int64_t nlatinserver = (xNow - xvectorsendmsgbuf[0].nRecvTime); double flatinserver = nlatinserver; flatinserver = flatinserver/1000000.0; //ms xreply.set_flatency_inserver(flatinserver); fpinginobj = xvectorsendmsgbuf[0].pingavg; flatencyinnodestore = xvectorsendmsgbuf[0].flatency_nodestore; } for(i=0;iWrite(xreply); xLastSend = xNow; } mbdel = true; } bool swapunit::CanDel() { return mbdel; } void swapunit::stopswap() { mbrun = false; mpthread->join(); } void swapunit::SetPingAvg(double fpingavg) { mfpingavg = fpingavg; }