modulecomm_impl_shm.cpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #ifdef USE_FASTRTPS
  2. #include "modulecomm_impl_shm.h"
  3. #include <thread>
  4. #include <iostream>
  5. #include <QDateTime>
  6. #include <QMutex>
  7. #include <QFile>
  8. #ifdef Q_OS_LINUX
  9. #include <unistd.h>
  10. #endif
  11. namespace iv {
  12. namespace modulecomm {
  13. static QMutex gmodulecomm_dds_Mutex;
  14. static int createcount = 0;
  15. }
  16. }
  17. void modulecomm_impl_shm::callbackTopic(const char * strdata,const unsigned int nSize,const unsigned int index, QDateTime * dt,const char * strmemname) {
  18. if(mbFunPlus)
  19. {
  20. mFun(strdata,nSize,index,dt,strmemname);
  21. }
  22. else
  23. {
  24. (*mpCall)(strdata,nSize,index,dt,strmemname);
  25. }
  26. }
  27. int modulecomm_impl_shm::GetTempConfPath(char *strpath)
  28. {
  29. char strtmppath[256];
  30. QDateTime dt = QDateTime::currentDateTime();
  31. snprintf(strtmppath,256,"/tmp/adc_modulecomm_conf_%04d%02d%02d%02d%02d.ini",dt.date().year(),
  32. dt.date().month(),dt.date().day(),dt.time().hour(),dt.time().minute());
  33. QFile xFile;
  34. xFile.setFileName(strtmppath);
  35. char strtem[256];
  36. char strdata[10000];
  37. snprintf(strdata,10000,"");
  38. if(!xFile.exists())
  39. {
  40. if(xFile.open(QIODevice::ReadWrite))
  41. {
  42. snprintf(strtem,256,"[common]\n");strncat(strdata,strtem,10000);
  43. snprintf(strtem,256,"DCPSDefaultDiscovery=TheRTPSConfig\n");strncat(strdata,strtem,10000);
  44. #ifdef dds_use_shm
  45. snprintf(strtem,256,"DCPSGlobalTransportConfig=myconfig\n");strncat(strdata,strtem,10000);
  46. snprintf(strtem,256,"[config/myconfig]\n");strncat(strdata,strtem,10000);
  47. snprintf(strtem,256,"transports=share\n");strncat(strdata,strtem,10000);
  48. snprintf(strtem,256,"[transport/share]\n");strncat(strdata,strtem,10000);
  49. snprintf(strtem,256,"transport_type=shmem\n");strncat(strdata,strtem,10000);
  50. snprintf(strtem,256,"pool_size=100000000\n");strncat(strdata,strtem,10000);
  51. #endif
  52. snprintf(strtem,256,"[rtps_discovery/TheRTPSConfig]\n");strncat(strdata,strtem,10000);
  53. snprintf(strtem,256,"ResendPeriod=5\n");strncat(strdata,strtem,10000);
  54. xFile.write(strdata,strnlen(strdata,10000));
  55. xFile.close();
  56. }
  57. }
  58. strncpy(strpath,strtmppath,255);
  59. return 0;
  60. }
  61. modulecomm_impl_shm::modulecomm_impl_shm(const char * strcommname,int ntype )
  62. {
  63. strncpy(mstrtopic,strcommname,255);
  64. iv::modulecomm::gmodulecomm_dds_Mutex.lock();
  65. if(ntype == type_recv)
  66. {
  67. mpSub = new TopicsSubscriber();
  68. mpSub->init(strcommname);
  69. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  70. mnType = type_recv;
  71. }
  72. else
  73. {
  74. mpPub = new TopicsPublisher();
  75. mpPub->init(strcommname);
  76. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  77. mnType = type_send;
  78. }
  79. iv::modulecomm::createcount++;
  80. std::cout<<"count is "<<iv::modulecomm::createcount<<std::endl;
  81. iv::modulecomm::gmodulecomm_dds_Mutex.unlock();
  82. strncpy(mmodulemsg_type.mstrmsgidname,strcommname,255);
  83. mmodulemsg_type.mnBufSize = 0;
  84. mmodulemsg_type.mnMsgBufCount = 0;
  85. strncpy(mmodulemsg_type.mstrmsgname,strcommname,255);
  86. #ifdef Q_OS_LINUX
  87. mmodulemsg_type.mnPID = getpid();
  88. #endif
  89. #ifdef USEDBUS
  90. mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres");
  91. mmsgres<<1;
  92. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery()));
  93. if(bconnect == false)
  94. {
  95. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  96. }
  97. #endif
  98. }
  99. int modulecomm_impl_shm::listenmsg(ModuleFun xFun)
  100. {
  101. if(mnType == type_send)
  102. {
  103. std::cout<<"send not listen."<<std::endl;;
  104. return -1;
  105. }
  106. mbFunPlus = true;
  107. mFun = xFun;
  108. ModuleFun topicFunction = std::bind(&modulecomm_impl_shm::callbackTopic,this,std::placeholders::_1,
  109. std::placeholders::_2,
  110. std::placeholders::_3,
  111. std::placeholders::_4,
  112. std::placeholders::_5);
  113. mpSub->setReceivedTopicFunction(topicFunction);
  114. return 0;
  115. }
  116. int modulecomm_impl_shm::listenmsg(SMCallBack pCall)
  117. {
  118. if(mnType == type_send)
  119. {
  120. std::cout<<"send not listen."<<std::endl;
  121. return -1;
  122. }
  123. mbFunPlus = false;
  124. mpCall = pCall;
  125. ModuleFun topicFunction = std::bind(&modulecomm_impl_shm::callbackTopic,this,std::placeholders::_1,
  126. std::placeholders::_2,
  127. std::placeholders::_3,
  128. std::placeholders::_4,
  129. std::placeholders::_5);
  130. mpSub->setReceivedTopicFunction(topicFunction);
  131. return 0;
  132. }
  #ifdef USEDBUS
  /**
   * Slot connected to the "modulemsgquery" D-Bus signal by the constructor.
   *
   * Sends the raw bytes of mmodulemsg_type as the single argument of the
   * prepared mmsgres signal on the session bus.
   */
  void modulecomm_impl_shm::onQuery()
  {
      // Byte-wise copy of the descriptor structure.
      const QByteArray payload(reinterpret_cast<const char *>(&mmodulemsg_type),sizeof(iv::modulemsg_type));

      QList<QVariant> arguments;
      arguments.append(QVariant(payload));

      mmsgres.setArguments(arguments);
      QDBusConnection::sessionBus().send(mmsgres);
  }
  #endif
  144. void modulecomm_impl_shm::writemsg(const char *str, int nlen)
  145. {
  146. if(mnType == type_recv)
  147. {
  148. std::cout<<"recv not send."<<std::endl;
  149. return ;
  150. }
  151. mpPub->senddata(str,nlen);
  152. }
  153. #endif