Przeglądaj źródła

change driver_map_xodrload.

yuchuli 3 lat temu
rodzic
commit
e9a7486878

+ 84 - 137
src/driver/driver_cloud_swap_client/grpcclient.cpp

@@ -47,124 +47,87 @@ qint64 grpcclient::calclatency(qint64 nnewlatency)
     return nlatencytotal;
 }
 
-void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
+void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer)
+{
+    static int64_t nmsgindex = 0;
+    nmsgindex++;
+    int nbytesize = xmsg.ByteSize();
+    char * strbuf = new char[nbytesize];
+    std::shared_ptr<char> pstrbuf;
+    pstrbuf.reset(strbuf);
+    if(xmsg.SerializeToArray(strbuf,nbytesize))
+    {
+        iv::CloudSwapRequestStream request;
+
+
+        request.set_nmsgindex(nmsgindex);
+        request.set_nmsgtime(std::chrono::system_clock::now().time_since_epoch().count());
+        request.set_pingavg(0);
+        request.set_pingdev(0);
+        request.set_pingmax(0);
+        request.set_pingmin(0);
+        request.set_strnodeid("111");
+        request.set_strobjnodeid("222");
+        request.set_xdata(strbuf,nbytesize);
+
+
+        QTime xt;
+        xt.start();
+        ::grpc::WriteOptions wo;
+//            wo.set_write_through();
+//            wo.clear_buffer_hint();
+//        writer->Write(request,(void * )2);
+
+//        bool bsend = true;
+        bool bsend = writer->Write(request,wo);
+
+        if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
+
+
+    }
+
+
+}
+
+void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
 {
     std::cout<<"threadsend start "<<std::endl;
-    int nsize = mvectormsgunit.size();
-    int i;
 
-    int ninterval = atoi(gstruploadinterval.data());
-    if(ninterval<=0)ninterval = 100;
+    int ninterval = 1000;
+    int nheartbeatinterval = 1000; //when 1000ms no data, send hartbeat interval, no message data.
+
 
     mninterval = ninterval;
-    float ffraterate = 1.0f/((float)mninterval);
-    int nrawinterval = ninterval;
-    int nok = 0;
 
     QTime xTime;
     xTime.start();
     int nlastsend = xTime.elapsed();
 
-    int nid= 0;
 
     while(*pbrun)
     {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-        if((xTime.elapsed()-nlastsend)<mninterval)
-        {
-            continue;
-        }
-
-        bool bImportant = false;
-        int nkeeptime = 0;
         iv::cloud::cloudmsg xmsg;
-        xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
-        gMutexMsg.lock();
-        for(i=0;i<nsize;i++)
+        if(mcloudemsg.xclouddata_size()  == 0)
         {
-            if(mvectormsgunit[i].mbRefresh)
+            if(abs(xTime.elapsed()-nlastsend)>= nheartbeatinterval)
             {
-                mvectormsgunit[i].mbRefresh = false;
-                if(mvectormsgunit[i].mbImportant)
-                {
-                    bImportant = true;
-                }
-                if(mvectormsgunit[i].mnkeeptime > nkeeptime)
-                {
-                    nkeeptime = mvectormsgunit[i].mnkeeptime;
-                }
-                iv::cloud::cloudunit xcloudunit;
-                xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
-                xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
-                iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
-                pcu->CopyFrom(xcloudunit);
+                sendcloudmsg(xmsg,writer); //send data.
+                nlastsend = xTime.elapsed();
             }
-
+            else
+            {
+                std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            }
+            continue;
         }
-        gMutexMsg.unlock();
-
-        int nbytesize = xmsg.ByteSize();
-        char * strbuf = new char[nbytesize];
-        std::shared_ptr<char> pstrbuf;
-        pstrbuf.reset(strbuf);
-        if(xmsg.SerializeToArray(strbuf,nbytesize))
-        {
-            iv::UploadRequestStream request;
-            qint64 time1 = QDateTime::currentMSecsSinceEpoch();
-
-            request.set_id(nid);
-            request.set_ntime(time1);
-            request.set_strquerymd5(gstrqueryMD5);
-            request.set_strctrlmd5(gstrctrlMD5);
-            request.set_strvin(gstrVIN);
-            request.set_xdata(strbuf,nbytesize);
-            request.set_kepptime(nkeeptime);
-            request.set_bimportant(bImportant);
-            request.set_nlatency(mnlatency);
-            ffraterate = 1000.0f/((float)mninterval);
-            request.set_fframerate(ffraterate);
-            request.set_nsendtime(time1);
-            nid++;
-
-            nlastsend = xTime.elapsed();
-
-            QTime xt;
-            xt.start();
-            ::grpc::WriteOptions wo;
-//            wo.set_write_through();
-//            wo.clear_buffer_hint();
-    //        writer->Write(request,(void * )2);
-
-    //        bool bsend = true;
-            bool bsend = writer->Write(request,wo);
-
-            *nlastreftime = QDateTime::currentMSecsSinceEpoch();
-
-//            if(xt.elapsed()>10)
-//            {
-//                nok = 0;
-//                if(ninterval < 1000)ninterval = ninterval * 11/10;
-//                mninterval = ninterval;
-//                ffraterate = 1.0f/((float)mninterval);
-//                qDebug("send ela is %d ninterval is %d",xt.elapsed(),ninterval);
-//            }
-//            else
-//            {
-//                nok++;
-//                if((ninterval > nrawinterval)&&(nok>10))
-//                {
-//                    nok = 0;
-
-//                    ninterval = ninterval*10/11;
-//                    mninterval = ninterval;
-//                    ffraterate = 1.0f/((float)mninterval);
-//                    std::cout<<"ninterval is "<<ninterval<<std::endl;
-//                }
-//            }
-            if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
 
+        mmutexmsg.lock();
+        xmsg.CopyFrom(mcloudemsg);
+        mcloudemsg.clear_xclouddata();
+        mmutexmsg.unlock();
 
-        }
+        sendcloudmsg(xmsg,writer); //send data.
+        nlastsend = xTime.elapsed();
 
 
     }
@@ -173,35 +136,24 @@ void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::Uploa
 
 
 
-void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
+void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
 {
     std::cout<<"threadrecv start"<<std::endl;
-    iv::UploadReplyStream reply;
+    iv::CloudSwapReplyStream reply;
     while (writer->Read(&reply)) {
 
-        *nlastreftime = QDateTime::currentMSecsSinceEpoch();
-        qint64 nlaten = QDateTime::currentMSecsSinceEpoch() - reply.nreqsendtime();
-        if(reply.nreqsendtime() == 0)nlaten = 0;
-        else
-        {
-            nlaten = nlaten - reply.npausetime();
-        }
-        calclatency(nlaten);
-
-        if(reply.framerate() >0.001)
-        {
-            mninterval = 1000.0/reply.framerate();
-        }
-
-        qDebug("latency is %ld",nlaten);
 //        nfail = 0;
 //        std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
-        if(reply.nres() == 1)
+        if(reply.nres() > 0)
         {
-            iv::cloud::cloudmsg xmsg;
-            if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+            int i;
+            for(i=0;i<reply.xdata_size();i++)
             {
-                sharectrlmsg(&xmsg);
+                iv::cloud::cloudmsg xmsg;
+                if(xmsg.ParseFromArray(reply.mutable_xdata(i)->data(),reply.mutable_xdata(i)->size()))
+                {
+                    shareswapmsg(&xmsg);
+                }
             }
         }
 
@@ -224,7 +176,7 @@ void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<boo
     std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
              target_str, grpc::InsecureChannelCredentials(),cargs);
 
-    std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
+    std::unique_ptr<iv::CloudSwapStream::Stub> stub_ = iv::CloudSwapStream::NewStub(channel);
 
 
     int nfail = 0;
@@ -233,7 +185,7 @@ void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<boo
     {
         ClientContext context ;
 //        std::shared_ptr<ClientContext> pcontext(new ClientContext);
-        std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writerRead(stub_->upload(&context));
+        std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writerRead(stub_->swap(&context));
 
 
 //        std::shared_ptr<bool> pbRun(new bool);
@@ -264,7 +216,7 @@ void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<boo
 
         channel = grpc::CreateCustomChannel(
                  target_str, grpc::InsecureChannelCredentials(),cargs);
-        stub_ = iv::UploadStream::NewStub(channel);
+        stub_ = iv::CloudSwapStream::NewStub(channel);
         nfail++;
 //        if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
 //        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -425,7 +377,7 @@ void grpcclient::dec_yaml(const char * stryamlpath)
 
 }
 
-void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
+void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg)
 {
     int i;
     int nsize = pxmsg->xclouddata_size();
@@ -447,20 +399,15 @@ void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
 
 void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
 {
-    int nsize = mvectormsgunit.size();
-    int i;
-    for(i=0;i<nsize;i++)
+
+    mmutexmsg.lock();
+    iv::cloud::cloudunit * pcu = mcloudemsg.add_xclouddata();
+    pcu->set_msgname(strmemname);
+    pcu->set_data(strdata,nSize);
+    if(mcloudemsg.xclouddata_size() == 1)
     {
-        if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
-        {
-            gMutexMsg.lock();
-            char * strtem = new char[nSize];
-            memcpy(strtem,strdata,nSize);
-            mvectormsgunit[i].mpstrmsgdata.reset(strtem);
-            mvectormsgunit[i].mndatasize = nSize;
-            mvectormsgunit[i].mbRefresh = true;
-            gMutexMsg.unlock();
-            break;
-        }
+        mcloudemsg.set_xtime(std::chrono::system_clock::now().time_since_epoch().count());
     }
+    mmutexmsg.unlock();
+
 }

+ 8 - 3
src/driver/driver_cloud_swap_client/grpcclient.h

@@ -80,17 +80,22 @@ private:
     int mninterval = 100;
     int mnlatency = 0;
     std::vector<qint64> mvectorlatency;
+
+private:
+    iv::cloud::cloudmsg mcloudemsg;
+    std::mutex mmutexmsg;
+    void sendcloudmsg(iv::cloud::cloudmsg & xmsg,std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer);
 public:
     void UpdateData(const char * strdata,const unsigned int nSize,const char * strmemname);
 private:
     void run();
     void dec_yaml(const char * stryamlpath);
-    void sharectrlmsg(iv::cloud::cloudmsg * pxmsg);
+    void shareswapmsg(iv::cloud::cloudmsg * pxmsg);
 
     void threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun);
 
-    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
-    void threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
+    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
+    void threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
     qint64 calclatency(qint64 nnewlatency);
 };
 

