Эх сурвалжийг харах

add driver_cloud_swap_client. not complete.

yuchuli 3 жил өмнө
parent
commit
a9e74d686a

+ 78 - 0
src/driver/driver_cloud_swap_client/cloudswap.grpc.pb.cc

@@ -0,0 +1,78 @@
+// Generated by the gRPC C++ plugin.
+// If you make any local change, they will be lost.
+// source: cloudswap.proto
+
+#include "cloudswap.pb.h"
+#include "cloudswap.grpc.pb.h"
+
+#include <functional>
+#include <grpcpp/impl/codegen/async_stream.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/channel_interface.h>
+#include <grpcpp/impl/codegen/client_unary_call.h>
+#include <grpcpp/impl/codegen/client_callback.h>
+#include <grpcpp/impl/codegen/message_allocator.h>
+#include <grpcpp/impl/codegen/method_handler.h>
+#include <grpcpp/impl/codegen/rpc_service_method.h>
+#include <grpcpp/impl/codegen/server_callback.h>
+#include <grpcpp/impl/codegen/server_callback_handlers.h>
+#include <grpcpp/impl/codegen/server_context.h>
+#include <grpcpp/impl/codegen/service_type.h>
+#include <grpcpp/impl/codegen/sync_stream.h>
+namespace iv {
+
+static const char* CloudSwapStream_method_names[] = {
+  "/iv.CloudSwapStream/swap",
+};
+
+std::unique_ptr< CloudSwapStream::Stub> CloudSwapStream::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
+  (void)options;
+  std::unique_ptr< CloudSwapStream::Stub> stub(new CloudSwapStream::Stub(channel));
+  return stub;
+}
+
+CloudSwapStream::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
+  : channel_(channel), rpcmethod_swap_(CloudSwapStream_method_names[0], ::grpc::internal::RpcMethod::BIDI_STREAMING, channel)
+  {}
+
+::grpc::ClientReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* CloudSwapStream::Stub::swapRaw(::grpc::ClientContext* context) {
+  return ::grpc::internal::ClientReaderWriterFactory< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>::Create(channel_.get(), rpcmethod_swap_, context);
+}
+
+void CloudSwapStream::Stub::experimental_async::swap(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::iv::CloudSwapRequestStream,::iv::CloudSwapReplyStream>* reactor) {
+  ::grpc::internal::ClientCallbackReaderWriterFactory< ::iv::CloudSwapRequestStream,::iv::CloudSwapReplyStream>::Create(stub_->channel_.get(), stub_->rpcmethod_swap_, context, reactor);
+}
+
+::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* CloudSwapStream::Stub::AsyncswapRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
+  return ::grpc::internal::ClientAsyncReaderWriterFactory< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>::Create(channel_.get(), cq, rpcmethod_swap_, context, true, tag);
+}
+
+::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* CloudSwapStream::Stub::PrepareAsyncswapRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
+  return ::grpc::internal::ClientAsyncReaderWriterFactory< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>::Create(channel_.get(), cq, rpcmethod_swap_, context, false, nullptr);
+}
+
+CloudSwapStream::Service::Service() {
+  AddMethod(new ::grpc::internal::RpcServiceMethod(
+      CloudSwapStream_method_names[0],
+      ::grpc::internal::RpcMethod::BIDI_STREAMING,
+      new ::grpc::internal::BidiStreamingHandler< CloudSwapStream::Service, ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>(
+          [](CloudSwapStream::Service* service,
+             ::grpc::ServerContext* ctx,
+             ::grpc::ServerReaderWriter<::iv::CloudSwapReplyStream,
+             ::iv::CloudSwapRequestStream>* stream) {
+               return service->swap(ctx, stream);
+             }, this)));
+}
+
+CloudSwapStream::Service::~Service() {
+}
+
+::grpc::Status CloudSwapStream::Service::swap(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* stream) {
+  (void) context;
+  (void) stream;
+  return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+}
+
+
+}  // namespace iv
+

+ 279 - 0
src/driver/driver_cloud_swap_client/cloudswap.grpc.pb.h

