BOOST 线程完全攻略 - 扩展 - 线程消息通讯

时间:2023-03-09 03:20:26
BOOST 线程完全攻略 - 扩展 - 线程消息通讯
  1. // controlled_module_ex.hpp : controlled_module类的扩展
  2. // 增强线程之间消息通讯
  3. // 增加线程安全启动和安全关闭功能
  4. // 增加定时器功能
  5. #pragma once
  6. #include <boost/shared_ptr.hpp>
  7. #include <boost/any.hpp>
  8. #include "controlled_module.hpp"
  9. struct _command
  10. {
  11. typedef boost::shared_ptr<_command> CCmdPtr;
  12. unsigned int nCmd;
  13. boost::any anyParam;
  14. };
  15. struct _wait_command
  16. {
  17. boost::any par;
  18. unsigned int command;
  19. void * event;
  20. boost::shared_ptr<boost::any> resp;
  21. };
  22. class controlled_module_ex;
  23. struct _notify
  24. {
  25. controlled_module_ex * sender;
  26. int id;
  27. boost::any par;
  28. };
  29. #define BM_RESERVE 1000
  30. #define BM_RING_START BM_RESERVE+1
  31. #define BM_RING_STOP BM_RESERVE+2
  32. #define BM_RING_SETTIME  BM_RESERVE+3
  33. #define BM_RING_SETPARENT BM_RESERVE+4
  34. #define BM_RING_CYCLE BM_RESERVE+5
  35. #define BM_RING_PROCESS BM_RESERVE+6
  36. #define BM_RING_PROCESSEND BM_RESERVE+7
  37. #define BM_RING_PROCESSFAIL BM_RESERVE+8
  38. #define BM_TIMER    BM_RESERVE+9
  39. #define BM_COMMAND  BM_RESERVE+10
  40. #define BM_NOTIFY   BM_RESERVE+11
  41. #define BM_USER 9000
  42. class controlled_timer;
  43. class controlled_module_ex: public controlled_module
  44. {
  45. public:
  46. controlled_module_ex()
  47. {
  48. m_safe = false;
  49. }
  50. ~controlled_module_ex()
  51. {
  52. safestop();
  53. }
  54. public:
  55. template<typename T>
  56. bool postmessage(unsigned int nCmd, const boost::shared_ptr<T>& p)
  57. {
  58. if(this==0||!m_safe)return false;
  59. boost::mutex::scoped_lock lock(m_mutex_command);
  60. _command::CCmdPtr cmd(new _command);
  61. cmd->nCmd = nCmd;
  62. cmd->anyParam = p;
  63. m_list_command.push_back(cmd);
  64. return true;
  65. }
  66. boost::any execute(unsigned int command,boost::any par,int timeout=-1)
  67. {
  68. boost::shared_ptr<_wait_command> shared(new _wait_command);
  69. _wait_command & cmd = *shared;
  70. cmd.command = command;
  71. cmd.event = (void *)CreateEvent(0,FALSE,FALSE,0);
  72. cmd.par = par;
  73. cmd.resp = boost::shared_ptr<boost::any>(new boost::any);
  74. if(this->postmessage(BM_COMMAND,shared))
  75. {
  76. DWORD dw = WaitForSingleObject(cmd.event,timeout);
  77. CloseHandle(cmd.event);
  78. if(dw!=WAIT_OBJECT_0)
  79. return boost::any();
  80. else
  81. return *cmd.resp;
  82. }
  83. else
  84. {
  85. CloseHandle(cmd.event);
  86. return boost::any();
  87. }
  88. }
  89. void notify(_notify p)
  90. {
  91. this->postmessage(BM_NOTIFY,p);
  92. }
  93. bool postmessage(unsigned int nCmd,boost::any p)
  94. {
  95. if(this==0||!m_safe)
  96. return false;
  97. boost::mutex::scoped_lock lock(m_mutex_command);
  98. _command::CCmdPtr cmd(new _command);
  99. cmd->nCmd = nCmd;
  100. cmd->anyParam = p;
  101. m_list_command.push_back(cmd);
  102. return true;
  103. }
  104. bool postmessage(unsigned int nCmd)
  105. {
  106. if(this==0||!m_safe)
  107. return false;
  108. boost::mutex::scoped_lock lock(m_mutex_command);
  109. _command::CCmdPtr cmd(new _command);
  110. cmd->nCmd = nCmd;
  111. cmd->anyParam = 0;
  112. m_list_command.push_back(cmd);
  113. return true;
  114. }
  115. virtual bool work()
  116. {
  117. if(!getmessage())
  118. return false;
  119. else
  120. {
  121. Sleep(this->m_sleeptime);
  122. return true;
  123. }
  124. }
  125. virtual void message(const _command & cmd)
  126. {
  127. if(cmd.nCmd==BM_RING_START)
  128. {
  129. this->on_safestart();
  130. }
  131. else if(cmd.nCmd==BM_RING_STOP)
  132. {
  133. this->on_safestop();
  134. }
  135. else if(cmd.nCmd==BM_TIMER)
  136. {
  137. this->on_timer(boost::any_cast<controlled_timer*>(cmd.anyParam));
  138. }
  139. else if(cmd.nCmd==BM_COMMAND)
  140. {
  141. boost::shared_ptr<_wait_command> shared = boost::any_cast< boost::shared_ptr<_wait_command> >(cmd.anyParam);
  142. _wait_command & cmd = *shared;
  143. *cmd.resp = this->on_command(cmd.command,cmd.par);
  144. SetEvent((HANDLE)cmd.event);
  145. }
  146. else if(cmd.nCmd==BM_NOTIFY)
  147. {
  148. try
  149. {
  150. _notify par = boost::any_cast<_notify>(cmd.anyParam);
  151. this->on_notify(par);
  152. }
  153. catch(boost::bad_any_cast)
  154. {
  155. }
  156. }
  157. }
  158. virtual void release()
  159. {
  160. boost::mutex::scoped_lock lock(m_mutex_command);
  161. m_list_command.clear();
  162. }
  163. void safestart()
  164. {
  165. if(!islive())
  166. start();
  167. m_safe = true;
  168. m_safestart_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
  169. postmessage(BM_RING_START);
  170. ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE);
  171. CloseHandle(m_safestart_event);
  172. }
  173. void safestop()
  174. {
  175. if(this->islive())
  176. {
  177. m_safe = false;
  178. m_safestop_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
  179. {
  180. boost::mutex::scoped_lock lock(m_mutex_command);
  181. _command::CCmdPtr cmd(new _command);
  182. cmd->nCmd = BM_RING_STOP;
  183. cmd->anyParam = 0;
  184. m_list_command.push_back(cmd);
  185. }
  186. DWORD dw = ::WaitForSingleObject((HANDLE)m_safestop_event,3*1000);
  187. if(WAIT_OBJECT_0!=dw)
  188. {
  189. }
  190. CloseHandle(m_safestop_event);
  191. stop();
  192. }
  193. }
  194. virtual void on_timer(const controlled_timer * p){}
  195. virtual void on_safestart()
  196. {
  197. SetEvent(m_safestart_event);
  198. }
  199. virtual void on_safestop()
  200. {
  201. SetEvent(m_safestop_event);
  202. }
  203. virtual void on_notify(const _notify & p)
  204. {
  205. }
  206. protected:
  207. virtual boost::any on_command(const unsigned int command,const boost::any par)
  208. {
  209. return boost::any();
  210. }
  211. bool getmessage()
  212. {
  213. std::list<_command::CCmdPtr> cache;
  214. {
  215. boost::mutex::scoped_lock lock(m_mutex_command);
  216. while(!m_list_command.empty())
  217. {
  218. _command::CCmdPtr p = m_list_command.front();
  219. m_list_command.pop_front();
  220. cache.push_back(p);
  221. }
  222. }
  223. _command::CCmdPtr stop_command;
  224. std::list<_command::CCmdPtr>::iterator item;
  225. for(item = cache.begin();item!=cache.end();item++)
  226. {
  227. if((*(*item)).nCmd==BM_RING_STOP)
  228. {
  229. stop_command = *item;
  230. break;
  231. }
  232. }
  233. if(stop_command.get()==0)
  234. {
  235. while(!cache.empty())
  236. {
  237. _command::CCmdPtr p = cache.front();
  238. cache.pop_front();
  239. try
  240. {
  241. if((*p).nCmd!=BM_RING_START)
  242. {
  243. if(!this->m_safe)
  244. continue;
  245. }
  246. this->message(*p);
  247. }
  248. catch(boost::bad_any_cast &)
  249. {
  250. }
  251. }
  252. return true;
  253. }
  254. else
  255. {
  256. cache.clear();
  257. this->message(*stop_command);
  258. return false;
  259. }
  260. }
  261. private:
  262. void*   m_safestart_event;
  263. void* m_safestop_event;
  264. bool m_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,当父线程safestop以后有可能会收到其他线程的postmessage,这时会引起线程死锁,这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理
  265. boost::mutex m_mutex_command;
  266. std::list<_command::CCmdPtr> m_list_command;
  267. };
  268. class controlled_timer: public controlled_module_ex
  269. {
  270. public:
  271. controlled_timer()
  272. {
  273. this->m_time = 0;
  274. this->m_parent = 0;
  275. this->m_step = 0;
  276. }
  277. ~controlled_timer(){
  278. }
  279. protected:
  280. controlled_module_ex* m_parent;
  281. int m_time;
  282. int m_step;
  283. public:
  284. void starttimer(int time,controlled_module_ex* parent)
  285. {
  286. this->safestart();
  287. this->postmessage(BM_RING_SETPARENT,parent);
  288. this->postmessage(BM_RING_SETTIME,time);
  289. }
  290. void stoptimer()
  291. {
  292. this->safestop();
  293. }
  294. public:
  295. virtual void on_safestop()
  296. {
  297. m_time = 0;
  298. controlled_module_ex::on_safestop();
  299. }
  300. virtual void message(const _command & cmd)
  301. {
  302. controlled_module_ex::message(cmd);
  303. if(cmd.nCmd==BM_RING_SETTIME)
  304. {
  305. int time = boost::any_cast<int>(cmd.anyParam);
  306. this->m_time = time/this->m_sleeptime;
  307. this->postmessage(BM_RING_CYCLE);
  308. }
  309. else if(cmd.nCmd==BM_RING_SETPARENT)
  310. {
  311. this->m_parent  = boost::any_cast<controlled_module_ex*>(cmd.anyParam);
  312. }
  313. else if(cmd.nCmd==BM_RING_CYCLE)
  314. {
  315. if(m_time>0)
  316. {
  317. if(m_step>m_time)
  318. {
  319. m_parent->postmessage(BM_TIMER,this);
  320. m_step=0;
  321. }
  322. m_step++;
  323. }
  324. this->postmessage(BM_RING_CYCLE);
  325. }
  326. }
  327. };

