3 ACE_Reactor 同步框架 网络聊天室

时间:2022-09-09 15:03:20


3 ACE_Reactor 同步框架  网络聊天室



ACE_Reactor 框架


3 ACE_Reactor 同步框架  网络聊天室

3 ACE_Reactor 同步框架  网络聊天室

3 ACE_Reactor 同步框架  网络聊天室

3 ACE_Reactor 同步框架  网络聊天室





3 ACE_Reactor 同步框架  网络聊天室




网络聊天室



3 ACE_Reactor 同步框架  网络聊天室

3 ACE_Reactor 同步框架  网络聊天室

3 ACE_Reactor 同步框架  网络聊天室





项目文件:



chunli@Linux:~/ace/AceChatRoom$ tree
.
├── ChatMain.cpp
├── ChatRoom.cpp
├── ChatRoom.h
├── ParticipantAcceptor.cpp
├── ParticipantAcceptor.h
├── Participant.cpp
├── Participant.h
├── SignalHandler.cpp
└── SignalHandler.h

0 directories, 9 files
chunli@Linux:~/ace/AceChatRoom$



主程序:

chunli@Linux:~/ace/AceChatRoom$ cat ChatMain.cpp #include <ace/Reactor.h>#include "ParticipantAcceptor.h"#include "SignalHandler.h"int main() {    SignalHandler sh;    ParticipantAcceptor acceptor;    ACE_INET_Addr addr(8868);    if (acceptor.open(addr) == -1)        return 1;    return ACE_Reactor::instance()->run_reactor_event_loop();}chunli@Linux:~/ace/AceChatRoom$



ChatRoom类文件

chunli@Linux:~/ace/AceChatRoom$ cat ChatRoom.h #ifndef CHATROOM_H_#define CHATROOM_H_#include <list>#include <ace/Singleton.h>#include <ace/Null_Mutex.h>class Participant;class ChatRoom {public:    void join(Participant* user);    void leave(Participant* user);    void forwardMsg(const char* msg);private:    std::list<Participant*> users;};// 不加锁的方式typedef ACE_Singleton<ChatRoom, ACE_Null_Mutex> Room;#endif /* CHATROOM_H_ */chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ cat ChatRoom.cpp #include <cstring>#include <iostream>#include "ChatRoom.h"#include "Participant.h"void ChatRoom::join(Participant* user) {users.push_back(user);}void ChatRoom::leave(Participant* user) {std::list<Participant*>::iterator it = users.begin();for (; it != users.end(); ++it) {if (*it == user) {users.erase(it);break;}}}void ChatRoom::forwardMsg(const char* msg) {std::list<Participant*>::const_iterator it = users.begin();for (; it != users.end(); ++it) {ACE_SOCK_Stream& sock = (*it)->socket();if (sock.send(msg, std::strlen(msg)) == -1)(*it)->handle_close(ACE_INVALID_HANDLE, 0);}}chunli@Linux:~/ace/AceChatRoom$







ParticipantAcceptor类文件