@@ -0,0 +1,279 @@
+// Generated by the gRPC C++ plugin.
+// If you make any local change, they will be lost.
+// source: cloudswap.proto
+// Original file comments:
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#ifndef GRPC_cloudswap_2eproto__INCLUDED
+#define GRPC_cloudswap_2eproto__INCLUDED
+
+#include "cloudswap.pb.h"
+
+#include <functional>
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpcpp/impl/codegen/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_stream.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/client_callback.h>
+#include <grpcpp/impl/codegen/client_context.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/message_allocator.h>
+#include <grpcpp/impl/codegen/method_handler.h>
+#include <grpcpp/impl/codegen/proto_utils.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
+#include <grpcpp/impl/codegen/server_callback.h>
+#include <grpcpp/impl/codegen/server_callback_handlers.h>
+#include <grpcpp/impl/codegen/server_context.h>
+#include <grpcpp/impl/codegen/service_type.h>
+#include <grpcpp/impl/codegen/status.h>
+#include <grpcpp/impl/codegen/stub_options.h>
+#include <grpcpp/impl/codegen/sync_stream.h>
+
+namespace iv {
+
+// The Upload service definition.
+class CloudSwapStream final {
+ public:
+  static constexpr char const* service_full_name() {
+    return "iv.CloudSwapStream";
+  }
+  class StubInterface {
+   public:
+    virtual ~StubInterface() {}
+    // Sends a Upload
+    std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>> swap(::grpc::ClientContext* context) {
+      return std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>>(swapRaw(context));
+    }
+    std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>> Asyncswap(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
+      return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>>(AsyncswapRaw(context, cq, tag));
+    }
+    std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>> PrepareAsyncswap(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
+      return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>>(PrepareAsyncswapRaw(context, cq));
+    }
+    class experimental_async_interface {
+     public:
+      virtual ~experimental_async_interface() {}
+      // Sends a Upload
+      #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+      virtual void swap(::grpc::ClientContext* context, ::grpc::ClientBidiReactor< ::iv::CloudSwapRequestStream,::iv::CloudSwapReplyStream>* reactor) = 0;
+      #else
+      virtual void swap(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::iv::CloudSwapRequestStream,::iv::CloudSwapReplyStream>* reactor) = 0;
+      #endif
+    };
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+    typedef class experimental_async_interface async_interface;
+    #endif
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+    async_interface* async() { return experimental_async(); }
+    #endif
+    virtual class experimental_async_interface* experimental_async() { return nullptr; }
+  private:
+    virtual ::grpc::ClientReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* swapRaw(::grpc::ClientContext* context) = 0;
+    virtual ::grpc::ClientAsyncReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* AsyncswapRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0;
+    virtual ::grpc::ClientAsyncReaderWriterInterface< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* PrepareAsyncswapRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0;
+  };
+  class Stub final : public StubInterface {
+   public:
+    Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
+    std::unique_ptr< ::grpc::ClientReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>> swap(::grpc::ClientContext* context) {
+      return std::unique_ptr< ::grpc::ClientReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>>(swapRaw(context));
+    }
+    std::unique_ptr<  ::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>> Asyncswap(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
+      return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>>(AsyncswapRaw(context, cq, tag));
+    }
+    std::unique_ptr<  ::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>> PrepareAsyncswap(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
+      return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>>(PrepareAsyncswapRaw(context, cq));
+    }
+    class experimental_async final :
+      public StubInterface::experimental_async_interface {
+     public:
+      #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+      void swap(::grpc::ClientContext* context, ::grpc::ClientBidiReactor< ::iv::CloudSwapRequestStream,::iv::CloudSwapReplyStream>* reactor) override;
+      #else
+      void swap(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::iv::CloudSwapRequestStream,::iv::CloudSwapReplyStream>* reactor) override;
+      #endif
+     private:
+      friend class Stub;
+      explicit experimental_async(Stub* stub): stub_(stub) { }
+      Stub* stub() { return stub_; }
+      Stub* stub_;
+    };
+    class experimental_async_interface* experimental_async() override { return &async_stub_; }
+
+   private:
+    std::shared_ptr< ::grpc::ChannelInterface> channel_;
+    class experimental_async async_stub_{this};
+    ::grpc::ClientReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* swapRaw(::grpc::ClientContext* context) override;
+    ::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* AsyncswapRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
+    ::grpc::ClientAsyncReaderWriter< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* PrepareAsyncswapRaw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
+    const ::grpc::internal::RpcMethod rpcmethod_swap_;
+  };
+  static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
+
+  class Service : public ::grpc::Service {
+   public:
+    Service();
+    virtual ~Service();
+    // Sends a Upload
+    virtual ::grpc::Status swap(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* stream);
+  };
+  template <class BaseClass>
+  class WithAsyncMethod_swap : public BaseClass {
+   private:
+    void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
+   public:
+    WithAsyncMethod_swap() {
+      ::grpc::Service::MarkMethodAsync(0);
+    }
+    ~WithAsyncMethod_swap() override {
+      BaseClassMustBeDerivedFromService(this);
+    }
+    // disable synchronous version of this method
+    ::grpc::Status swap(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* /*stream*/)  override {
+      abort();
+      return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+    }
+    void Requestswap(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
+      ::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
+    }
+  };
+  typedef WithAsyncMethod_swap<Service > AsyncService;
+  template <class BaseClass>
+  class ExperimentalWithCallbackMethod_swap : public BaseClass {
+   private:
+    void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
+   public:
+    ExperimentalWithCallbackMethod_swap() {
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+      ::grpc::Service::
+    #else
+      ::grpc::Service::experimental().
+    #endif
+        MarkMethodCallback(0,
+          new ::grpc::internal::CallbackBidiHandler< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>(
+            [this](
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+                   ::grpc::CallbackServerContext*
+    #else
+                   ::grpc::experimental::CallbackServerContext*
+    #endif
+                     context) { return this->swap(context); }));
+    }
+    ~ExperimentalWithCallbackMethod_swap() override {
+      BaseClassMustBeDerivedFromService(this);
+    }
+    // disable synchronous version of this method
+    ::grpc::Status swap(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* /*stream*/)  override {
+      abort();
+      return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+    }
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+    virtual ::grpc::ServerBidiReactor< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* swap(
+      ::grpc::CallbackServerContext* /*context*/)
+    #else
+    virtual ::grpc::experimental::ServerBidiReactor< ::iv::CloudSwapRequestStream, ::iv::CloudSwapReplyStream>* swap(
+      ::grpc::experimental::CallbackServerContext* /*context*/)
+    #endif
+      { return nullptr; }
+  };
+  #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+  typedef ExperimentalWithCallbackMethod_swap<Service > CallbackService;
+  #endif
+
+  typedef ExperimentalWithCallbackMethod_swap<Service > ExperimentalCallbackService;
+  template <class BaseClass>
+  class WithGenericMethod_swap : public BaseClass {
+   private:
+    void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
+   public:
+    WithGenericMethod_swap() {
+      ::grpc::Service::MarkMethodGeneric(0);
+    }
+    ~WithGenericMethod_swap() override {
+      BaseClassMustBeDerivedFromService(this);
+    }
+    // disable synchronous version of this method
+    ::grpc::Status swap(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* /*stream*/)  override {
+      abort();
+      return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+    }
+  };
+  template <class BaseClass>
+  class WithRawMethod_swap : public BaseClass {
+   private:
+    void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
+   public:
+    WithRawMethod_swap() {
+      ::grpc::Service::MarkMethodRaw(0);
+    }
+    ~WithRawMethod_swap() override {
+      BaseClassMustBeDerivedFromService(this);
+    }
+    // disable synchronous version of this method
+    ::grpc::Status swap(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* /*stream*/)  override {
+      abort();
+      return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+    }
+    void Requestswap(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
+      ::grpc::Service::RequestAsyncBidiStreaming(0, context, stream, new_call_cq, notification_cq, tag);
+    }
+  };
+  template <class BaseClass>
+  class ExperimentalWithRawCallbackMethod_swap : public BaseClass {
+   private:
+    void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
+   public:
+    ExperimentalWithRawCallbackMethod_swap() {
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+      ::grpc::Service::
+    #else
+      ::grpc::Service::experimental().
+    #endif
+        MarkMethodRawCallback(0,
+          new ::grpc::internal::CallbackBidiHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
+            [this](
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+                   ::grpc::CallbackServerContext*
+    #else
+                   ::grpc::experimental::CallbackServerContext*
+    #endif
+                     context) { return this->swap(context); }));
+    }
+    ~ExperimentalWithRawCallbackMethod_swap() override {
+      BaseClassMustBeDerivedFromService(this);
+    }
+    // disable synchronous version of this method
+    ::grpc::Status swap(::grpc::ServerContext* /*context*/, ::grpc::ServerReaderWriter< ::iv::CloudSwapReplyStream, ::iv::CloudSwapRequestStream>* /*stream*/)  override {
+      abort();
+      return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+    }
+    #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+    virtual ::grpc::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* swap(
+      ::grpc::CallbackServerContext* /*context*/)
+    #else
+    virtual ::grpc::experimental::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* swap(
+      ::grpc::experimental::CallbackServerContext* /*context*/)
+    #endif
+      { return nullptr; }
+  };
+  typedef Service StreamedUnaryService;
+  typedef Service SplitStreamedService;
+  typedef Service StreamedService;
+};
+
+}  // namespace iv
+
+
+#endif  // GRPC_cloudswap_2eproto__INCLUDED