1.向线程PostMessage

  函数controlled_module_ex::postmessage完成消息推送。
  虚拟函数controlled_module_ex::message(const _command & cmd)实现消息接收。
  1. #include "controlled_module_ex.hpp"
  2. class thdex: public controlled_module_ex
  3. {
  4. protected:
  5. virtual void message(const _command & cmd)
  6. {
  7. controlled_module_ex::message(cmd);
  8. if(cmd.nCmd==BM_USER+1)
  9. {
  10. cout << "get message" << endl;
  11. }
  12. }
  13. };
  14. int _tmain(int argc, _TCHAR* argv[])
  15. {
  16. thdex t;
  17. t.safestart();
  18. t.postmessage(BM_USER+1);
  19. char buf[10];
  20. gets_s(buf,sizeof buf);
  21. t.safestop();
  22. return 0;
  23. }
  2.向线程PostMessage,并携带简单对象参数
 我们都知道常规的PostMessage要传参,如果是整型参数,还可以用强制转换,但如果是其他类型,例如字符串,我们就必须创建一个字符串缓冲,把缓冲指针作为参数传过去,线程还不能忘记删除,否则导致内存泄漏,自定义结构也是一样的操作,如果想尝试传递一个CString对象,是不可能的。
  幸运的是boost提供了boost::any来抽象任何对象类型,controlled_module_ex的消息传递都是基于boost::any来完成,程序员可以由此写出干净而且内存安全的代码。
  1. #include "controlled_module_ex.hpp"
  2. struct mystruct
  3. {
  4. string a;
  5. int b;
  6. };
  7. class thdex: public controlled_module_ex
  8. {
  9. protected:
  10. virtual void message(const _command & cmd)
  11. {
  12. controlled_module_ex::message(cmd);
  13. if(cmd.nCmd==BM_USER+1)
  14. {
  15. cout << "get integer:" << boost::any_cast<int>(cmd.anyParam) << endl;
  16. }
  17. if(cmd.nCmd==BM_USER+2)
  18. {
  19. cout << "get string:" << boost::any_cast<string>(cmd.anyParam) << endl;
  20. }
  21. if(cmd.nCmd==BM_USER+3)
  22. {
  23. mystruct par = boost::any_cast<mystruct>(cmd.anyParam);
  24. cout << "get mystruct:" << par.a << "," << par.b << endl;
  25. }
  26. }
  27. };
  28. int _tmain(int argc, _TCHAR* argv[])
  29. {
  30. thdex t;
  31. t.safestart();
  32. t.postmessage(BM_USER+1,123);
  33. t.postmessage(BM_USER+2,string("hello world"));
  34. mystruct par;
  35. par.a = "hello world";
  36. par.b = 123;
  37. t.postmessage(BM_USER+3,par);
  38. char buf[10];
  39. gets_s(buf,sizeof buf);
  40. t.safestop();
  41. return 0;
  42. }