chunli@Linux:~/ace/AceChatRoom$ cat ParticipantAcceptor.h #ifndef PARTICIPANTACCEPTOR_H_#define PARTICIPANTACCEPTOR_H_#include <ace/Reactor.h>#include <ace/Event_Handler.h>#include <ace/SOCK_Acceptor.h>class ParticipantAcceptor: ACE_Event_Handler {public:    ParticipantAcceptor(ACE_Reactor* reactor = ACE_Reactor::instance());    virtual ~ParticipantAcceptor();    int open(const ACE_INET_Addr& addr);    virtual ACE_HANDLE get_handle() const;    virtual int handle_input(ACE_HANDLE h = ACE_INVALID_HANDLE);    virtual int handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask);private:    ACE_SOCK_Acceptor acceptor;};#endif /* PARTICIPANTACCEPTOR_H_ */chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ chunli@Linux:~/ace/AceChatRoom$ cat ParticipantAcceptor.cpp #include <ace/Log_Msg.h>#include "ParticipantAcceptor.h"#include "ChatRoom.h"#include "Participant.h"ParticipantAcceptor::ParticipantAcceptor(ACE_Reactor* reactor) {    this->reactor(reactor);}ParticipantAcceptor::~ParticipantAcceptor() {    handle_close(ACE_INVALID_HANDLE, 0);}int ParticipantAcceptor::open(const ACE_INET_Addr& addr) {    if (acceptor.open(addr, 0) == -1)        ACE_ERROR_RETURN((LM_ERROR, "%p\n", "acceptor.open"), -1);    return reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);}ACE_HANDLE ParticipantAcceptor::get_handle() const {    return acceptor.get_handle();}int ParticipantAcceptor::handle_input(ACE_HANDLE h) {    Participant* user = new Participant(reactor());    if (acceptor.accept(user->socket()) == -1)        ACE_ERROR_RETURN((LM_ERROR, "%p\n", "acceptor.accept"), -1);    if (user->open() == -1) {        ACE_ERROR_RETURN((LM_ERROR, "%p\n", "acceptor.accept"), -1);        user->handle_close(ACE_INVALID_HANDLE, 0);    } else {        Room::instance()->join(user);    }    return 0;}int ParticipantAcceptor::handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask) {    if (acceptor.get_handle() != ACE_INVALID_HANDLE) {        ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK                | ACE_Event_Handler::DONT_CALL;        reactor()->remove_handler(this, m);        acceptor.close();    }    return 0;}chunli@Linux:~/ace/AceChatRoom$




Participant类文件

chunli@Linux:~/ace/AceChatRoom$ cat Participant.h #ifndef PARTICIPANT_H_#define PARTICIPANT_H_#include <ace/Reactor.h>#include <ace/Event_Handler.h>#include <ace/SOCK_Acceptor.h>class Participant: ACE_Event_Handler {public:    static ACE_Time_Value maxMsgInterval;    Participant(ACE_Reactor* reactor = ACE_Reactor::instance());    int open();    virtual ACE_HANDLE get_handle() const;    virtual int handle_input(ACE_HANDLE h = ACE_INVALID_HANDLE);    virtual int handle_timeout(const ACE_Time_Value& t, const void* = 0);    virtual int handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask);    ACE_SOCK_Stream& socket();private:    ACE_Time_Value lastMsgTime;    ACE_SOCK_Stream sock;};#endif /* PARTICIPANT_H_ */chunli@Linux:~/ace/AceChatRoom$ cat Participant.cpp #include <ace/Log_Msg.h>#include <ace/Timer_Queue.h>#include "Participant.h"#include "ChatRoom.h"//ACE_Time_Value Participant::maxMsgInterval = ACE_Time_Value(5);ACE_Time_Value Participant::maxMsgInterval = ACE_Time_Value(20);//20秒没有在聊天室说话的人,就被closeParticipant::Participant(ACE_Reactor* reactor) {    this->reactor(reactor);}int Participant::open() {    lastMsgTime = reactor()->timer_queue()->gettimeofday();    int result =            reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);    if (result != 0)        return result;    result = reactor()->schedule_timer(this, 0, ACE_Time_Value::zero,            maxMsgInterval);    return result;}ACE_HANDLE Participant::get_handle() const {    return sock.get_handle();}int Participant::handle_input(ACE_HANDLE h) {    char buf[512] = "";    ssize_t recvBytes = sock.recv(buf, sizeof(buf));    if (recvBytes <= 0)        ACE_ERROR_RETURN((LM_ERROR, "%p\n", "sock.recv"), -1);    lastMsgTime = reactor()->timer_queue()->gettimeofday();    Room::instance()->forwardMsg(buf);    return 0;}int Participant::handle_timeout(const ACE_Time_Value& t, const void*) {    if (t - lastMsgTime > maxMsgInterval)        reactor()->remove_handler(this, ACE_Event_Handler::READ_MASK);    return 0;}int Participant::handle_close(ACE_HANDLE h, ACE_Reactor_Mask closeMask) {    if (sock.get_handle() != ACE_INVALID_HANDLE) {        ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK                | ACE_Event_Handler::DONT_CALL;        reactor()->cancel_timer(this);        reactor()->remove_handler(this, m);        sock.close();        Room::instance()->leave(this);        delete this;    }    return 0;}ACE_SOCK_Stream& Participant::socket() {    return sock;}chunli@Linux:~/ace/AceChatRoom$