+ 58 - 0
src/driver/driver_cloud_swap_client/driver_cloud_swap_client.pro

@@ -0,0 +1,58 @@
+QT -= gui
+
+CONFIG += c++11 console
+CONFIG -= app_bundle
+
+QMAKE_LFLAGS += -no-pie
+
+# The following define makes your compiler emit warnings if you use
+# any Qt feature that has been marked deprecated (the exact warnings
+# depend on your compiler). Please consult the documentation of the
+# deprecated API in order to know how to port your code away from it.
+DEFINES += QT_DEPRECATED_WARNINGS
+
+# You can also make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+# You can also select to disable deprecated APIs only up to a certain version of Qt.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+SOURCES += \
+        ../../include/msgtype/cloud.pb.cc \
+    ../../include/msgtype/cloudswap.pb.cc \
+        main.cpp \
+    grpcclient.cpp \
+    cloudswap.grpc.pb.cc
+
+# Default rules for deployment.
+qnx: target.path = /tmp/$${TARGET}/bin
+else: unix:!android: target.path = /opt/$${TARGET}/bin
+!isEmpty(target.path): INSTALLS += target
+
+!include(../../../include/common.pri ) {
+    error( "Couldn't find the common.pri file!" )
+}
+
+!include(../../../include/ivprotobuf.pri ) {
+    error( "Couldn't find the ivprotobuf.pri file!" )
+}
+
+!include(../../../include/ivboost.pri ) {
+    error( "Couldn't find the ivboost.pri file!" )
+}
+
+!include(../../../include/ivgrpc.pri ) {
+    error( "Couldn't find the ivgrpc.pri file!" )
+}
+
+!include(../../../include/ivyaml-cpp.pri ) {
+    error( "Couldn't find the ivyaml-cpp.pri file!" )
+}
+
+
+
+HEADERS += \
+    ../../include/msgtype/cloud.pb.h \
+    ../../include/msgtype/cloudswap.pb.h \
+    grpcclient.h \
+    cloudswap.grpc.pb.h
+

+ 47 - 0
src/driver/driver_cloud_swap_client/driver_cloud_swap_client.yaml

@@ -0,0 +1,47 @@
+server : 47.96.250.93
+port : 50051
+uploadinterval : 100
+
+VIN : AAAAAAAAAAAAAAAAA
+queryMD5 : 5d41402abc4b2a76b9719d911017c592
+ctrlMD5  : 5d41402abc4b2a76b9719d911017c592
+
+
+uploadmessage:
+  usbpic:
+    msgname: compresspic
+    buffersize: 10000000
+    buffercount: 1
+  hcp2_gpsimu:
+    msgname: hcp2_gpsimu
+    buffersize: 100000
+    buffercount: 1
+
+#  tracemap:
+#    msgname: tracemap
+#    buffersize: 10000000
+#    buffercount: 1
+
+  simpletrace:
+    msgname: simpletrace
+    buffersize: 10000000
+    buffercount: 1
+    bimportant: true
+    keeptime: 3000
+
+ctrlmessage:
+  xodrsrc:
+    msgname: xodrsrc
+    buffersize: 1000
+    buffercount: 1
+  xodrreq:
+    msgname: xodrreq
+    buffersize: 1000
+    buffercount: 1
+  remotectrl:
+    msgname: remotectrl
+    buffersize: 10000
+    buffercount: 1
+
+
+

+ 466 - 0
src/driver/driver_cloud_swap_client/grpcclient.cpp