3.向线程PostMessage,并传递内存块参数

  假如我们书写了一个录音子线程,如何将录制的语音数据传递给其他线程呢,常规做法是创建一个缓冲,将语音数据填充进去,然后将缓冲地址作为参数传递,这种做法要求接收线程不能忘记删除,否则会导致内存泄漏。
  幸运的是boost提供了智能指针,可以类似java,c#的智能内存回收一样来管理内存分配,我们如果使用这个对象来作为参数传递,就可以完美的防范内存泄漏行为,就算子线程没有处理,别担心,内存它会自动回收的。
  1. #include "controlled_module_ex.hpp"
  2. struct mystruct
  3. {
  4. boost::shared_ptr<char> data;
  5. int datalen;
  6. };
  7. class thdex: public controlled_module_ex
  8. {
  9. protected:
  10. virtual void message(const _command & cmd)
  11. {
  12. controlled_module_ex::message(cmd);
  13. if(cmd.nCmd==BM_USER+1)
  14. {
  15. cout << "get sharedptr" << endl; //仅仅得到数据,得不到数据长度
  16. }
  17. if(cmd.nCmd==BM_USER+2)
  18. {
  19. mystruct par = boost::any_cast<mystruct>(cmd.anyParam);
  20. cout << "get sharedptr len:" << par.datalen << endl;
  21. }
  22. }
  23. };
  24. int _tmain(int argc, _TCHAR* argv[])
  25. {
  26. thdex t;
  27. t.safestart();
  28. t.postmessage(BM_USER+1,boost::shared_ptr<char>(new char[1000]));
  29. mystruct par;
  30. par.datalen = 1000;
  31. par.data = boost::shared_ptr<char>(new char[par.datalen]);
  32. t.postmessage(BM_USER+2,par);
  33. char buf[10];
  34. gets_s(buf,sizeof buf);
  35. t.safestop();
  36. return 0;
  37. }