SignalHandler类文件

chunli@Linux:~/ace/AceChatRoom$ cat SignalHandler.h #ifndef SIGNALHANDLER_H_#define SIGNALHANDLER_H_#include <ace/Signal.h>#include <ace/Reactor.h>#include <ace/Event_Handler.h>class SignalHandler: ACE_Event_Handler {public:    SignalHandler(ACE_Reactor* reactor = ACE_Reactor::instance());    virtual int handle_signal(int signum, siginfo_t*, ucontext_t *);};#endif /* SIGNALHANDLER_H_ */chunli@Linux:~/ace/AceChatRoom$ cat SignalHandler.cpp #include <ace/Log_Msg.h>#include "SignalHandler.h"#include "ChatRoom.h"SignalHandler::SignalHandler(ACE_Reactor* reactor) {    this->reactor(reactor);    ACE_Sig_Set signals;    signals.fill_set();    this->reactor()->register_handler(signals, this);}int SignalHandler::handle_signal(int signum, siginfo_t*, ucontext_t*) {    switch (signum) {    case SIGINT:        ACE_DEBUG((LM_DEBUG, "signal SIGINT, but not be terminated!\n"));        break;    case SIGUSR1:        ACE_DEBUG((LM_DEBUG, "signal SIGUSR1, broadcast greeting ...\n"));        Room::instance()->forwardMsg("hello every one!\n");        break;    case SIGUSR2:        ACE_DEBUG((LM_DEBUG, "signal SIGUSR2, shutdown chat room ...\n"));        this->reactor()->end_reactor_event_loop();        break;    }    return 0;}chunli@Linux:~/ace/AceChatRoom$


编译运行:

编译运行:chunli@Linux:~/ace/AceChatRoom$ g++ *.cpp -lACE -Wall && ./a.out 3个客户端连接上来:chunli@Linux:~$ nc localhost 886811111111111111111111111111111111111111111111222222222222222222222222333333333333333333333333333333311111111111111111111111111111111111111111111111111111111112222222222222222222222222222222333333333333333333333333333333chunli@Linux:~$ nc localhost 886822222222222222222222222222222222222222222222222233333333333333333333333333333331111111111111111111111111111122222222222222222222222222222222222222222222222222222222222222333333333333333333333333333333chunli@Linux:~$ chunli@Linux:~$ nc localhost 886833333333333333333333333333333333333333333333333333333333333333111111111111111111111111111112222222222222222222222222222222333333333333333333333333333333333333333333333333333333333333chunli@Linux:~$ Ctrl+C杀不死chunli@Linux:~/ace/AceChatRoom$ g++ *.cpp -lACE -Wall && ./a.out ^Csignal SIGINT, but not be terminated!断开SSH终端:看到变成了守护进程chunli@Linux:~$ ps aux | grep a.outchunli     2978  0.0  0.1  22704  4208 ?        S    10:54   0:00 ./a.out杀死服务程序:chunli@Linux:~$ ps aux | grep a.outchunli     2952  0.0  0.0  22704  2308 pts/7    S+   10:50   0:00 ./a.outchunli@Linux:~$ kill -9 2952










本文出自 “李春利” 博客,请务必保留此出处http://990487026.blog.51cto.com/10133282/1889648