123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- #include <QCoreApplication>
- #include <iostream>
- #include <vector>
- #include <iostream>
- #include <memory>
- #include <string>
- #include <grpcpp/grpcpp.h>
- #include <grpcpp/health_check_service_interface.h>
- #include <grpcpp/ext/proto_server_reflection_plugin.h>
- #include "cloudswap.grpc.pb.h"
- #include "swapserver.h"
- using grpc::Server;
- using grpc::ServerBuilder;
- using grpc::ServerContext;
- using grpc::Status;
- #include <thread>
- char gstrport[255];
- swapserver * gpswapserver;
- #include <getopt.h>
- void print_useage()
- {
- std::cout<<" -p --port $port : port . eq. -p 50051"<<std::endl;
- std::cout<<" -h --help print help"<<std::endl;
- }
- int GetOptLong(int argc, char *argv[]) {
- int nRtn = 0;
- int opt; // getopt_long() 的返回值
- // int digit_optind = 0; // 设置短参数类型及是否需要参数
- // 如果option_index非空,它指向的变量将记录当前找到参数符合long_opts里的
- // 第几个元素的描述,即是long_opts的下标值
- int option_index = 0;
- // 设置短参数类型及是否需要参数
- const char *optstring = "m:r:x:y:o:p:s:h";
- static struct option long_options[] = {
- {"port", required_argument, NULL, 'p'},
- {"help", no_argument, NULL, 'h'},
- // {"optarg", optional_argument, NULL, 'o'},
- {0, 0, 0, 0} // 添加 {0, 0, 0, 0} 是为了防止输入空值
- };
- while ( (opt = getopt_long(argc,
- argv,
- optstring,
- long_options,
- &option_index)) != -1) {
- switch(opt)
- {
- case 'p':
- strncpy(gstrport,optarg,255);
- break;
- case 'h':
- print_useage();
- nRtn = 1; //because use -h
- break;
- default:
- break;
- }
- }
- return nRtn;
- }
- // Logic and data behind the server's behavior.
- class CloudSwapServiceImpl final : public iv::CloudSwapStream::Service {
- Status swap(ServerContext* context, ::grpc::ServerReaderWriter<iv::CloudSwapReplyStream, iv::CloudSwapRequestStream>* stream) override {
- (void)context;
- iv::CloudSwapRequestStream request;
- bool bCreateThread = false;
- std::string strnodeid;
- std::string strobjnodeid;
- swapunit * pswap = NULL;
- while (stream->Read(&request))
- {
- if(bCreateThread == false)
- {
- strnodeid = request.strnodeid();
- strobjnodeid = request.strobjnodeid();
- pswap = gpswapserver->AddSwapUnit(strnodeid,strobjnodeid,stream);
- bCreateThread = true;
- }
- pswap->SetPingAvg(request.pingavg());
- int ndatasize = request.xdata().size();
- if(request.nunitcount() > 0)
- {
- ndatasize = request.xdata().size();
- std::shared_ptr<char> pdata_ptr = std::shared_ptr<char>(new char[ndatasize]);
- memcpy(pdata_ptr.get(),request.xdata().data(),ndatasize);
- gpswapserver->broadmsg(strobjnodeid,pdata_ptr,ndatasize,request.pingavg(),request.flatencyinstore());
- }
- else
- {
- std::cout<<"receive heartbeat from "<<strnodeid<<std::endl;
- }
- }
- pswap->stopswap(); //Because connection fail.
- std::cout<<" no conn"<<std::endl;
- std::cout<<"dis connect."<<std::endl;
- return Status::OK;
- }
- };
- void RunServer() {
- std::string server_address("0.0.0.0:");
- server_address = server_address.append(gstrport);
- CloudSwapServiceImpl service;
- grpc::EnableDefaultHealthCheckService(true);
- // grpc::reflection::InitProtoReflectionServerBuilderPlugin();
- ServerBuilder builder;
- // Listen on the given address without any authentication mechanism.
- builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
- builder.SetMaxReceiveMessageSize(300000000);
- // builder.SetMaxMessageSize(100000000);
- // builder.SetMaxSendMessageSize(100000000);
- // Register "service" as the instance through which we'll communicate with
- // clients. In this case it corresponds to an *synchronous* service.
- builder.RegisterService(&service);
- // Finally assemble the server.
- std::unique_ptr<Server> server(builder.BuildAndStart());
- std::cout << "Server listening on " << server_address << std::endl;
- // Wait for the server to shutdown. Note that some other thread must be
- // responsible for shutting down the server for this call to ever return.
- server->Wait();
- }
- int main(int argc, char *argv[])
- {
- QCoreApplication a(argc, argv);
- snprintf(gstrport,255,"50061");
- int nRtn = GetOptLong(argc,argv);
- if(nRtn == 1) //show help,so exit.
- {
- return 0;
- }
- gpswapserver = new swapserver();
- RunServer();
- return a.exec();
- }
|