3.向线程SendMessage

  函数controlled_module_ex::execute完成这个工作
  虚拟函数controlled_module_ex::on_command(const unsigned int command,const boost::any par)响应消息处理
  1. #include "controlled_module_ex.hpp"
  2. class thdex: public controlled_module_ex
  3. {
  4. protected:
  5. boost::any on_command(const unsigned int command,const boost::any par)
  6. {
  7. if(command==1)
  8. {
  9. cout << "on command" << endl;
  10. return 0;
  11. }
  12. if(command==2)
  13. {
  14. cout << "on command,par:" << boost::any_cast<string>(par) << endl;
  15. return 0;
  16. }
  17. if(command==3)
  18. {
  19. return true;
  20. }
  21. else
  22. return controlled_module_ex::on_command(command,par);
  23. }
  24. };
  25. int _tmain(int argc, _TCHAR* argv[])
  26. {
  27. thdex t;
  28. t.safestart();
  29. t.execute(1,0);//等待子线程处理完成
  30. t.execute(2,string("hello world"));//带参数 等待子线程完成
  31. bool rs = boost::any_cast<bool>(t.execute(3,0));//等待子线程处理完成,并取得返回值
  32. cout << "get thread result:" << rs << endl;
  33. boost::any timeout = t.execute(4,0,1000);//等待子线程处理,超时1秒
  34. if(timeout.empty())
  35. cout << "timeout " << endl;
  36. char buf[10];
  37. gets_s(buf,sizeof buf);
  38. t.safestop();
  39. return 0;
  40. }

4.定时器

  类似于CWnd::OnTimer,controlled_module_ex也提供一个虚拟函数virtual void on_timer(const controlled_timer * p);来处理定时
  1. #include "controlled_module_ex.hpp"
  2. class thdex: public controlled_module_ex
  3. {
  4. protected:
  5. virtual void on_timer(const controlled_timer *p)
  6. {
  7. cout << "ontimer" << endl;
  8. }
  9. };
  10. int _tmain(int argc, _TCHAR* argv[])
  11. {
  12. thdex t;
  13. controlled_timer timer;
  14. t.safestart();
  15. timer.starttimer(1000,&t);
  16. char buf[10];
  17. gets_s(buf,sizeof buf);
  18. t.safestop();
  19. return 0;
  20. }