+ 1 - 0
src/driver/driver_map_xodrload/driver_map_xodrload.pro

@@ -87,5 +87,6 @@ HEADERS += \
 INCLUDEPATH += $$PWD/../../common/common/xodr
 INCLUDEPATH += $$PWD/../../common/common/xodr/xodrfunc
 
+DEFINES += INPILOT
 
 

+ 6 - 2
src/include/proto3/cloudswap.proto

@@ -39,15 +39,19 @@ message CloudSwapRequestStream {
     double pingmin = 7;
     double pingmax = 8;
     double pingdev = 9;
+    double fLatencyInStore = 10; //message store in node before send.
+    int32  nunitcount = 11;  //cloud unit count in xdata.
 }
 
 // The response message containing the greetings
 message CloudSwapReplyStream {
   int32 nres = 1;  //0 no message 1 have message
   repeated bytes xdata = 2;
-  double fLatency = 3; //Latecy = node_ping_avg + objnode_ping_avg + data_store_in_server
+  double fLatency_InNode = 3; //Latecy = node_ping_avg + objnode_ping_avg + data_store_in_server
   double fLatency_InServer = 4;
-  uint64 nmsgservertime = 5;  //server send this message when
+  double fLatency_InObjNode = 5;
+  double fLatencyInStore_Obj = 6;  //first message store latency in object node.
+  uint64 nmsgservertime = 7;  //server send this message when
 }