procsm.cpp 15 KB


  #include <algorithm>
  #include <atomic>
  #include <cstring>
  #include <iostream>
  #include <thread>
  #include <QTime>
  #include <QThread>
  #include "procsm.h"
  #ifdef Q_OS_LINUX
  #include <unistd.h>
  #endif
  10. class AttachThread : public QThread
  11. {
  12. public:
  13. AttachThread(QSharedMemory * pa,bool & bAttach)
  14. {
  15. mbAttach = bAttach;
  16. mpa = pa;
  17. mbrun = true;
  18. }
  19. QSharedMemory * mpa;
  20. bool mbAttach = false;
  21. bool mbrun = true;
  22. void run()
  23. {
  24. mbAttach = mpa->attach();
  25. mbrun = false;
  26. }
  27. };
  28. procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  29. {
  30. // mnBufSize = nBufSize;
  31. // qDebug("create dbus");
  32. strncpy(mstrsmname,strsmname,256);
  33. mpASMPtr = new QSharedMemory(strsmname);
  34. char strasmname[256];
  35. if(nMode == ModeWrite)
  36. {
  37. bool bres = mpASMPtr->attach();
  38. if(bres == false)
  39. {
  40. mpASMPtr->create(sizeof(ASM_PTR));
  41. }
  42. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  43. if(pasm == NULL)
  44. {
  45. qDebug("ASM_PTR is NULL.");
  46. return;
  47. }
  48. snprintf(strasmname,256,"%s_%lld",strsmname,QDateTime::currentMSecsSinceEpoch());
  49. pasm->mnshmsize = sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize;
  50. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  51. strncpy(pasm->mstrshmname,strasmname,256);
  52. mASM_State = *pasm;
  53. }
  54. else
  55. {
  56. return;
  57. }
  58. // mpASM = new QSharedMemory(strsmname);
  59. mpASM = new QSharedMemory(strasmname);
  60. if(nMode == ModeWrite)
  61. {
  62. strncpy(mmodulemsg_type.mstrmsgidname,strsmname,255);
  63. mmodulemsg_type.mnBufSize = nBufSize;
  64. mmodulemsg_type.mnMsgBufCount = nMaxPacCount;
  65. strncpy(mmodulemsg_type.mstrmsgname,strasmname,255);
  66. #ifdef Q_OS_LINUX
  67. mmodulemsg_type.mnPID = getpid();
  68. #endif
  69. #ifdef USEDBUS
  70. mmsg = QDBusMessage::createSignal("/catarc/adc", "adc.adciv.modulecomm", strsmname);
  71. mmsg<<1;
  72. #endif
  73. bool bAttach = mpASM->attach();
  74. // AttachThread AT(mpASM,bAttach);
  75. // AT.start();
  76. // QTime xTime;
  77. // xTime.start();
  78. // while(xTime.elapsed()<100)
  79. // {
  80. // if(AT.mbrun == false)
  81. // {
  82. // bAttach = AT.mbAttach;
  83. // break;
  84. // }
  85. // }
  86. // // qDebug("time is %d",xTime.elapsed());
  87. // if(xTime.elapsed()>= 1000)
  88. // {
  89. // qDebug("in 1000ms Attach fail.terminate it .");
  90. // AT.terminate();
  91. // bAttach = false;
  92. // }
  93. // if(!mpASM->attach())
  94. if(!bAttach)
  95. {
  96. mpASM->create(sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize);
  97. char * p = (char *)mpASM->data();
  98. if(p == NULL)
  99. {
  100. qDebug("Create SharedMemory Fail.");
  101. return;
  102. }
  103. mpinfo = (procsm_info *)p;
  104. mphead = (procsm_head *)(p+sizeof(procsm_info));
  105. mpinfo->mCap = nMaxPacCount;
  106. mpinfo->mnBufSize = nBufSize;
  107. mpinfo->mFirst = 0;
  108. mpinfo->mNext = 0;
  109. mpinfo->mLock = 0;
  110. }
  111. if(mpASM->isAttached())
  112. {
  113. mbAttach = true;
  114. char * p = (char *)mpASM->data();
  115. mpinfo = (procsm_info *)p;
  116. mphead = (procsm_head *)(p+sizeof(procsm_info));
  117. mnMaxPacCount = mpinfo->mCap;
  118. mnBufSize = mpinfo->mnBufSize;
  119. // qDebug("attach successful");
  120. mstrtem = new char[mnBufSize];
  121. #ifdef USEDBUS
  122. mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres");
  123. mmsgres<<1;
  124. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery()));
  125. if(bconnect == false)
  126. {
  127. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  128. }
  129. #endif
  130. }
  131. else
  132. {
  133. mbAttach = false;
  134. qDebug("Share Memory Error.");
  135. }
  136. }
  137. }
  138. void procsm::recreateasm(int nbufsize)
  139. {
  140. mpASMPtr->lock();
  141. qDebug("recreate asms");
  142. mnBufSize = std::max(nbufsize*11/10,nbufsize+1000);
  143. // mnBufSize = nbufsize+100;
  144. char strasmname[256];
  145. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  146. snprintf(strasmname,256,"%s_%lld",mstrsmname,QDateTime::currentMSecsSinceEpoch());
  147. pasm->mnshmsize = sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize;
  148. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  149. strncpy(pasm->mstrshmname,strasmname,256);
  150. mASM_State = *pasm;
  151. mmodulemsg_type.mnBufSize = mnBufSize;
  152. mmodulemsg_type.mnMsgBufCount = mnMaxPacCount;
  153. strncpy(mmodulemsg_type.mstrmsgname,mASM_State.mstrshmname,255);
  154. mpASM->lock();
  155. int noldmemsize = mpASM->size();
  156. char * px = new char[mpASM->size()];
  157. memcpy(px,mpASM->data(),noldmemsize);
  158. mpASM->unlock();
  159. mpASM->detach();
  160. qDebug("new asm name is %s,buffer size is %d ",mASM_State.mstrshmname,mnBufSize);
  161. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  162. bool bAttach = false;
  163. AttachThread AT(mpASM,bAttach);
  164. AT.start();
  165. QTime xTime;
  166. xTime.start();
  167. while(xTime.elapsed()<100)
  168. {
  169. if(AT.mbrun == false)
  170. {
  171. bAttach = AT.mbAttach;
  172. break;
  173. }
  174. }
  175. // qDebug("time is %d",xTime.elapsed());
  176. if(xTime.elapsed()>= 1000)
  177. {
  178. qDebug("in 1000ms Attach fail.terminate it .");
  179. AT.terminate();
  180. bAttach = false;
  181. }
  182. // if(!mpASM->attach())
  183. if(!bAttach)
  184. {
  185. mpASM->create(sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize);
  186. memcpy(mpASM->data(),px,noldmemsize);
  187. char * p = (char *)mpASM->data();
  188. mpinfo = (procsm_info *)p;
  189. mphead = (procsm_head *)(p+sizeof(procsm_info));
  190. mpinfo->mCap = mnMaxPacCount;
  191. mpinfo->mnBufSize = mnBufSize;
  192. // mpinfo->mFirst = nfirst;
  193. // mpinfo->mNext = nnext;
  194. // mpinfo->mLock = 0;
  195. }
  196. if(mpASM->isAttached())
  197. {
  198. mbAttach = true;
  199. char * p = (char *)mpASM->data();
  200. mpinfo = (procsm_info *)p;
  201. mphead = (procsm_head *)(p+sizeof(procsm_info));
  202. mnMaxPacCount = mpinfo->mCap;
  203. mnBufSize = mpinfo->mnBufSize;
  204. // qDebug("attach successful");
  205. // mstrtem = new char[mnBufSize];
  206. }
  207. else
  208. {
  209. mbAttach = false;
  210. qDebug("Share Memory Error.");
  211. }
  212. mpASMPtr->unlock();
  213. delete px;
  214. }
  215. #ifdef USEDBUS
  216. void procsm::onQuery()
  217. {
  218. QByteArray ba;
  219. ba.append((char *)&mmodulemsg_type,sizeof(iv::modulemsg_type));
  220. QList<QVariant> x;
  221. x<<ba;
  222. mmsgres.setArguments(x);
  223. QDBusConnection::sessionBus().send(mmsgres);
  224. }
  225. #endif
  226. bool procsm::AttachMem()
  227. {
  228. if(!mpASMPtr->isAttached())mpASMPtr->attach();
  229. if(mpASMPtr->isAttached())
  230. {
  231. ASM_PTR * pasmptr = (ASM_PTR *)(mpASMPtr->data());
  232. mASM_State = * pasmptr;
  233. if(mpASM != 0)
  234. {
  235. if(mpASM->isAttached())mpASM->detach();
  236. delete mpASM;
  237. }
  238. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  239. mpASM->attach();
  240. if(mpASM->isAttached())
  241. {
  242. mbAttach = true;
  243. char * p = (char *)mpASM->data();
  244. mpinfo = (procsm_info *)p;
  245. mphead = (procsm_head *)(p+sizeof(procsm_info));
  246. mnMaxPacCount = mpinfo->mCap;
  247. mnBufSize = mpinfo->mnBufSize;
  248. return true;
  249. }
  250. else
  251. {
  252. return false;
  253. }
  254. }
  255. else
  256. {
  257. return false;
  258. }
  259. return false;
  260. mpASM->attach();
  261. if(mpASM->isAttached())
  262. {
  263. mbAttach = true;
  264. char * p = (char *)mpASM->data();
  265. mpinfo = (procsm_info *)p;
  266. mphead = (procsm_head *)(p+sizeof(procsm_info));
  267. mnMaxPacCount = mpinfo->mCap;
  268. mnBufSize = mpinfo->mnBufSize;
  269. return true;
  270. }
  271. else
  272. {
  273. return false;
  274. }
  275. }
  276. int procsm::MoveMem(const unsigned int nSize)
  277. {
  278. // qDebug("move mem");
  279. unsigned int nRemove = nSize;
  280. if(nRemove == 0)return -1;
  281. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  282. // unsigned int * pIndexNext = pIndexFirst+1;
  283. // qDebug("first = %d next = %d",*pIndexFirst,*pIndexNext);
  284. // unsigned int * pIndexNext = pIndexFirst;
  285. char * pH,*pD;
  286. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  287. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  288. procsm_head * phh = (procsm_head *)pH;
  289. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  290. if(nRemove >nPac)
  291. {
  292. // qDebug("procsm::MoveMem nRemove > nPac nRemove = %d",nRemove);
  293. nRemove = nPac;
  294. }
  295. if(nRemove == nPac)
  296. {
  297. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  298. return 0;
  299. }
  300. unsigned int i;
  301. int nDataMove = 0;
  302. for(i=0;i<nRemove;i++)
  303. {
  304. procsm_head * phd = phh+i;
  305. nDataMove = nDataMove + phd->mnLen;
  306. }
  307. unsigned int nDataTotal;
  308. for(i=0;i<(nPac - nRemove);i++)
  309. {
  310. memcpy(phh+i,phh+i+nRemove,sizeof(procsm_head));
  311. (phh+i)->mnPos = (phh+i)->mnPos - nDataMove;
  312. }
  313. nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen;
  314. memcpy(mstrtem,pD+nDataMove,nDataTotal);
  315. memcpy(pD,mstrtem,nDataTotal);
  316. // for(i=0;i<nDataTotal;i++)
  317. // {
  318. // *(pD+i) = *(pD+i+nDataMove);
  319. // }
  320. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  321. return 0;
  322. }
  323. void procsm::checkasm()
  324. {
  325. mpASMPtr->lock();
  326. ASM_PTR * pASM_PTR = (ASM_PTR * )mpASMPtr->data();
  327. if(pASM_PTR->mnUpdateTime == mASM_State.mnUpdateTime)
  328. {
  329. mpASMPtr->unlock();
  330. return;
  331. }
  332. qDebug("reattch mem.");
  333. mbAttach = false;
  334. AttachMem();
  335. mpASMPtr->unlock();
  336. }
  337. int procsm::writemsg(const char *str, const unsigned int nSize)
  338. {
  339. checkasm();
  340. if(nSize > mnBufSize)
  341. {
  342. if(nSize<1000000000)
  343. {
  344. recreateasm(nSize);
  345. checkasm();
  346. }
  347. else
  348. {
  349. qDebug("procsm::writemsg message size is very big");
  350. return -1;
  351. }
  352. }
  353. if(mbAttach == false)
  354. {
  355. std::cout<<"ShareMemory Attach fail."<<std::endl;
  356. return -1;
  357. }
  358. mpASM->lock();
  359. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  360. // unsigned int * pIndexNext = pIndexFirst+1;
  361. if(mpinfo->mLock == 1)
  362. {
  363. std::cout<<"ShareMemory have lock.Init."<<std::endl;
  364. mpinfo->mLock = 0;
  365. mpinfo->mFirst = 0;
  366. mpinfo->mNext = 0;
  367. }
  368. mpinfo->mLock =1;
  369. WRITEMSG:
  370. char * pH,*pD;
  371. QDateTime dt;
  372. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  373. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  374. procsm_head * phh = (procsm_head *)pH;
  375. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  376. if(nPac>=mnMaxPacCount)
  377. {
  378. unsigned int nRemove = mnMaxPacCount/3;
  379. if(nRemove == 0)nRemove = 1;
  380. MoveMem(nRemove);
  381. goto WRITEMSG;
  382. }
  383. if(nPac == 0)
  384. {
  385. memcpy(pD,str,nSize);
  386. dt = QDateTime::currentDateTime();
  387. // phh->mdt = dt;
  388. phh->SetDate(dt);
  389. // memcpy(&phh->mdt,&dt,sizeof(QDateTime));
  390. // phh->mdt = QDateTime::currentDateTime();
  391. phh->mindex = mpinfo->mNext;
  392. phh->mnPos = 0;
  393. phh->mnLen = nSize;
  394. mpinfo->mNext = mpinfo->mNext+1;
  395. }
  396. else
  397. {
  398. if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=mnBufSize)
  399. {
  400. unsigned int nRemove = mnMaxPacCount/2;
  401. if(nRemove == 0)nRemove = 1;
  402. MoveMem(nRemove);
  403. goto WRITEMSG;
  404. }
  405. else
  406. {
  407. unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
  408. // qDebug("write pos = %d",nPos);
  409. memcpy(pD+nPos,str,nSize);
  410. dt = QDateTime::currentDateTime();
  411. (phh+nPac)->SetDate(dt);
  412. // memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime));
  413. // (phh+nPac)->mdt = QDateTime::currentDateTime();
  414. (phh+nPac)->mindex = mpinfo->mNext;
  415. (phh+nPac)->mnPos = nPos;
  416. (phh+nPac)->mnLen = nSize;
  417. mpinfo->mNext = mpinfo->mNext+1;
  418. }
  419. }
  420. const unsigned int nTM = 0x6fffffff;
  421. if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM))
  422. {
  423. nPac = mpinfo->mNext - mpinfo->mFirst;
  424. unsigned int i;
  425. for(i=0;i<nPac;i++)
  426. {
  427. (phh+i)->mindex = (phh+i)->mindex-nTM;
  428. }
  429. mpinfo->mFirst = mpinfo->mFirst-nTM;
  430. mpinfo->mNext = mpinfo->mNext - nTM;
  431. }
  432. mpinfo->mLock = 0;
  433. mpASM->unlock();
  434. #ifdef USEDBUS
  435. QDBusConnection::sessionBus().send(mmsg);
  436. #endif
  437. return 0;
  438. }
  439. unsigned int procsm::getcurrentnext()
  440. {
  441. checkasm();
  442. unsigned int nNext;
  443. mpASM->lock();
  444. nNext = mpinfo->mNext;
  445. mpASM->unlock();
  446. return nNext;
  447. }
  448. //if return 0 No Data.
  449. //if return -1 nMaxSize is small
  450. //if retrun -2 index is not in range,call getcurrentnext get position
  451. //if return > 0 readdata
  452. int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigned int * nRead,QDateTime * pdt)
  453. {
  454. checkasm();
  455. if(mbAttach == false)
  456. {
  457. std::cout<<"ShareMemory Attach fail."<<std::endl;
  458. return -1;
  459. }
  460. int nRtn = 0;
  461. mpASM->lock();
  462. if((index< mpinfo->mFirst)||(index > mpinfo->mNext))
  463. {
  464. nRtn = -2;
  465. }
  466. if(nRtn != (-2))
  467. {
  468. if(index == mpinfo->mNext)
  469. {
  470. nRtn = 0;
  471. }
  472. else
  473. {
  474. char * pH,*pD;
  475. // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int);
  476. // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head);
  477. pD = (char *)mpASM->data();pD = pD+ sizeof(procsm_info) + mpinfo->mCap*sizeof(procsm_head);
  478. pH = (char *)mpASM->data();pH = pH+sizeof(procsm_info);
  479. procsm_head * phh = (procsm_head *)pH;
  480. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  481. if(nPac == 0)
  482. {
  483. nRtn = 0;
  484. }
  485. else
  486. {
  487. unsigned int nPos = index - mpinfo->mFirst;
  488. *nRead = (phh+nPos)->mnLen;
  489. if((phh+nPos)->mnLen > nMaxSize)
  490. {
  491. nRtn = -1;
  492. }
  493. else
  494. {
  495. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  496. memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
  497. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  498. nRtn = (phh+nPos)->mnLen;
  499. (phh+nPos)->GetDate(pdt);
  500. // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime));
  501. }
  502. }
  503. }
  504. }
  505. mpASM->unlock();
  506. return nRtn;
  507. }