@@ -0,0 +1,466 @@
+#include "grpcclient.h"
+
+grpcclient * ggrpcclient;
+
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+{
+    ggrpcclient->UpdateData(strdata,nSize,strmemname);
+
+}
+
+
+grpcclient::grpcclient(std::string stryamlpath)
+{
+
+    ggrpcclient = this;
+    dec_yaml(stryamlpath.data());
+
+    int i;
+    for(i=0;i<mvectormsgunit.size();i++)
+    {
+        mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
+    }
+
+    for(i=0;i<mvectorctrlmsgunit.size();i++)
+    {
+        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
+                                                                 mvectorctrlmsgunit[i].mnBufferCount);
+    }
+}
+
+qint64 grpcclient::calclatency(qint64 nnewlatency)
+{
+    mvectorlatency.push_back(nnewlatency);
+    while(mvectorlatency.size()>5)mvectorlatency.erase(mvectorlatency.begin());
+    qint64 nlatencytotal =0;
+    int nsize = mvectorlatency.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        nlatencytotal = nlatencytotal + mvectorlatency.at(i);
+    }
+    if(nsize > 0)
+    {
+        nlatencytotal = nlatencytotal/nsize;
+    }
+    mnlatency = nlatencytotal;
+    return nlatencytotal;
+}
+
+void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
+{
+    std::cout<<"threadsend start "<<std::endl;
+    int nsize = mvectormsgunit.size();
+    int i;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    mninterval = ninterval;
+    float ffraterate = 1.0f/((float)mninterval);
+    int nrawinterval = ninterval;
+    int nok = 0;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    int nid= 0;
+
+    while(*pbrun)
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        if((xTime.elapsed()-nlastsend)<mninterval)
+        {
+            continue;
+        }
+
+        bool bImportant = false;
+        int nkeeptime = 0;
+        iv::cloud::cloudmsg xmsg;
+        xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+        gMutexMsg.lock();
+        for(i=0;i<nsize;i++)
+        {
+            if(mvectormsgunit[i].mbRefresh)
+            {
+                mvectormsgunit[i].mbRefresh = false;
+                if(mvectormsgunit[i].mbImportant)
+                {
+                    bImportant = true;
+                }
+                if(mvectormsgunit[i].mnkeeptime > nkeeptime)
+                {
+                    nkeeptime = mvectormsgunit[i].mnkeeptime;
+                }
+                iv::cloud::cloudunit xcloudunit;
+                xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
+                xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
+                iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+                pcu->CopyFrom(xcloudunit);
+            }
+
+        }
+        gMutexMsg.unlock();
+
+        int nbytesize = xmsg.ByteSize();
+        char * strbuf = new char[nbytesize];
+        std::shared_ptr<char> pstrbuf;
+        pstrbuf.reset(strbuf);
+        if(xmsg.SerializeToArray(strbuf,nbytesize))
+        {
+            iv::UploadRequestStream request;
+            qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+            request.set_id(nid);
+            request.set_ntime(time1);
+            request.set_strquerymd5(gstrqueryMD5);
+            request.set_strctrlmd5(gstrctrlMD5);
+            request.set_strvin(gstrVIN);
+            request.set_xdata(strbuf,nbytesize);
+            request.set_kepptime(nkeeptime);
+            request.set_bimportant(bImportant);
+            request.set_nlatency(mnlatency);
+            ffraterate = 1000.0f/((float)mninterval);
+            request.set_fframerate(ffraterate);
+            request.set_nsendtime(time1);
+            nid++;
+
+            nlastsend = xTime.elapsed();
+
+            QTime xt;
+            xt.start();
+            ::grpc::WriteOptions wo;
+//            wo.set_write_through();
+//            wo.clear_buffer_hint();
+    //        writer->Write(request,(void * )2);
+
+    //        bool bsend = true;
+            bool bsend = writer->Write(request,wo);
+
+            *nlastreftime = QDateTime::currentMSecsSinceEpoch();
+
+//            if(xt.elapsed()>10)
+//            {
+//                nok = 0;
+//                if(ninterval < 1000)ninterval = ninterval * 11/10;
+//                mninterval = ninterval;
+//                ffraterate = 1.0f/((float)mninterval);
+//                qDebug("send ela is %d ninterval is %d",xt.elapsed(),ninterval);
+//            }
+//            else
+//            {
+//                nok++;
+//                if((ninterval > nrawinterval)&&(nok>10))
+//                {
+//                    nok = 0;
+
+//                    ninterval = ninterval*10/11;
+//                    mninterval = ninterval;
+//                    ffraterate = 1.0f/((float)mninterval);
+//                    std::cout<<"ninterval is "<<ninterval<<std::endl;
+//                }
+//            }
+            if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
+
+
+        }
+
+
+    }
+    std::cout<<"thread send end."<<std::endl;
+}
+
+
+
+void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
+{
+    std::cout<<"threadrecv start"<<std::endl;
+    iv::UploadReplyStream reply;
+    while (writer->Read(&reply)) {
+
+        *nlastreftime = QDateTime::currentMSecsSinceEpoch();
+        qint64 nlaten = QDateTime::currentMSecsSinceEpoch() - reply.nreqsendtime();
+        if(reply.nreqsendtime() == 0)nlaten = 0;
+        else
+        {
+            nlaten = nlaten - reply.npausetime();
+        }
+        calclatency(nlaten);
+
+        if(reply.framerate() >0.001)
+        {
+            mninterval = 1000.0/reply.framerate();
+        }
+
+        qDebug("latency is %ld",nlaten);
+//        nfail = 0;
+//        std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
+        if(reply.nres() == 1)
+        {
+            iv::cloud::cloudmsg xmsg;
+            if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+            {
+                sharectrlmsg(&xmsg);
+            }
+        }
+
+//            std::cout<<"read data from server."<<std::endl;
+    }
+    std::cout<<"threadrecv end."<<std::endl;
+    *pbrun = false;
+}
+
+
+void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun)
+{
+    std::cout<<"threadrpc start."<<std::endl;
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
+
+
+    int nfail = 0;
+
+    while(!QThread::isInterruptionRequested())
+    {
+        ClientContext context ;
+//        std::shared_ptr<ClientContext> pcontext(new ClientContext);
+        std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writerRead(stub_->upload(&context));
+
+
+//        std::shared_ptr<bool> pbRun(new bool);
+        *pbRun = true;
+        std::shared_ptr<qint64> pntime = pnrpctime ;
+        *pntime = QDateTime::currentMSecsSinceEpoch();
+        std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime);
+        (void )pthread;
+        std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime);
+        (void)precvthread;
+
+        pthread->join();
+        precvthread->join();
+
+ //       std::cout<<"threadRPC end"<<std::endl;
+ //       *pbRun = false;
+
+//        while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<3000)&&(*pbRun))
+//        {
+//            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+//        }
+
+//        *pbRun = false;
+
+
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+
+        channel = grpc::CreateCustomChannel(
+                 target_str, grpc::InsecureChannelCredentials(),cargs);
+        stub_ = iv::UploadStream::NewStub(channel);
+        nfail++;
+//        if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+//        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
+    }
+}
+
+void grpcclient::run()
+{
+            std::shared_ptr<bool> pbRun(new bool);
+            *pbRun = true;
+                    std::shared_ptr<qint64> pntime(new qint64);
+                    *pntime = QDateTime::currentMSecsSinceEpoch();
+
+    std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
+    return;
+
+//    while(!QThread::isInterruptionRequested())
+//    {
+//        std::shared_ptr<bool> pbRun(new bool);
+//        *pbRun = true;
+//        std::shared_ptr<qint64> pntime(new qint64);
+//        *pntime = QDateTime::currentMSecsSinceEpoch();
+//        std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
+//        (void )pthread;
+//        while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<10000)&&(*pbRun))
+//        {
+//            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+//        }
+//        std::cout<<" reconect."<<std::endl;
+//        *pbRun = false;
+
+//    }
+
+}
+
+
+void grpcclient::dec_yaml(const char * stryamlpath)
+{
+
+    YAML::Node config;
+    try
+    {
+        config = YAML::LoadFile(stryamlpath);
+    }
+    catch(YAML::BadFile e)
+    {
+        qDebug("load error.");
+        return;
+    }
+
+    std::vector<std::string> vecmodulename;
+
+
+    if(config["server"])
+    {
+        gstrserverip = config["server"].as<std::string>();
+    }
+    if(config["port"])
+    {
+        gstrserverport = config["port"].as<std::string>();
+    }
+    if(config["uploadinterval"])
+    {
+        gstruploadinterval = config["uploadinterval"].as<std::string>();
+    }
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["queryMD5"])
+    {
+        gstrqueryMD5 = config["queryMD5"].as<std::string>();
+    }
+    else
+    {
+        return;
+    }
+
+    if(config["ctrlMD5"])
+    {
+        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+    }
+
+
+    std::string strmsgname;
+
+    if(config["uploadmessage"])
+    {
+
+        for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
+                if(config["uploadmessage"][strtitle]["bimportant"])
+                {
+                   std::string strimportant =    config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
+                   if(strimportant == "true")
+                   {
+                       xmu.mbImportant = true;
+                   }
+                }
+                if(config["uploadmessage"][strtitle]["keeptime"])
+                {
+                   std::string strkeep =    config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
+                   xmu.mnkeeptime = atoi(strkeep.data());
+                }
+                mvectormsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+
+    }
+
+    if(!config["ctrlMD5"])
+    {
+       return;
+    }
+
+    if(config["ctrlmessage"])
+    {
+        std::string strnodename = "ctrlmessage";
+        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+                mvectorctrlmsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    return;
+
+}
+
+void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
+{
+    int i;
+    int nsize = pxmsg->xclouddata_size();
+    for(i=0;i<nsize;i++)
+    {
+        int j;
+        int nquerysize = mvectorctrlmsgunit.size();
+        for(j=0;j<nquerysize;j++)
+        {
+            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
+            {
+ //               qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+                iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+                break;
+            }
+        }
+    }
+}
+
+void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
+{
+    int nsize = mvectormsgunit.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
+        {
+            gMutexMsg.lock();
+            char * strtem = new char[nSize];
+            memcpy(strtem,strdata,nSize);
+            mvectormsgunit[i].mpstrmsgdata.reset(strtem);
+            mvectormsgunit[i].mndatasize = nSize;
+            mvectormsgunit[i].mbRefresh = true;
+            gMutexMsg.unlock();
+            break;
+        }
+    }
+}

