swapunit.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #include "swapunit.h"
  2. #include <QDateTime>
  3. swapunit::swapunit(std::string strnodeid,std::string strobjnodeid,::grpc::ServerReaderWriter<iv::CloudSwapReplyStream, iv::CloudSwapRequestStream>* stream)
  4. {
  5. mstrnodeid = strnodeid;
  6. mstrobjnodeid = strobjnodeid;
  7. mpthread = new std::thread(&swapunit::threadsend,this,stream);
  8. }
  9. swapunit::~swapunit()
  10. {
  11. }
  12. int swapunit::sendmsg(std::string strobjid, std::shared_ptr<char> pdata_ptr, int ndatasize,double fpingavg,double flatency_nodestore)
  13. {
  14. if(mbrun == false)
  15. {
  16. return 0;
  17. }
  18. if(strobjid != mstrnodeid)
  19. {
  20. return 0;
  21. }
  22. iv::swapmsg xmsg;
  23. xmsg.ndatasize = ndatasize;
  24. xmsg.nRecvTime = std::chrono::system_clock::now().time_since_epoch().count();
  25. xmsg.pdata_ptr = pdata_ptr;
  26. xmsg.pingavg = fpingavg;
  27. xmsg.flatency_nodestore = flatency_nodestore;
  28. if(mvectorsendmsgbuf.size()>30000)
  29. {
  30. std::cout<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz:").toStdString()<<"may connection down, buffer now full."<<std::endl;
  31. mmutex.lock();
  32. mvectorsendmsgbuf.clear();
  33. mmutex.unlock();
  34. }
  35. mmutex.lock();
  36. mvectorsendmsgbuf.push_back(xmsg);
  37. mmutex.unlock();
  38. // std::cout<<" notify send. "<<std::endl;
  39. mcv.notify_all(); //notify
  40. return 0;
  41. }
  42. void swapunit::threadsend(::grpc::ServerReaderWriter<iv::CloudSwapReplyStream, iv::CloudSwapRequestStream> *stream)
  43. {
  44. int i;
  45. int64_t xLastSend = std::chrono::system_clock::now().time_since_epoch().count();
  46. while(mbrun)
  47. {
  48. std::vector<iv::swapmsg> xvectorsendmsgbuf;
  49. xvectorsendmsgbuf.clear();
  50. mmutex.lock();
  51. int nsize = mvectorsendmsgbuf.size();
  52. for(i=0;i<nsize;i++)
  53. {
  54. xvectorsendmsgbuf.push_back(mvectorsendmsgbuf[i]);
  55. }
  56. mvectorsendmsgbuf.clear();
  57. mmutex.unlock();
  58. int64_t xNow = std::chrono::system_clock::now().time_since_epoch().count();
  59. if((abs((xNow - xLastSend)/1000000) <1000 ) &&(xvectorsendmsgbuf.size() == 0))
  60. {
  61. std::unique_lock<std::mutex> lk(mmutexcv);
  62. if(mcv.wait_for(lk, std::chrono::milliseconds(100)) == std::cv_status::timeout)
  63. {
  64. lk.unlock();
  65. continue;
  66. }
  67. else
  68. {
  69. lk.unlock();
  70. continue;
  71. }
  72. }
  73. iv::CloudSwapReplyStream xreply;
  74. xreply.set_nres(xvectorsendmsgbuf.size());
  75. double fpinginobj;
  76. double flatencyinnodestore = 0.0;
  77. if(xvectorsendmsgbuf.size() == 0)
  78. {
  79. xreply.set_flatency_inserver(0.0);
  80. }
  81. else
  82. {
  83. int64_t nlatinserver = (xNow - xvectorsendmsgbuf[0].nRecvTime);
  84. double flatinserver = nlatinserver;
  85. flatinserver = flatinserver/1000000.0; //ms
  86. xreply.set_flatency_inserver(flatinserver);
  87. fpinginobj = xvectorsendmsgbuf[0].pingavg;
  88. flatencyinnodestore = xvectorsendmsgbuf[0].flatency_nodestore;
  89. }
  90. for(i=0;i<nsize;i++)
  91. {
  92. std::cout<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz:").toStdString()<<" add reply data. "<<std::endl;
  93. xreply.add_xdata(xvectorsendmsgbuf[i].pdata_ptr.get(),xvectorsendmsgbuf[i].ndatasize);
  94. }
  95. xreply.set_flatencyinstore_obj(flatencyinnodestore);
  96. xreply.set_flatency_innode(mfpingavg);
  97. xreply.set_flatency_inobjnode(fpinginobj);
  98. xreply.set_nmsgservertime(xNow);
  99. // xreply.set_nres(1);
  100. std::cout<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz:").toStdString()<<mstrnodeid<<" reply data size : "<<xreply.xdata_size()<<std::endl;
  101. stream->Write(xreply);
  102. xLastSend = xNow;
  103. }
  104. mbdel = true;
  105. }
  106. bool swapunit::CanDel()
  107. {
  108. return mbdel;
  109. }
  110. void swapunit::stopswap()
  111. {
  112. mbrun = false;
  113. mpthread->join();
  114. }
  115. void swapunit::SetPingAvg(double fpingavg)
  116. {
  117. mfpingavg = fpingavg;
  118. }