浏览代码

change driver_cloud_swap_client.

yuchuli 3 年之前
父节点
当前提交
40e4a05a79

+ 6 - 7
src/driver/driver_cloud_swap_client/driver_cloud_swap_client.yaml

@@ -1,13 +1,12 @@
-server : 47.96.250.93
-port : 50051
+server : 127.0.0.1
+port : 50061
 uploadinterval : 100
 
-VIN : AAAAAAAAAAAAAAAAA
-queryMD5 : 5d41402abc4b2a76b9719d911017c592
-ctrlMD5  : 5d41402abc4b2a76b9719d911017c592
+nodeid : ge3_1
+objnodeid : vtd
 
 
-uploadmessage:
+upmessage:
   usbpic:
     msgname: compresspic
     buffersize: 10000000
@@ -29,7 +28,7 @@ uploadmessage:
     bimportant: true
     keeptime: 3000
 
-ctrlmessage:
+downmessage:
   xodrsrc:
     msgname: xodrsrc
     buffersize: 1000

+ 28 - 50
src/driver/driver_cloud_swap_client/grpcclient.cpp

@@ -4,6 +4,8 @@ grpcclient * ggrpcclient;
 
 void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
 {
+    (void)index;
+    (void)dt;
     ggrpcclient->UpdateData(strdata,nSize,strmemname);
 
 }
@@ -16,12 +18,12 @@ grpcclient::grpcclient(std::string stryamlpath)
     dec_yaml(stryamlpath.data());
 
     int i;
-    for(i=0;i<mvectormsgunit.size();i++)
+    for(i=0;i<(int)mvectormsgunit.size();i++)
     {
         mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
     }
 
-    for(i=0;i<mvectorctrlmsgunit.size();i++)
+    for(i=0;i<(int)mvectorctrlmsgunit.size();i++)
     {
         mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
                                                                  mvectorctrlmsgunit[i].mnBufferCount);
@@ -66,8 +68,8 @@ void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::
         request.set_pingdev(0);
         request.set_pingmax(0);
         request.set_pingmin(0);
-        request.set_strnodeid("111");
-        request.set_strobjnodeid("222");
+        request.set_strnodeid(mstrnodeid);
+        request.set_strobjnodeid(mstrobjnodeid);
         request.set_xdata(strbuf,nbytesize);
 
 