+ 97 - 0
src/driver/driver_cloud_swap_client/grpcclient.h

@@ -0,0 +1,97 @@
+#ifndef GRPCCLIENT_H
+#define GRPCCLIENT_H
+
+#include <QThread>
+
+#include <yaml-cpp/yaml.h>
+
+#include <QDateTime>
+
+#include <iostream>
+
+#include <vector>
+
+#include <memory>
+
+#include <QMutex>
+
+#include <thread>
+
+#include "modulecomm.h"
+
+#include "cloud.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "cloudswap.grpc.pb.h"
+
+
+namespace iv {
+struct msgunit
+{
+    char mstrmsgname[256];
+    int mnBufferSize = 10000;
+    int mnBufferCount = 1;
+    void * mpa;
+    std::shared_ptr<char> mpstrmsgdata;
+    int mndatasize = 0;
+    bool mbRefresh = false;
+    bool mbImportant = false;
+    int mnkeeptime = 100;
+};
+}
+
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+class grpcclient : public QThread
+{
+public:
+    grpcclient(std::string stryamlpath);
+
+private:
+    std::string gstrserverip =  "0.0.0.0";//"123.57.212.138";
+    std::string gstrserverport = "50061";//"9000";
+    std::string gstruploadinterval = "1000";
+    void * gpa;
+    QMutex gMutexMsg;
+    std::thread * guploadthread;
+
+
+    std::vector<iv::msgunit> mvectormsgunit;
+
+    std::vector<iv::msgunit> mvectorctrlmsgunit;
+
+
+    std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+    std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
+    std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
+
+
+
+    int gindex = 0;
+
+    int mninterval = 100;
+    int mnlatency = 0;
+    std::vector<qint64> mvectorlatency;
+public:
+    void UpdateData(const char * strdata,const unsigned int nSize,const char * strmemname);
+private:
+    void run();
+    void dec_yaml(const char * stryamlpath);
+    void sharectrlmsg(iv::cloud::cloudmsg * pxmsg);
+
+    void threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun);
+
+    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
+    void threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
+    qint64 calclatency(qint64 nnewlatency);
+};
+
+#endif // GRPCCLIENT_H

+ 472 - 0
src/driver/driver_cloud_swap_client/main.cpp

