main.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #include <QCoreApplication>
  2. #include <iostream>
  3. #include <vector>
  4. #include <iostream>
  5. #include <memory>
  6. #include <string>
  7. #include <grpcpp/grpcpp.h>
  8. #include <grpcpp/health_check_service_interface.h>
  9. #include <grpcpp/ext/proto_server_reflection_plugin.h>
  10. #include "cloudswap.grpc.pb.h"
  11. #include "swapserver.h"
  12. using grpc::Server;
  13. using grpc::ServerBuilder;
  14. using grpc::ServerContext;
  15. using grpc::Status;
  16. #include <thread>
  17. char gstrport[255];
  18. swapserver * gpswapserver;
  19. #include <getopt.h>
  20. void print_useage()
  21. {
  22. std::cout<<" -p --port $port : port . eq. -p 50051"<<std::endl;
  23. std::cout<<" -h --help print help"<<std::endl;
  24. }
  25. int GetOptLong(int argc, char *argv[]) {
  26. int nRtn = 0;
  27. int opt; // getopt_long() 的返回值
  28. // int digit_optind = 0; // 设置短参数类型及是否需要参数
  29. // 如果option_index非空,它指向的变量将记录当前找到参数符合long_opts里的
  30. // 第几个元素的描述,即是long_opts的下标值
  31. int option_index = 0;
  32. // 设置短参数类型及是否需要参数
  33. const char *optstring = "m:r:x:y:o:p:s:h";
  34. static struct option long_options[] = {
  35. {"port", required_argument, NULL, 'p'},
  36. {"help", no_argument, NULL, 'h'},
  37. // {"optarg", optional_argument, NULL, 'o'},
  38. {0, 0, 0, 0} // 添加 {0, 0, 0, 0} 是为了防止输入空值
  39. };
  40. while ( (opt = getopt_long(argc,
  41. argv,
  42. optstring,
  43. long_options,
  44. &option_index)) != -1) {
  45. switch(opt)
  46. {
  47. case 'p':
  48. strncpy(gstrport,optarg,255);
  49. break;
  50. case 'h':
  51. print_useage();
  52. nRtn = 1; //because use -h
  53. break;
  54. default:
  55. break;
  56. }
  57. }
  58. return nRtn;
  59. }
  60. // Logic and data behind the server's behavior.
  61. class CloudSwapServiceImpl final : public iv::CloudSwapStream::Service {
  62. Status swap(ServerContext* context, ::grpc::ServerReaderWriter<iv::CloudSwapReplyStream, iv::CloudSwapRequestStream>* stream) override {
  63. (void)context;
  64. iv::CloudSwapRequestStream request;
  65. bool bCreateThread = false;
  66. std::string strnodeid;
  67. std::string strobjnodeid;
  68. swapunit * pswap = NULL;
  69. while (stream->Read(&request))
  70. {
  71. if(bCreateThread == false)
  72. {
  73. strnodeid = request.strnodeid();
  74. strobjnodeid = request.strobjnodeid();
  75. pswap = gpswapserver->AddSwapUnit(strnodeid,strobjnodeid,stream);
  76. bCreateThread = true;
  77. }
  78. pswap->SetPingAvg(request.pingavg());
  79. int ndatasize = request.xdata().size();
  80. if(request.nunitcount() > 0)
  81. {
  82. ndatasize = request.xdata().size();
  83. std::shared_ptr<char> pdata_ptr = std::shared_ptr<char>(new char[ndatasize]);
  84. memcpy(pdata_ptr.get(),request.xdata().data(),ndatasize);
  85. gpswapserver->broadmsg(strobjnodeid,pdata_ptr,ndatasize,request.pingavg(),request.flatencyinstore());
  86. }
  87. else
  88. {
  89. std::cout<<"receive heartbeat from "<<strnodeid<<std::endl;
  90. }
  91. }
  92. pswap->stopswap(); //Because connection fail.
  93. std::cout<<" no conn"<<std::endl;
  94. std::cout<<"dis connect."<<std::endl;
  95. return Status::OK;
  96. }
  97. };
  98. void RunServer() {
  99. std::string server_address("0.0.0.0:");
  100. server_address = server_address.append(gstrport);
  101. CloudSwapServiceImpl service;
  102. grpc::EnableDefaultHealthCheckService(true);
  103. // grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  104. ServerBuilder builder;
  105. // Listen on the given address without any authentication mechanism.
  106. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  107. builder.SetMaxReceiveMessageSize(300000000);
  108. // builder.SetMaxMessageSize(100000000);
  109. // builder.SetMaxSendMessageSize(100000000);
  110. // Register "service" as the instance through which we'll communicate with
  111. // clients. In this case it corresponds to an *synchronous* service.
  112. builder.RegisterService(&service);
  113. // Finally assemble the server.
  114. std::unique_ptr<Server> server(builder.BuildAndStart());
  115. std::cout << "Server listening on " << server_address << std::endl;
  116. // Wait for the server to shutdown. Note that some other thread must be
  117. // responsible for shutting down the server for this call to ever return.
  118. server->Wait();
  119. }
  120. int main(int argc, char *argv[])
  121. {
  122. QCoreApplication a(argc, argv);
  123. snprintf(gstrport,255,"50061");
  124. int nRtn = GetOptLong(argc,argv);
  125. if(nRtn == 1) //show help,so exit.
  126. {
  127. return 0;
  128. }
  129. gpswapserver = new swapserver();
  130. RunServer();
  131. return a.exec();
  132. }