@@ -91,6 +93,7 @@ void grpcclient::sendcloudmsg(iv::cloud::cloudmsg &xmsg,std::shared_ptr<::grpc::
 
 void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
 {
+    (void)nlastreftime;
     std::cout<<"threadsend start "<<std::endl;
 
     int ninterval = 1000;
@@ -138,6 +141,7 @@ void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::Cloud
 
 void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::CloudSwapRequestStream, iv::CloudSwapReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
 {
+    (void)nlastreftime;
     std::cout<<"threadrecv start"<<std::endl;
     iv::CloudSwapReplyStream reply;
     while (writer->Read(&reply)) {
@@ -232,25 +236,9 @@ void grpcclient::run()
                     *pntime = QDateTime::currentMSecsSinceEpoch();
 
     std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
+    (void)pthread;
     return;
 
-//    while(!QThread::isInterruptionRequested())
-//    {
-//        std::shared_ptr<bool> pbRun(new bool);
-//        *pbRun = true;
-//        std::shared_ptr<qint64> pntime(new qint64);
-//        *pntime = QDateTime::currentMSecsSinceEpoch();
-//        std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
-//        (void )pthread;
-//        while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<10000)&&(*pbRun))
-//        {
-//            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-//        }
-//        std::cout<<" reconect."<<std::endl;
-//        *pbRun = false;
-
-//    }
-
 }
 
 
@@ -265,10 +253,10 @@ void grpcclient::dec_yaml(const char * stryamlpath)
     catch(YAML::BadFile e)
     {
         qDebug("load error.");
+        (void)e;
         return;
     }
 
-    std::vector<std::string> vecmodulename;
 
 
     if(config["server"])
@@ -279,59 +267,49 @@ void grpcclient::dec_yaml(const char * stryamlpath)
     {
         gstrserverport = config["port"].as<std::string>();
     }
-    if(config["uploadinterval"])
-    {
-        gstruploadinterval = config["uploadinterval"].as<std::string>();
-    }
 
-    if(config["VIN"])
-    {
-        gstrVIN = config["VIN"].as<std::string>();
-    }
 
-    if(config["queryMD5"])
-    {
-        gstrqueryMD5 = config["queryMD5"].as<std::string>();
-    }
-    else
+    if(config["nodeid"])
     {
-        return;
+        mstrnodeid = config["nodeid"].as<std::string>();
     }
 
-    if(config["ctrlMD5"])
+    if(config["objnodeid"])
     {
-        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+        mstrobjnodeid = config["objnodeid"].as<std::string>();
     }
 
 
+
+
     std::string strmsgname;
 
-    if(config["uploadmessage"])
+    if(config["upmessage"])
     {
 
-        for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
+        for(YAML::const_iterator it= config["upmessage"].begin(); it != config["upmessage"].end();++it)
         {
             std::string strtitle = it->first.as<std::string>();
             std::cout<<strtitle<<std::endl;
 
-            if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
+            if(config["upmessage"][strtitle]["msgname"]&&config["upmessage"][strtitle]["buffersize"]&&config["upmessage"][strtitle]["buffercount"])
             {
                 iv::msgunit xmu;
-                strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
+                strmsgname = config["upmessage"][strtitle]["msgname"].as<std::string>();
                 strncpy(xmu.mstrmsgname,strmsgname.data(),255);
-                xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
-                xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
-                if(config["uploadmessage"][strtitle]["bimportant"])
+                xmu.mnBufferSize = config["upmessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["upmessage"][strtitle]["buffercount"].as<int>();
+                if(config["upmessage"][strtitle]["bimportant"])
                 {
-                   std::string strimportant =    config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
+                   std::string strimportant =    config["upmessage"][strtitle]["bimportant"].as<std::string>();
                    if(strimportant == "true")
                    {
                        xmu.mbImportant = true;
                    }
                 }
-                if(config["uploadmessage"][strtitle]["keeptime"])
+                if(config["upmessage"][strtitle]["keeptime"])
                 {
-                   std::string strkeep =    config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
+                   std::string strkeep =    config["upmessage"][strtitle]["keeptime"].as<std::string>();
                    xmu.mnkeeptime = atoi(strkeep.data());
                 }
                 mvectormsgunit.push_back(xmu);
@@ -349,9 +327,9 @@ void grpcclient::dec_yaml(const char * stryamlpath)
        return;
     }
 
-    if(config["ctrlmessage"])
+    if(config["downmessage"])
     {
-        std::string strnodename = "ctrlmessage";
+        std::string strnodename = "downmessage";
         for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
         {
             std::string strtitle = it->first.as<std::string>();

+ 3 - 0
src/driver/driver_cloud_swap_client/grpcclient.h

@@ -73,6 +73,9 @@ private:
     std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
     std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
 
+    std::string mstrnodeid = "111";
+    std::string mstrobjnodeid = "222";
+
 
 
     int gindex = 0;

+ 1 - 438
src/driver/driver_cloud_swap_client/main.cpp

@@ -4,431 +4,6 @@
 
 #include "ivversion.h"
 
-/*
-#include <yaml-cpp/yaml.h>
-
-#include <QDateTime>
-
-#include <iostream>
-
-#include <vector>
-
-#include <memory>
-
-#include <QMutex>
-
-#include <thread>
-
-#include "modulecomm.h"
-
-#include "cloud.pb.h"
-
-#include <iostream>
-#include <memory>
-#include <string>
-
-#include <grpcpp/grpcpp.h>
-
-#include "uploadmsg.grpc.pb.h"
-
-
-using grpc::Channel;
-using grpc::ClientContext;
-using grpc::Status;
-
-
-void test()
-{
-    std::string target_str = "0.0.0.0:50051";
-
-    auto cargs = grpc::ChannelArguments();
-    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
-    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
-
-    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
-             target_str, grpc::InsecureChannelCredentials(),cargs);
-
-    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
-
-
-    iv::UploadRequest request;
-
-
-
-    // Container for the data we expect from the server.
-    iv::UploadReply reply;
-
-    int nid = 0;
-
-    nid = 1;
-
-    while(1)
-    {
-
-
-
-
-        // Context for the client. It could be used to convey extra information to
-        // the server and/or tweak certain RPC behaviors.
-
-
-        ClientContext context ;
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        qint64 time1 = QDateTime::currentMSecsSinceEpoch();
-
-        request.set_id(nid);
-        request.set_ntime(time1);
-        nid++;
-        // The actual RPC.
-        Status status = stub_->upload(&context, request, &reply);
-        if (status.ok()) {
-            std::cout<<nid<<" upload successfully"<<std::endl;
-//            qint64 time2;
-
-//            memcpy(&time2,reply.data().data(),8);
-//            qint64 time3 = QDateTime::currentMSecsSinceEpoch();
-//            std::cout<<"reply data size is "<<reply.data().size()<<std::endl;
-//            std::cout<<" latency is "<<(time2 - time1)<<" 2 is "<<(time3 - time2)<<std::endl;
-//          return reply.message();
-        } else {
-          std::cout << status.error_code() << ": " << status.error_message()
-                    << std::endl;
-          std::cout<<"RPC failed"<<std::endl;
-          std::this_thread::sleep_for(std::chrono::milliseconds(900));
-
-//          delete pcontext;
-//          pcontext = new ClientContext;
-
-//          channel = grpc::CreateCustomChannel(
-//                   target_str, grpc::InsecureChannelCredentials(),cargs);
-
-//          stub_ = iv::Upload::NewStub(channel);
-        }
-    }
-}
-
-std::string gstrserverip =  "0.0.0.0";//"123.57.212.138";
-std::string gstrserverport = "50051";//"9000";
-std::string gstruploadinterval = "1000";
-void * gpa;
-QMutex gMutexMsg;
-std::thread * guploadthread;
-
-
-
-
-std::vector<iv::msgunit> mvectormsgunit;
-
-std::vector<iv::msgunit> mvectorctrlmsgunit;
-
-
-std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
-std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
-std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
-
-
-
-int gindex = 0;
-
-void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
-{
-
-    int nsize = mvectormsgunit.size();
-    int i;
-    for(i=0;i<nsize;i++)
-    {
-        if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
-        {
-            gMutexMsg.lock();
-            char * strtem = new char[nSize];
-            memcpy(strtem,strdata,nSize);
-            mvectormsgunit[i].mpstrmsgdata.reset(strtem);
-            mvectormsgunit[i].mndatasize = nSize;
-            mvectormsgunit[i].mbRefresh = true;
-            gMutexMsg.unlock();
-            break;
-        }
-    }
-}
-
-
-void sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
-{
-    int i;
-    int nsize = pxmsg->xclouddata_size();
-    for(i=0;i<nsize;i++)
-    {
-        int j;
-        int nquerysize = mvectorctrlmsgunit.size();
-        for(j=0;j<nquerysize;j++)
-        {
-            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
-            {
- //               qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
-                iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
-                break;
-            }
-        }
-    }
-}
-
-
-void threadupload()
-{
-    int nsize = mvectormsgunit.size();
-    int i;
-
-    int ninterval = atoi(gstruploadinterval.data());
-    if(ninterval<=0)ninterval = 100;
-
-    QTime xTime;
-    xTime.start();
-    int nlastsend = xTime.elapsed();
-
-    std::string target_str = gstrserverip+":";
-    target_str = target_str + gstrserverport ;//std::to_string()
-    auto cargs = grpc::ChannelArguments();
-    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
-    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
-
-    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
-             target_str, grpc::InsecureChannelCredentials(),cargs);
-
-    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
-
-
-    iv::UploadRequest request;
-
-    int nid = 0;
-
-    // Container for the data we expect from the server.
-    iv::UploadReply reply;
-
-    gpr_timespec timespec;
-      timespec.tv_sec = 30;//设置阻塞时间为2秒
-      timespec.tv_nsec = 0;
-      timespec.clock_type = GPR_TIMESPAN;
-
- //   ClientContext context;
-
-
-
-    while(true)
-    {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-        if((xTime.elapsed()-nlastsend)<ninterval)
-        {
-            continue;
-        }
-
-            bool bImportant = false;
-            int nkeeptime = 0;
-            iv::cloud::cloudmsg xmsg;
-            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
-            gMutexMsg.lock();
-            for(i=0;i<nsize;i++)
-            {
-                if(mvectormsgunit[i].mbRefresh)
-                {
-                    mvectormsgunit[i].mbRefresh = false;
-                    if(mvectormsgunit[i].mbImportant)
-                    {
-                        bImportant = true;
-                    }
-                    if(mvectormsgunit[i].mnkeeptime > nkeeptime)
-                    {
-                        nkeeptime = mvectormsgunit[i].mnkeeptime;
-                    }
-                    iv::cloud::cloudunit xcloudunit;
-                    xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
-                    xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
-                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
-                    pcu->CopyFrom(xcloudunit);
-                }
-
-            }
-            gMutexMsg.unlock();
-
-            int nbytesize = xmsg.ByteSize();
-            char * strbuf = new char[nbytesize];
-            std::shared_ptr<char> pstrbuf;
-            pstrbuf.reset(strbuf);
-            if(xmsg.SerializeToArray(strbuf,nbytesize))
-            {
-
-                ClientContext context ;
-                context.set_deadline(timespec);
-                qint64 time1 = QDateTime::currentMSecsSinceEpoch();
-
-                request.set_id(nid);
-                request.set_ntime(time1);
-                request.set_strquerymd5(gstrqueryMD5);
-                request.set_strctrlmd5(gstrctrlMD5);
-                request.set_strvin(gstrVIN);
-                request.set_xdata(strbuf,nbytesize);
-                request.set_kepptime(nkeeptime);
-                request.set_bimportant(bImportant);
-                nid++;
-                // The actual RPC.
-                Status status = stub_->upload(&context, request, &reply);
-                if (status.ok()) {
-                    std::cout<<nid<<" upload successfully"<<std::endl;
-                    if(reply.nres() == 1)
-                    {
-                        iv::cloud::cloudmsg xmsg;
-                        if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
-                        {
-                            sharectrlmsg(&xmsg);
-                        }
-                    }
-                } else {
-                  std::cout << status.error_code() << ": " << status.error_message()
-                            << std::endl;
-                  std::cout<<"RPC failed"<<std::endl;
-                  if(status.error_code() == 4)
-                  {
-                      std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
-                      channel = grpc::CreateCustomChannel(
-                               target_str, grpc::InsecureChannelCredentials(),cargs);
-
-                      stub_ = iv::Upload::NewStub(channel);
-                  }
-                  std::this_thread::sleep_for(std::chrono::milliseconds(900));
-
-                }
-
-            }
-            nlastsend = xTime.elapsed();
-
-    }
-}
-
-
-void dec_yaml(const char * stryamlpath)
-{
-
-    YAML::Node config;
-    try
-    {
-        config = YAML::LoadFile(stryamlpath);
-    }
-    catch(YAML::BadFile e)
-    {
-        qDebug("load error.");
-        return;
-    }
-
-    std::vector<std::string> vecmodulename;
-
-
-    if(config["server"])
-    {
-        gstrserverip = config["server"].as<std::string>();
-    }
-    if(config["port"])
-    {
-        gstrserverport = config["port"].as<std::string>();
-    }
-    if(config["uploadinterval"])
-    {
-        gstruploadinterval = config["uploadinterval"].as<std::string>();
-    }
-
-    if(config["VIN"])
-    {
-        gstrVIN = config["VIN"].as<std::string>();
-    }
-
-    if(config["queryMD5"])
-    {
-        gstrqueryMD5 = config["queryMD5"].as<std::string>();
-    }
-    else
-    {
-        return;
-    }
-
-    if(config["ctrlMD5"])
-    {
-        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
-    }
-
-
-    std::string strmsgname;
-
-    if(config["uploadmessage"])
-    {
-
-        for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
-        {
-            std::string strtitle = it->first.as<std::string>();
-            std::cout<<strtitle<<std::endl;
-
-            if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
-            {
-                iv::msgunit xmu;
-                strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
-                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
-                xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
-                xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
-                if(config["uploadmessage"][strtitle]["bimportant"])
-                {
-                   std::string strimportant =    config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
-                   if(strimportant == "true")
-                   {
-                       xmu.mbImportant = true;
-                   }
-                }
-                if(config["uploadmessage"][strtitle]["keeptime"])
-                {
-                   std::string strkeep =    config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
-                   xmu.mnkeeptime = atoi(strkeep.data());
-                }
-                mvectormsgunit.push_back(xmu);
-            }
-        }
-    }
-    else
-    {
-
-
-    }
-
-    if(!config["ctrlMD5"])
-    {
-       return;
-    }
-
-    if(config["ctrlmessage"])
-    {
-        std::string strnodename = "ctrlmessage";
-        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
-        {
-            std::string strtitle = it->first.as<std::string>();
-            std::cout<<strtitle<<std::endl;
-
-            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
-            {
-                iv::msgunit xmu;
-                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
-                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
-                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
-                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
-                mvectorctrlmsgunit.push_back(xmu);
-            }
-        }
-    }
-    else
-    {
-
-    }
-
-    return;
-
-}
-
-
-*/
 
 int main(int argc, char *argv[])
 {
@@ -442,7 +17,7 @@ int main(int argc, char *argv[])
     char stryamlpath[256];
     if(argc<2)
     {
-        snprintf(stryamlpath,255,"driver_cloud_grpc_client_stream.yaml");
+        snprintf(stryamlpath,255,"driver_cloud_swap_client.yaml");
 //        strncpy(stryamlpath,abs_ymlpath,255);
     }
     else
@@ -454,19 +29,7 @@ int main(int argc, char *argv[])
     grpcclient * pgrpcclient = new grpcclient(stryamlpath);
     pgrpcclient->start();
 
-//    int i;
-//    for(i=0;i<mvectormsgunit.size();i++)
-//    {
-//        mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
-//    }
-
-//    for(i=0;i<mvectorctrlmsgunit.size();i++)
-//    {
-//        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
-//                                                                 mvectorctrlmsgunit[i].mnBufferCount);
-//    }
 
-//    guploadthread = new std::thread(threadupload);
 
     return a.exec();
 }