@@ -0,0 +1,472 @@
+#include <QCoreApplication>
+
+#include "grpcclient.h"
+
+#include "ivversion.h"
+
+/*
+#include <yaml-cpp/yaml.h>
+
+#include <QDateTime>
+
+#include <iostream>
+
+#include <vector>
+
+#include <memory>
+
+#include <QMutex>
+
+#include <thread>
+
+#include "modulecomm.h"
+
+#include "cloud.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "uploadmsg.grpc.pb.h"
+
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+
+void test()
+{
+    std::string target_str = "0.0.0.0:50051";
+
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+
+
+    iv::UploadRequest request;
+
+
+
+    // Container for the data we expect from the server.
+    iv::UploadReply reply;
+
+    int nid = 0;
+
+    nid = 1;
+
+    while(1)
+    {
+
+
+
+
+        // Context for the client. It could be used to convey extra information to
+        // the server and/or tweak certain RPC behaviors.
+
+
+        ClientContext context ;
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+        request.set_id(nid);
+        request.set_ntime(time1);
+        nid++;
+        // The actual RPC.
+        Status status = stub_->upload(&context, request, &reply);
+        if (status.ok()) {
+            std::cout<<nid<<" upload successfully"<<std::endl;
+//            qint64 time2;
+
+//            memcpy(&time2,reply.data().data(),8);
+//            qint64 time3 = QDateTime::currentMSecsSinceEpoch();
+//            std::cout<<"reply data size is "<<reply.data().size()<<std::endl;
+//            std::cout<<" latency is "<<(time2 - time1)<<" 2 is "<<(time3 - time2)<<std::endl;
+//          return reply.message();
+        } else {
+          std::cout << status.error_code() << ": " << status.error_message()
+                    << std::endl;
+          std::cout<<"RPC failed"<<std::endl;
+          std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+//          delete pcontext;
+//          pcontext = new ClientContext;
+
+//          channel = grpc::CreateCustomChannel(
+//                   target_str, grpc::InsecureChannelCredentials(),cargs);
+
+//          stub_ = iv::Upload::NewStub(channel);
+        }
+    }
+}
+
+std::string gstrserverip =  "0.0.0.0";//"123.57.212.138";
+std::string gstrserverport = "50051";//"9000";
+std::string gstruploadinterval = "1000";
+void * gpa;
+QMutex gMutexMsg;
+std::thread * guploadthread;
+
+
+
+
+std::vector<iv::msgunit> mvectormsgunit;
+
+std::vector<iv::msgunit> mvectorctrlmsgunit;
+
+
+std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
+std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
+
+
+
+int gindex = 0;
+
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+{
+
+    int nsize = mvectormsgunit.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
+        {
+            gMutexMsg.lock();
+            char * strtem = new char[nSize];
+            memcpy(strtem,strdata,nSize);
+            mvectormsgunit[i].mpstrmsgdata.reset(strtem);
+            mvectormsgunit[i].mndatasize = nSize;
+            mvectormsgunit[i].mbRefresh = true;
+            gMutexMsg.unlock();
+            break;
+        }
+    }
+}
+
+
+void sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
+{
+    int i;
+    int nsize = pxmsg->xclouddata_size();
+    for(i=0;i<nsize;i++)
+    {
+        int j;
+        int nquerysize = mvectorctrlmsgunit.size();
+        for(j=0;j<nquerysize;j++)
+        {
+            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
+            {
+ //               qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+                iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+                break;
+            }
+        }
+    }
+}
+
+
+void threadupload()
+{
+    int nsize = mvectormsgunit.size();
+    int i;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+
+
+    iv::UploadRequest request;
+
+    int nid = 0;
+
+    // Container for the data we expect from the server.
+    iv::UploadReply reply;
+
+    gpr_timespec timespec;
+      timespec.tv_sec = 30;//设置阻塞时间为2秒
+      timespec.tv_nsec = 0;
+      timespec.clock_type = GPR_TIMESPAN;
+
+ //   ClientContext context;
+
+
+
+    while(true)
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        if((xTime.elapsed()-nlastsend)<ninterval)
+        {
+            continue;
+        }
+
+            bool bImportant = false;
+            int nkeeptime = 0;
+            iv::cloud::cloudmsg xmsg;
+            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+            gMutexMsg.lock();
+            for(i=0;i<nsize;i++)
+            {
+                if(mvectormsgunit[i].mbRefresh)
+                {
+                    mvectormsgunit[i].mbRefresh = false;
+                    if(mvectormsgunit[i].mbImportant)
+                    {
+                        bImportant = true;
+                    }
+                    if(mvectormsgunit[i].mnkeeptime > nkeeptime)
+                    {
+                        nkeeptime = mvectormsgunit[i].mnkeeptime;
+                    }
+                    iv::cloud::cloudunit xcloudunit;
+                    xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
+                    xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
+                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+                    pcu->CopyFrom(xcloudunit);
+                }
+
+            }
+            gMutexMsg.unlock();
+
+            int nbytesize = xmsg.ByteSize();
+            char * strbuf = new char[nbytesize];
+            std::shared_ptr<char> pstrbuf;
+            pstrbuf.reset(strbuf);
+            if(xmsg.SerializeToArray(strbuf,nbytesize))
+            {
+
+                ClientContext context ;
+                context.set_deadline(timespec);
+                qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+                request.set_id(nid);
+                request.set_ntime(time1);
+                request.set_strquerymd5(gstrqueryMD5);
+                request.set_strctrlmd5(gstrctrlMD5);
+                request.set_strvin(gstrVIN);
+                request.set_xdata(strbuf,nbytesize);
+                request.set_kepptime(nkeeptime);
+                request.set_bimportant(bImportant);
+                nid++;
+                // The actual RPC.
+                Status status = stub_->upload(&context, request, &reply);
+                if (status.ok()) {
+                    std::cout<<nid<<" upload successfully"<<std::endl;
+                    if(reply.nres() == 1)
+                    {
+                        iv::cloud::cloudmsg xmsg;
+                        if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+                        {
+                            sharectrlmsg(&xmsg);
+                        }
+                    }
+                } else {
+                  std::cout << status.error_code() << ": " << status.error_message()
+                            << std::endl;
+                  std::cout<<"RPC failed"<<std::endl;
+                  if(status.error_code() == 4)
+                  {
+                      std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
+                      channel = grpc::CreateCustomChannel(
+                               target_str, grpc::InsecureChannelCredentials(),cargs);
+
+                      stub_ = iv::Upload::NewStub(channel);
+                  }
+                  std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+                }
+
+            }
+            nlastsend = xTime.elapsed();
+
+    }
+}
+
+
+void dec_yaml(const char * stryamlpath)
+{
+
+    YAML::Node config;
+    try
+    {
+        config = YAML::LoadFile(stryamlpath);
+    }
+    catch(YAML::BadFile e)
+    {
+        qDebug("load error.");
+        return;
+    }
+
+    std::vector<std::string> vecmodulename;
+
+
+    if(config["server"])
+    {
+        gstrserverip = config["server"].as<std::string>();
+    }
+    if(config["port"])
+    {
+        gstrserverport = config["port"].as<std::string>();
+    }
+    if(config["uploadinterval"])
+    {
+        gstruploadinterval = config["uploadinterval"].as<std::string>();
+    }
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["queryMD5"])
+    {
+        gstrqueryMD5 = config["queryMD5"].as<std::string>();
+    }
+    else
+    {
+        return;
+    }
+
+    if(config["ctrlMD5"])
+    {
+        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+    }
+
+
+    std::string strmsgname;
+
+    if(config["uploadmessage"])
+    {
+
+        for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
+                if(config["uploadmessage"][strtitle]["bimportant"])
+                {
+                   std::string strimportant =    config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
+                   if(strimportant == "true")
+                   {
+                       xmu.mbImportant = true;
+                   }
+                }
+                if(config["uploadmessage"][strtitle]["keeptime"])
+                {
+                   std::string strkeep =    config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
+                   xmu.mnkeeptime = atoi(strkeep.data());
+                }
+                mvectormsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+
+    }
+
+    if(!config["ctrlMD5"])
+    {
+       return;
+    }
+
+    if(config["ctrlmessage"])
+    {
+        std::string strnodename = "ctrlmessage";
+        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+                mvectorctrlmsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    return;
+
+}
+
+
+*/
+
+int main(int argc, char *argv[])
+{
+    showversion("driver_cloud_grpc_client");
+    QCoreApplication a(argc, argv);
+
+ //   std::thread * ptest = new std::thread(test);
+
+ //   return a.exec();
+
+    char stryamlpath[256];
+    if(argc<2)
+    {
+        snprintf(stryamlpath,255,"driver_cloud_grpc_client_stream.yaml");
+//        strncpy(stryamlpath,abs_ymlpath,255);
+    }
+    else
+    {
+        strncpy(stryamlpath,argv[1],255);
+    }
+//    dec_yaml(stryamlpath);
+
+    grpcclient * pgrpcclient = new grpcclient(stryamlpath);
+    pgrpcclient->start();
+
+//    int i;
+//    for(i=0;i<mvectormsgunit.size();i++)
+//    {
+//        mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
+//    }
+
+//    for(i=0;i<mvectorctrlmsgunit.size();i++)
+//    {
+//        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
+//                                                                 mvectorctrlmsgunit[i].mnBufferCount);
+//    }
+
+//    guploadthread = new std::thread(threadupload);
+
+    return a.exec();
+}

+ 3 - 0
src/driver/driver_cloud_swap_client/prototocpp.txt

@@ -0,0 +1,3 @@
+protoc -I . --plugin=protoc-gen-grpc=/home/yuchuli/git/grpc-framework/build2/grpc_cpp_plugin --grpc_out=. cloudswap.proto
+
+protoc -I . --cpp_out=. uploadmsg.proto 

+ 21 - 1
src/driver/vtd_rdb/main.cpp

@@ -3,6 +3,8 @@
 #include "rdbconn.h"
 #include <iostream>
 
+#include "xmlparam.h"
+
 #include "rdbmodulecomm.h"
 
 #define DEFAULT_PORT        48190   /* for image port it should be 48192 */
@@ -16,8 +18,26 @@ int main(int argc, char *argv[])
 
     QCoreApplication a(argc, argv);
 
+    std::string strparapath;
+    if(argc<2)
+    {
+        strparapath = "./vtd_rdb.xml";
+    }
+    else
+    {
+        strparapath = argv[1];
+    }
+
+
+    iv::xmlparam::Xmlparam xp(strparapath);
+    std::string strcommonmsgname = xp.GetParam("commonmsgname","rdbcommon");
+    std::string strpicturemsgname = xp.GetParam("picturemsgname","rdbpicture");
+    std::string strvtdip = xp.GetParam("vtdip","10.14.0.234");
+    int ncommonport = xp.GetParam("defaultport",DEFAULT_PORT);
+    int npictureport = xp.GetParam("pictureport",IMAGE_PORT);
+
 
-    rdbmodulecomm * prm = new rdbmodulecomm("rdbcommon","rdbpicture","10.14.0.234",DEFAULT_PORT,"10.14.0.234",IMAGE_PORT);
+    rdbmodulecomm * prm = new rdbmodulecomm(strcommonmsgname,strpicturemsgname,strvtdip,ncommonport,strvtdip,npictureport);
     (void)prm;
 
 

+ 3 - 0
src/driver/vtd_rdb/rdbconn.cpp

@@ -63,6 +63,7 @@ void RDBConn::threadconn(char *strserip, int nport)
 
     while(mbthreadconn)
     {
+        std::cout<<"start connect server."<<std::endl;
         if(bConnect == false)
         {
             if (connect( sClient, (struct sockaddr *)&server, sizeof( server ) ) == -1 )
@@ -122,6 +123,8 @@ void RDBConn::threadconn(char *strserip, int nport)
 
                     // now parse the message
  //                   parseRDBMessage( ( RDB_MSG_t* ) pData, isImage );
+                    parseRDBMessage( ( RDB_MSG_t* ) pData);
+
 
                     // remove message from queue
                     memmove( pData, pData + msgSize, bytesInBuffer - msgSize );

+ 51 - 1
src/driver/vtd_rdb/rdbmodulecomm.cpp

@@ -3,7 +3,7 @@
 rdbmodulecomm::rdbmodulecomm(std::string strcommonmsg,std::string strpicturemsg,std::string strcommonip,int ncommonport,
                              std::string strpictureip,int npictureport)
 {
-    mpacommon = iv::modulecomm::RegisterSend(strcommonmsg.data(),10000,1);
+    mpacommon = iv::modulecomm::RegisterSend(strcommonmsg.data(),10000,100);
     mpapicture = iv::modulecomm::RegisterSend(strpicturemsg.data(),10000000,1);
 
     mpConnCommon = new RDBConn(strcommonip.data(),ncommonport);
@@ -12,6 +12,9 @@ rdbmodulecomm::rdbmodulecomm(std::string strcommonmsg,std::string strpicturemsg,
     mpthreadcommon = new std::thread(&rdbmodulecomm::threadCommon,this,mpConnCommon);
     mpthreadpicture = new std::thread(&rdbmodulecomm::threadPicture,this,mpConnPicture);
 
+    int ninterval = 10;
+    mpthreadCommonSend = new std::thread(&rdbmodulecomm::threadCommonSend,this,ninterval);
+
 }
 
 rdbmodulecomm::~rdbmodulecomm()
@@ -33,6 +36,10 @@ void rdbmodulecomm::threadCommon(RDBConn * pconn)
         iv::rdbitem xrdbitem;
         nrtn = pconn->ConsumeBuf(xrdbitem,100);
         if(nrtn == 0)continue;
+        mmutexcommon.lock();
+        mvectorcommonitem.push_back(xrdbitem);
+        mmutexcommon.unlock();
+        continue;
         std::cout<<"pkg id: "<<xrdbitem.pkgid<<std::endl;
         switch (xrdbitem.pkgid) {
         case RDB_PKG_ID_OBJECT_STATE:
@@ -68,3 +75,46 @@ void rdbmodulecomm::threadPicture(RDBConn *pconn)
         }
     }
 }
+
+void rdbmodulecomm::threadCommonSend(int nmsinterval)
+{
+    int64_t lastsend =  std::chrono::system_clock::now().time_since_epoch().count();
+    if(nmsinterval<1)nmsinterval = 1;
+    while(mbRun)
+    {
+        int64_t timenow =  std::chrono::system_clock::now().time_since_epoch().count();
+        if(abs(timenow - lastsend) < nmsinterval )
+        {
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            continue;
+        }
+        iv::vtd::rdbdata xrdbdata;
+        mmutexcommon.lock();
+        xrdbdata.set_mitemcount(mvectorcommonitem.size());
+        xrdbdata.set_msgtime(timenow);
+        int i;
+        for(i=0;i<(int)mvectorcommonitem.size();i++)
+        {
+            iv::vtd::rdbitem * pitem = xrdbdata.add_mrdbitem();
+            iv::rdbitem * prdbitem = &mvectorcommonitem[i];
+            pitem->set_simframe(prdbitem->simFrame);
+            pitem->set_simtime(prdbitem->simTime);
+            pitem->set_pkgid(prdbitem->pkgid);
+            pitem->set_pkgdata(prdbitem->mpstr_pkgdata.get(),prdbitem->mnpkgdatasize);
+        }
+        mvectorcommonitem.clear();
+        mmutexcommon.unlock();
+
+        int ndatasize = xrdbdata.ByteSize();
+        std::shared_ptr<char> pstrdata_ptr = std::shared_ptr<char>(new char[ndatasize]);
+        if(xrdbdata.SerializeToArray(pstrdata_ptr.get(),ndatasize))
+        {
+            iv::modulecomm::ModuleSendMsg(mpacommon,pstrdata_ptr.get(),ndatasize);
+        }
+        else
+        {
+            std::cout<<"Error serialize xrdbdata."<<std::endl;
+        }
+
+    }
+}

+ 10 - 0
src/driver/vtd_rdb/rdbmodulecomm.h

@@ -2,11 +2,14 @@
 #define RDBMODULECOMM_H
 
 #include <thread>
+#include <mutex>
 
 #include "modulecomm.h"
 #include "rdbconn.h"
 #include <iostream>
 
+#include "vtd.pb.h"
+
 class rdbmodulecomm
 {
 public:
@@ -24,6 +27,9 @@ private:
     RDBConn * mpConnCommon;
     RDBConn * mpConnPicture;
 
+    std::vector<iv::rdbitem> mvectorcommonitem;
+    std::mutex mmutexcommon;
+
 private:
     void * mpacommon;
     void * mpapicture;
@@ -33,10 +39,14 @@ private:
     std::thread * mpthreadcommon;
     std::thread * mpthreadpicture;
 
+    std::thread * mpthreadCommonSend;
+
 private:
     void threadCommon(RDBConn * pconn);
     void threadPicture(RDBConn *pconn);
 
+    void threadCommonSend(int nmsinterval);
+
 };
 
 #endif // RDBMODULECOMM_H

+ 8 - 2
src/driver/vtd_rdb/vtd_rdb.pro

@@ -20,13 +20,15 @@ DEFINES += QT_DEPRECATED_WARNINGS
 SOURCES += main.cpp \
     VTD/RDBHandler.cc \
     rdbconn.cpp \
-    rdbmodulecomm.cpp
+    rdbmodulecomm.cpp \
+    ../../include/msgtype/vtd.pb.cc
 
 HEADERS += \
     VTD/viRDBIcd.h \
     VTD/RDBHandler.hh \
     rdbconn.h \
-    rdbmodulecomm.h
+    rdbmodulecomm.h \
+    ../../include/msgtype/vtd.pb.h
 
 
 INCLUDEPATH += $$PWD/VTD
@@ -36,3 +38,7 @@ INCLUDEPATH += $$PWD/VTD
 !include(../../../include/common.pri ) {
     error( "Couldn't find the common.pri file!" )
 }
+
+!include(../../../include/ivprotobuf.pri ) {
+    error( "Couldn't find the ivprotobuf.pri file!" )
+}

+ 54 - 0
src/include/proto3/cloudswap.proto

@@ -0,0 +1,54 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.adc.cloudswap";
+option java_outer_classname = "CloudSwapMsgProto";
+option objc_class_prefix = "HLW";
+
+package iv;
+
+// The Upload service definition.
+service CloudSwapStream {
+  // Sends a Upload
+  rpc swap (stream CloudSwapRequestStream) returns (stream CloudSwapReplyStream) {}
+  
+}
+
+// The request message containing the user's name.
+message CloudSwapRequestStream {
+    uint64 nmsgindex = 1;   // index for calculate network latency.
+    uint64 nmsgtime = 2;    //ns from epoch
+    string strnodeid = 3;  //node id
+    string strobjnodeid = 4; //swap object node id
+    bytes xdata = 5;
+    double pingavg = 6;
+    double pingmin = 7;
+    double pingmax = 8;
+    double pingdev = 9;
+}
+
+// The response message containing the greetings
+message CloudSwapReplyStream {
+  int32 nres = 1;  //0 no message 1 have message
+  repeated bytes xdata = 2;
+  double fLatency = 3; //Latecy = node_ping_avg + objnode_ping_avg + data_store_in_server
+  double fLatency_InServer = 4;
+  uint64 nmsgservertime = 5;  //server send this message when
+}
+
+
+