C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下)

时间:2024-02-19 10:27:42

C++集群聊天服务器 网络模块+业务模块+CMake构建项目 笔记 (上)-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135991635?spm=1001.2014.3001.5501C++集群聊天服务器 数据模块+业务模块+CMake构建项目 笔记 (上)-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/136007616?spm=1001.2014.3001.5501C++集群聊天服务器 nginx+redis安装 笔记 (中)-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/136119985?spm=1001.2014.3001.5501基于C++11的数据库连接池【C++/数据库/多线程/MySQL】_c++ 数据库 句柄 连接池管理-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135719057?spm=1001.2014.3001.5501MysqlConn.h

#pragma once
#include <mysql/mysql.h>
#include <string>
#include <chrono>
using namespace std;
using namespace std::chrono;
class MysqlConn {
public:
    // 初始化数据库连接
    MysqlConn();
    // 释放数据库连接
    ~MysqlConn();
    // 连接数据库
    bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
    // 更新数据库: select,update,delete
    bool update(string sql);
    // 查询数据库
    MYSQL_RES* query(string sql);
    // 遍历查询得到的结果集
    bool next();
    // 得到结果集中的字段值
    string value(int index);
    // 事务操作
    bool transaction();
    // 提交事务
    bool commit();
    // 事务回滚
    bool rollback();
    // 刷新起始的空闲时间点
    void refreshAliveTime();
    // 计算连接存活的总时长
    long long getAliveTime();
    // 获取连接
    MYSQL* getConnection();
private:
    void freeResult();
    MYSQL* m_conn = nullptr; // 数据库连接
    MYSQL_RES* m_result = nullptr;
    MYSQL_ROW m_row = nullptr;
    steady_clock::time_point m_aliveTime;
};

ConnPool.h

#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>//条件变量
#include "MysqlConn.h"
using namespace std;
class ConnPool {
public:
    static ConnPool* getConnPool();// 获得单例对象
    ConnPool(const ConnPool& obj) = delete; // 删除拷贝构造函数
    ConnPool& operator=(const ConnPool& obj) = delete; // 删除拷贝赋值运算符重载函数
    shared_ptr<MysqlConn> getConn(); // 从连接池中取出一个连接
    ~ConnPool(); // 析构函数
private:
    ConnPool(); // 构造函数私有化
    bool parseJsonFile(); // 解析json格式文件
    void produceConn(); // 生产数据库连接
    void recycleConn(); // 销毁数据库连接
    void addConn(); // 添加数据库连接
 
    // 连接服务器所需信息
    string m_ip;            // 数据库服务器ip地址
    string m_user;          // 数据库服务器用户名
    string m_dbName;        // 数据库服务器的数据库名
    string m_passwd;        // 数据库服务器密码
    unsigned short m_port;  // 数据库服务器绑定的端口
 
    // 连接池信息
    queue<MysqlConn*> m_connQ;
    unsigned int m_maxSize; // 连接数上限值
    unsigned int m_minSize; // 连接数下限值
    int m_timeout; // 连接超时时长
    int m_maxIdleTime; // 最大的空闲时长
    mutex m_mutexQ; // 独占互斥锁
    condition_variable m_cond; // 条件变量
};

MysqlConn.cpp

#include "MysqlConn.h"
#include <muduo/base/Logging.h> 
// 初始化数据库连接
MysqlConn::MysqlConn() {
    m_conn = mysql_init(nullptr);
    mysql_set_character_set(m_conn, "GBK"); // 设置字符集
}
 
// 释放数据库连接
MysqlConn::~MysqlConn() {
    if (m_conn != nullptr) {
        mysql_close(m_conn);
    }
    freeResult();
}
 
// 连接数据库
bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port) {
    MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
    return ptr != nullptr;
}
 
// 更新数据库:insert,update,delete
bool MysqlConn::update(string sql) {
    if (mysql_query(m_conn, sql.c_str())) {
        return false;
    }
    return true;
}
 
// 查询数据库
MYSQL_RES* MysqlConn::query(string sql) {
    if(mysql_query(m_conn, sql.c_str())) {
        LOG_INFO << __FILE__ << ":" << __LINE__ << ":"
            << sql <<"查询失败!";   
        return nullptr;
    }
    return mysql_use_result(m_conn);
}
 
// 遍历查询得到的结果集
bool MysqlConn::next() {
    if (m_result != nullptr) {
        m_row = mysql_fetch_row(m_result);
        if (m_row != nullptr) {
            return true;
        }
    }
    return false;
}
 
// 得到结果集中的字段值
string MysqlConn::value(int index) {
    int rowCount = mysql_num_fields(m_result);
    if (index >= rowCount || index < 0) {
        return string();
    }
    char* val = m_row[index];
    unsigned long length = mysql_fetch_lengths(m_result)[index];
    return string(val, length);
}
 
// 事务操作
bool MysqlConn::transaction() {
    return mysql_autocommit(m_conn, false);
}
 
// 提交事务
bool MysqlConn::commit() {
    return mysql_commit(m_conn);
}
 
// 事务回滚
bool MysqlConn::rollback() {
    return mysql_rollback(m_conn);
}
 
// 刷新起始的空闲时间点
void MysqlConn::refreshAliveTime() {
    // 这个时间戳就是某个数据库连接,它起始存活的时间点
    // 这个时间点通过时间类就可以得到了
    m_aliveTime = steady_clock::now();
}
 
// 计算连接存活的总时长
long long MysqlConn::getAliveTime() {
    nanoseconds duration = steady_clock::now() - m_aliveTime;
    milliseconds millsec = duration_cast<milliseconds>(duration);
    return millsec.count();
}

// 获取连接
MYSQL *MysqlConn::getConnection() {
    return m_conn;
}

void MysqlConn::freeResult() {
    if (m_result != nullptr) {
        mysql_free_result(m_result);
        m_result = nullptr;
    }
}

ConnPool.cpp

#include "ConnPool.h"
// #include <json/json.h>
// #include <json.h>
#include "json.hpp"
#include <fstream>
#include <thread>
#include <iostream>
// using namespace Json;
using json = nlohmann::json;
ConnPool* ConnPool::getConnPool() {
    static ConnPool pool;
    return &pool;
}
 
// 从连接池中取出一个连接
shared_ptr<MysqlConn> ConnPool::getConn() {
    unique_lock<mutex> locker(m_mutexQ);
    while (m_connQ.empty()) {
        if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout))) {
            if (m_connQ.empty()) {
                //return nullptr;
                continue;
            }
        }
    }
    shared_ptr<MysqlConn>connptr(m_connQ.front(), [this](MysqlConn* conn) {
        lock_guard<mutex>locker(m_mutexQ); // 自动管理加锁和解锁
        conn->refreshAliveTime();// 更新连接的起始的空闲时间点
        m_connQ.push(conn); // 回收数据库连接,此时它再次处于空闲状态
        });// 智能指针
    m_connQ.pop();
    m_cond.notify_one(); // 本意是唤醒生产者
    return connptr;
}
 
ConnPool::~ConnPool() {
    while (!m_connQ.empty()) {
        MysqlConn* conn = m_connQ.front();
        m_connQ.pop();
        delete conn;
    }
}
 
ConnPool::ConnPool() {
    // 加载配置文件
    if (!parseJsonFile()) {
        std::cout << "加载配置文件失败!!!" << std::endl;
        return;
    }
    for (int i = 0; i < m_minSize; ++i) {
        addConn();
    }
    thread producer(&ConnPool::produceConn, this);// 生产连接
    thread recycler(&ConnPool::recycleConn, this);// 销毁连接
    producer.detach();
    recycler.detach();
}
 
bool ConnPool::parseJsonFile() {
    ifstream ifs;
    ifs.open("/home/heheda/Linux/Chat/configuration/dbconf.json");
    if (!ifs.is_open()) {
        std::cout << "无法打开 dbconf.json 配置文件!";
        return false;
    }
    std::cout << "开始解析 dbconf.json 配置文件..." << std::endl;
    json data; // 创建一个空的JSON对象
    ifs>>data; // 将文件内容加载到JSON对象中
    m_ip = data["ip"];
    m_port = data["port"];
    m_user = data["userName"];
    m_passwd = data["password"];
    m_dbName = data["dbName"];
    m_minSize = data["minSize"];
    m_maxSize = data["maxSize"];
    m_maxIdleTime = data["maxIdleTime"];
    m_timeout = data["timeout"];
    /*
    ifstream ifs("dbconf.json");
    Reader rd;
    Value root;
    rd.parse(ifs, root);
    if (root.isObject()) {
        std::cout << "开始解析配置文件..." << std::endl;
        m_ip = root["ip"].asString();
        m_port = root["port"].asInt();
        m_user = root["userName"].asString();
        m_passwd = root["password"].asString();
        m_dbName = root["dbName"].asString();
        m_minSize = root["minSize"].asInt();
        m_maxSize = root["maxSize"].asInt();
        m_maxIdleTime = root["maxIdleTime"].asInt();
        m_timeout = root["timeout"].asInt();
        return true;  // 解析成功返回true,否则返回false。
    }
    return false;
    */
    return true;
}
 
void ConnPool::produceConn() {
    while (true) {  // 生产者线程不断生产连接,直到连接池达到最大值
        unique_lock<mutex> locker(m_mutexQ);  // 加锁,保证线程安全
        while (m_connQ.size() >= m_minSize) {
            m_cond.wait(locker);  // 等待消费者通知
        }
        addConn(); // 生产连接
        m_cond.notify_all();// 通知消费者(唤醒)
    }
}
 
// 回收数据库连接
void ConnPool::recycleConn() {
    while (true) {
        this_thread::sleep_for(chrono::milliseconds(500));// 每隔半秒钟检测一次
        lock_guard<mutex> locker(m_mutexQ);  // 加锁,保证线程安全
        while (m_connQ.size() > m_minSize) {  // 如果连接池中的连接数大于最小连接数,则回收连接
            MysqlConn* conn = m_connQ.front();  // 取出连接池中的连接
            if (conn->getAliveTime() >= m_maxIdleTime) {
                m_connQ.pop();  // 回收连接
                delete conn;  // 释放连接资源
            }
            else {
                break;  // 如果连接的空闲时间小于最大空闲时间,则跳出循环
            }
        }
    }
}
 
// 添加连接到连接池
void ConnPool::addConn() {
    MysqlConn* conn = new MysqlConn;
    conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
    conn->refreshAliveTime();// 记录建立连接的时候的对应的时间戳
    m_connQ.push(conn);
}

dbconf.json

{
    "ip": "127.0.0.1",
    "port": 3306,
    "userName": "root",
    "password": "123456",
    "dbName": "chat",
    "minSize":100,  
    "maxSize":1024,
    "maxIdleTime":5000,
    "timeout":1000
}

执行sql语句: 

create table user(
    id int not null auto_increment primary key,
    name varchar(50) not null unique,
    password varchar(50) not null,
    state enum('online','offline')  default 'offline'
);
  • user.hpp
#ifndef USER_H
#define USER_H

#include <string>
using namespace std;

// 匹配User表的ORM类
class User {
public:
    User(int id=-1, string name="", string password="", string state="offline") {
        m_id = id;
        m_name = name;
        m_password = password;
        m_state = state;
    }
    // 设置相应字段
    void setId(int id) { m_id = id; }
    void setName(string name) { m_name = name; }
    void setPwd(string pwd) { m_password = pwd; }   
    void setState(string state) { m_state = state; }
    
    // 获取相应字段
    int getId() const { return m_id; }
    string getName() const { return m_name; }
    string getPwd() const { return m_password; }
    string getState() const { return m_state; }
private:
    int m_id;            // 用户id
    string m_name;       // 用户名
    string m_password;   // 用户密码
    string m_state;      // 当前登录状态
};
#endif // USER_H

/*
数据层代码框架设计
数据库操作与业务代码进行分离,业务代码处理的都为对象,数据库层操作
具体SQL语句,因此我们定义相应的类,每一个类对应数据库中一张表,将
数据库读出来的字段提交给业务使用。
*/
  • usermodel.hpp
#ifndef USERMODEL_H
#define USERMODEL_H
#include "user.hpp"
#include "ConnPool.h"
// User表的数据操作类:针对表的增删改查
class UserModel {
public:
    // user表的增加方法
    bool insert(ConnPool* pool,User& user); 
    // 根据用户号码查询用户信息
    User query(ConnPool* pool,int id);
    // 更新用户的状态信息
    bool updateState(ConnPool* pool,User user);
    // 重置用户的状态信息
    void resetState(ConnPool* pool);
};

#endif // USERMODEL_H
  • usermodel.cpp
#include "usermodel.hpp"
#include "MysqlConn.h"
#include <iostream>
#include <memory>
// User表的增加方法
bool UserModel::insert(ConnPool* pool,User &user) {
    // 1.组装sql语句
    char sql[1024] = {0};
    std::sprintf(sql,"insert into user(name,password,state) values('%s','%s', '%s')",
         user.getName().c_str(), user.getPwd().c_str(), user.getState().c_str());
    // 2.执行sql语句,进行处理
    shared_ptr<MysqlConn> conn = pool->getConn();
    if(conn->update(sql)) {
        // 获取插入成功的用户数据生成的主键id
        // id为自增键,设置回去user对象添加新生成的用户id
        user.setId(mysql_insert_id(conn->getConnection()));
        return true;
    }
    return false;
}

// 根据用户号码查询用户信息
User UserModel::query(ConnPool* pool,int id) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"select * from user where id = %d", id);
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    // 查询id对应的数据
    MYSQL_RES* res = conn->query(sql);
    if(res != nullptr) { // 查询成功
        MYSQL_ROW row = mysql_fetch_row(res);// 获取行数据
        if(row != nullptr) {
            User user;
            user.setId(atoi(row[0]));
            user.setName(row[1]);
            user.setPwd(row[2]);
            user.setState(row[3]);
            // 释放res动态开辟的资源
            mysql_free_result(res);
            return user;// 返回user对应的信息
        }
    }
    return User(); // 未找到,返回默认的user对象
}

// 更新用户的状态信息
bool UserModel::updateState(ConnPool* pool,User user) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"update user set state = '%s' where id = %d",
         user.getState().c_str(), user.getId());
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    if(conn->update(sql)) {
        return true;
    }
    return false;
}

// 重置用户的状态信息
void UserModel::resetState(ConnPool* pool) {
    // 1.组装sql语句
    char sql[1024] = "update user set state = 'offline' where state = 'online'";
    // 2.执行sql语句,进行相应处理
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

  • 执行sql语句:  
create table friend(
    userid int not null,
    friendid int not null
);
alter table friend 
add constraint pk_friend primary key(userid,friendid);
  • friendmodel.hpp
#ifndef FRIENDMODEL_H
#define FRIENDMODEL_H

#include "user.hpp"
#include "ConnPool.h"
#include <vector>
using namespace std;

// Friend用户表的数据操作类:针对类的增删改查(维护好友信息的操作接口方法)
class FriendModel {
public:
    // 添加好友关系
    void insert(ConnPool* pool,int userid, int friendid);
    // 返回用户好友列表:返回用户好友id,名称,登录状态信息 
    vector<User> query(ConnPool* pool,int userid);
};

#endif // FRIENDMODEL_H
  • friendmodel.cpp
#include "friendmodel.hpp"
// 添加好友关系
void FriendModel::insert(ConnPool* pool,int userid, int friendid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "insert into friend values (%d, %d)", userid, friendid);
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

//返回用户好友列表:返回用户好友id、名称、登录状态信息
vector<User> FriendModel::query(ConnPool* pool,int userid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    // sprintf(sql, "select a.id, a.name, a.state from user a inner join friend b on b.friendid = a.id where b.userid = %d", userid);      
    sprintf(sql, "select a.id, a.name, a.state from user a inner join friend b on b.userid = a.id where b.friendid = %d \
            union (select a.id, a.name, a.state from user a inner join friend b on b.friendid = a.id where b.userid = %d \
            or b.friendid = %d and a.id!=%d)",userid,userid,userid,userid);     
    // 2.发送SQL语句,进行相应处理
    vector<User> vec;
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES * res = conn->query(sql);
    if(res != nullptr) {
        // 把userid用户的所有离线消息放入vec中返回
        MYSQL_ROW row;
        //将userid好友的详细信息返回
        while((row = mysql_fetch_row(res)) != nullptr) {
            User user;
            user.setId(atoi(row[0])); // id
            user.setName(row[1]);     // name
            user.setState(row[2]);    // state
            vec.push_back(user);
        }
        mysql_free_result(res);       // 释放资源
        return vec;
    }
    return vec;
}

// select a.id,a.name,a.state from user a inner join 
// friend b on b.friendid = a.id 
// where b.userid = %d

  • 执行sql语句:   
create table offlinemessage(
    userid int not null primary key,
    message varchar(500) not null
);
  • offlinemessage.hpp
#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H
#include <string>
#include <vector>
#include "ConnPool.h"
using namespace std;

// 离线消息表的数据操作类:针对表的增删改查(提供离线消息表的操作接口方法)
class OfflineMsgModel {
public:
    // 存储用户的离线消息
    void insert(ConnPool* pool,int userid, string msg);
    // 删除用户的离线消息
    void remove(ConnPool* pool,int userid);
    // 查询用户的离线消息:离线消息可能有多个
    vector<string> query(ConnPool* pool,int userid);
};

#endif // OFFLINEMESSAGEMODEL_H
  • offlinemessage.cpp
#include "offlinemessagemodel.hpp"
// 存储用户的离线消息
void OfflineMsgModel::insert(ConnPool* pool,int userid, string msg) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "insert into offlinemessage values(%d, '%s')", userid, msg.c_str());
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

// 删除用户的离线消息
void OfflineMsgModel::remove(ConnPool* pool,int userid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "delete from offlinemessage where userid = %d", userid);
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

// 查询用户的离线消息:离线消息可能有多个
vector<string> OfflineMsgModel::query(ConnPool* pool,int userid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "select message from offlinemessage where userid = %d", userid);
    // 2.执行sql语句
    vector<string> vec;// 存储离线消息,离线消息可能有多条
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES *res = conn->query(sql);
    if(res != nullptr) {
        // 把userid用户的所有离线消息放入vec中返回
        MYSQL_ROW row;
        while((row = mysql_fetch_row(res)) != nullptr) { //循环查找离线消息
            vec.push_back(row[0]);
        }
        mysql_free_result(res);
        return vec;
    }
    return vec;
}

  •  执行sql语句:  
create table allgroup(
    id int not null auto_increment primary key,
    groupname varchar(50) not null,
    groupdesc varchar(200) default ''
);
  • group.hpp
#ifndef GROUP_H
#define GROUP_H
#include <vector>
#include <string>
using namespace std;
#include "groupuser.hpp"
// User表的ORM类
// Group群组表的映射类:映射表的相应字段
class Group{
public:
    Group(int id=-1,string name="",string desc="") 
        : m_id(id)
        ,m_name(name)
        ,m_desc(desc) {
        
    }

    void setId(int id) { m_id = id; }
    void setName(string name) { m_name = name; }
    void setDesc(string desc) { m_desc = desc; }
    
    int getId() const { return m_id; }
    string getName() const { return m_name; }
    string getDesc() const { return m_desc; }
    vector<GroupUser> &getUsers()  { return m_users; }

private:
    int m_id;                 // 群组id
    string m_name;            // 群组名称
    string m_desc;            // 群组功能描述
    vector<GroupUser> m_users;// 存储组成员
};

#endif // GROUP_H

  • 执行sql语句:    
create table groupuser(
    groupid int not null,
    userid int not null,
    grouprole enum('creator','normal') default 'normal'
);
alter table groupuser
add constraint pk_friend primary key(groupid,userid);
  • groupuser.hpp
#ifndef GROUPUSER_H
#define GROUPUSER_H
#include <string>
#include "user.hpp"
using namespace std;

// 群组用户,多了一个role角色信息,从User类直接继承,复用User的其他信息
// GroupUser群组员表的映射类:映射表的相应字段
class GroupUser : public User {
public:
    void setRole(string role) { m_role = role; }
    string getRole() { return m_role; }
private:
    string m_role;
};

#endif // GROUPUSER_H
  • groupmodel.hpp
#ifndef GROUPMODEL_H
#define GROUPMODEL_H

#include "group.hpp"
#include <string>
#include <vector>
using namespace std;
#include "ConnPool.h"
// 群组表的数据操作类:维护数组信息的操作接口方法
class GroupModel {
public:
    // 创建数组
    bool createGroup(ConnPool* pool,Group &group);
    // 加入群组
    void joinGroup(ConnPool* pool,int userid, int groupid, string role);
    // 查询用户所在群组信息
    vector<Group> queryGroups(ConnPool* pool,int userid);
    // 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其他成员群发消息
    vector<int> queryGroupUsers(ConnPool* pool,int userid, int groupid);
};

#endif // GROUPMODEL_H
  • groupmodel.cpp
#include "groupmodel.hpp"
#include <iostream>
// 创建群组
bool GroupModel::createGroup(ConnPool* pool,Group &group) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"insert into allgroup(groupname,groupdesc) values('%s','%s')"
          ,group.getName().c_str(),group.getDesc().c_str());
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    if(conn->update(sql)) {
        // 获取到自增id
        group.setId(mysql_insert_id(conn->getConnection()));
        return true;
    }
    return false;
}

// 加入群组:即给群组员groupuser表添加一组信息
void GroupModel::joinGroup(ConnPool* pool,int userid, int groupid, string role) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"insert into groupuser values(%d,%d,'%s')",
            groupid,userid,role.c_str());
    // 2.执行sqls语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

// 查询用户所在群组信息:群信息以及组员信息
vector<Group> GroupModel::queryGroups(ConnPool* pool,int userid) {
    /*
    1.先根据userid在groupuser表中查询出该用户所属的群组信息
    2.在根据群组信息,查询属于该群组的所有用户的userid,并且和user表
    进行多表联合查询,查出用户的详细信息
    */
    char sql[1024] = {0};
    sprintf(sql,"select a.id,a.groupname,a.groupdesc from allgroup a inner join \
            groupuser b on a.id = b.groupid where b.userid = %d",userid);

    vector<Group> groupVec;
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES *res = conn->query(sql);
    if(res != nullptr) {
        MYSQL_ROW row;
        // 查出userid所有的群组信息
        while((row = mysql_fetch_row(res)) != nullptr) {
            std::cout<<"group row[0]: "<<row[0]<<" row[1]: "<<row[1]<<" row[2]: "<<row[2]<<std::endl;
            Group group;
            group.setId(atoi(row[0]));
            group.setName(row[1]);
            group.setDesc(row[2]);
            groupVec.push_back(group);
        }
        mysql_free_result(res);
    }

    // 查询群组的用户信息
    for(Group& group:groupVec) {
        sprintf(sql,"select a.id,a.name,a.state,b.grouprole from user a \
                inner join groupuser b on b.userid = a.id where b.groupid=%d",group.getId());
    
        MYSQL_RES *res = conn->query(sql);
        if(res != nullptr) {
            MYSQL_ROW row;
            while((row = mysql_fetch_row(res)) != nullptr) {
                std::cout<<"group user row[0]: "<<row[0]<<" row[1]: "<<row[1]<<" row[2]: "<<row[2]<<" row[3]: "<<row[3]<<std::endl; 
                GroupUser user;
                user.setId(atoi(row[0]));
                user.setName(row[1]);
                user.setState(row[2]);
                user.setRole(row[3]);
                group.getUsers().push_back(user);
            }
            mysql_free_result(res);
        }
    }
    return groupVec;
}

//查询用户所在群组信息:群信息以及组员信息
// vector<Group> GroupModel::queryGroups(ConnPool* pool,int userid)
// {
//     /*
//     1、先根据userid在groupuser表中查询出该用户所属的群组详细信息
//     2、再根据群组信息,查询属于该群组的所有用户的userid,并且和user表进行多表联合查询出用户的详细信息
//     */

//     //1、组装SQL语句
//     char sql[1024] = {0};
//     sprintf(sql, "select a.id,a.groupname,a.groupdesc from allgroup a inner join \
//             groupuser b on a.id = b.groupid where b.userid=%d", userid);
    
//     //2、发送SQL语句,进行相应处理
//     vector<Group> groupVec;
//     // MySQL mysql;
//     shared_ptr<MysqlConn> conn = pool->getConn();
//     MYSQL_RES *res = conn->query(sql);
//     if (res != nullptr)
//     {
//         MYSQL_ROW row;
//         //查出userid所有的群信息
//         while ((row = mysql_fetch_row(res)) != nullptr)
//         {
//             Group group;
//             group.setId(atoi(row[0]));
//             group.setName(row[1]);
//             group.setDesc(row[2]);
//             groupVec.push_back(group);
//         }
//         mysql_free_result(res);
//     }

//     //查询群组的用户信息
//     for (Group &group : groupVec)
//     {
//         sprintf(sql, "select a.id,a.name,a.state,b.grouprole from user a \
//             inner join groupuser b on b.userid = a.id where b.groupid=%d", group.getId());
        
//         MYSQL_RES *res = conn->query(sql);
//         if (res != nullptr)
//         {
//             MYSQL_ROW row;
//             while ((row = mysql_fetch_row(res)) != nullptr)
//             {
//                 GroupUser user;
//                 user.setId(atoi(row[0]));
//                 user.setName(row[1]);
//                 user.setState(row[2]);
//                 user.setRole(row[3]);
//                 group.getUsers().push_back(user);
//             }
//             mysql_free_result(res);
//         }
//     }
// }

// 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其他成员群发消息
vector<int> GroupModel::queryGroupUsers(ConnPool* pool,int userid, int groupid) {
    char sql[1024]={0};
    sprintf(sql,"select userid from groupuser \
    where groupid = %d and userid!=%d",groupid,userid);
    vector<int> idVec;
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES *res = conn->query(sql);
    if(res != nullptr) {
        MYSQL_ROW row;
        while((row = mysql_fetch_row(res)) != nullptr) {
            idVec.push_back(atoi(row[0]));
        }
        mysql_free_result(res);
    }
    return idVec;
}

redis.hpp

#ifndef REDIS_H
#define REDIS_H

#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;

class Redis {
public:
    Redis();
    ~Redis();
    // 连接redis服务器
    bool connect();
    // 向redis指定的通道channel发布消息
    bool publish(int channel,string message);
    // 向redis指定的通道subscribe订阅消息
    bool subscribe(int channel);
    // 向redis指定的通道unsubscribe取消订阅消息
    bool unsubscribe(int channel);
    // 在独立线程中接收订阅通道中的消息
    void observer_channel_message();
    // 初始化向业务层上报通道消息的回调对象
    void init_notify_handler(function<void(int,string)> fn);
private:
    // hiredis同步上下文对象,负责publish消息:相当于我们客户端一个redis-cli跟连接相关的所有信息,需要两个上下文处理
    redisContext* m_publish_context;
    // hiredis同步上下文对象,负责subscribe消息
    redisContext* m_subscribe_context;
    // 回调操作,收到订阅的消息,给service层上报:主要上报通道号、数据
    function<void(int,string)>m_notify_message_handler;
};
#endif

redis.cpp

#include <iostream>
using namespace std;
#include "redis.hpp"
//构造函数:初始化两个上下文指针
Redis::Redis() 
    : m_publish_context(nullptr)
    , m_subscribe_context(nullptr)
{
}

//析构函数:释放两个上下文指针占用资源
Redis::~Redis() {
    if (m_publish_context != nullptr) {
        redisFree(m_publish_context);
        // m_publish_context = nullptr;
    }

    if (m_subscribe_context != nullptr) {
        redisFree(m_subscribe_context);
        // m_subscribe_context = nullptr;
    }
}

//连接redis服务器
bool Redis::connect() {
    //负责publish发布消息的上下文连接
    m_publish_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == m_publish_context) {
        cerr << "connect redis failed!" << endl;
        return false;
    }

    //负责subscribe订阅消息的上下文连接
    m_subscribe_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == m_subscribe_context) {
        cerr << "connect redis failes!" << endl;
        return false;
    }

    //在单独的线程中监听通道上的事件,有消息给业务层上报 让线程阻塞去监听
    thread t([&](){
        observer_channel_message();
    });
    t.detach();

    cout << "connect redis-server success!" << endl;

    return true;
}

//向redis指定的通道channel publish发布消息:调用redisCommand发送命令即可
bool Redis::publish(int channel, string message) {
    redisReply *reply = (redisReply *)redisCommand(m_publish_context, "PUBLISH %d %s", channel, message.c_str()); //相当于给channel通道发送消息
    if (nullptr == reply) {
        cerr << "publish command failed!" << endl;
        return false;
    }
    freeReplyObject(reply);
    return true;
}

/* 为什么发布消息使用redisCommand函数即可,而订阅消息却不使用?
redisCommand本身会先调用redisAppendCommand将要发送的命令缓存到本地,再调用redisBufferWrite将命令发送到redis服务器上,再调用redisReply以阻塞的方式等待命令的执行。
subscribe会以阻塞的方式等待发送消息,线程是有限,每次订阅一个线程会导致线程阻塞住,这肯定是不行的。
publish一执行马上会回复,不会阻塞当前线程,因此调用redisCommand函数。
*/

//向redis指定的通道subscribe订阅消息:
bool Redis::subscribe(int channel) {
    // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
    // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行
    // 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
    if (REDIS_ERR == redisAppendCommand(this->m_subscribe_context, "SUBSCRIBE %d", channel)) { //组装命令写入本地缓存
        cerr << "subscribe command failed!" << endl;
        return false;
    }
    
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done) {
        if (REDIS_ERR == redisBufferWrite(this->m_subscribe_context, &done)) { //将本地缓存发送到redis服务器上
            cerr << "subscribe command failed!" << endl;
            return false;
        }
    }
    // redisGetReply

    return true;
}

//向redis指定的通道unsubscribe取消订阅消息,与subscrible一样
bool Redis::unsubscribe(int channel) {
    if (REDIS_ERR == redisAppendCommand(this->m_subscribe_context, "UNSUBSCRIBE %d", channel)) {
        cerr << "unsubscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done) {
        if (REDIS_ERR == redisBufferWrite(this->m_subscribe_context, &done)) {
            cerr << "unsubscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}

//在独立线程中接收订阅通道中的消息:以循环阻塞的方式等待响应通道上发生消息
void Redis::observer_channel_message() {
    redisReply *reply = nullptr;
    while (REDIS_OK == redisGetReply(this->m_subscribe_context, (void**)&reply)) {
        //订阅收到的消息是一个带三元素的数,通道上发送消息会返回三个数据,数据下标为2
        if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
            //给业务层上报通道上发送的消息:通道号、数据
            m_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
        }
        freeReplyObject(reply);
    }
}

//初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn) {
    this->m_notify_message_handler = fn;
}

chatserver.hpp

#ifndef CHATSERVER_H
#define CHATSERVER_H

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
using namespace muduo;
using namespace muduo::net;

// 聊天服务器的主类
class ChatServer {
public:
    // 初始化聊天服务器对象
    ChatServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg);
    // 启动服务
    void start();  
private:
    // 上报链接相关信息的回调函数:参数为连接信息
    void onConnection(const TcpConnectionPtr& conn);
    // 上报读写事件相关信息的回调函数:参数分别为连接/缓冲区/接收到数据的时间信息
    void onMessage(const TcpConnectionPtr& conn,Buffer* buffer,Timestamp time);
    TcpServer m_server; // 组合的muduo库,实现服务器功能的类对象
    EventLoop *m_loop;  // 指向事件循环的指针
};

#endif

chatserver.cpp

#include "chatserver.hpp"
#include "chatservice.hpp"
#include "json.hpp"
#include <functional>
#include <string>
#include <iostream>
using namespace std;
using namespace placeholders;
using json = nlohmann::json;

// 初始化聊天服务器对象
ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg)
    : m_server(loop, listenAddr, nameArg), m_loop(loop) {
    // 注册用户连接的创建和断开事件的回调
    m_server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
    // 注册用户读写事件的回调
    m_server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
    // 设置服务器线程数量 1个I/O线程,3个工作线程
    m_server.setThreadNum(4);
}

// 启动服务,开启事件循环
void ChatServer::start() {
    m_server.start();
}

// 上报链接相关信息的回调函数:参数为连接信息
void ChatServer::onConnection(const TcpConnectionPtr &conn) {
    // 客户端断开连接,释放连接资源 muduo库会打印相应日志
    if(!conn->connected()) {
        ChatService::getInstance()->clientCloseException(conn);// 处理客户端异常关闭
        conn->shutdown();// 释放socket fd资源
    }
}

// 网络模块与业务模块解耦:不直接调用相应方法,业务发生变化此处代码也不需要改动
// 上报读写事件相关信息的回调函数:参数分别为连接/缓冲区/接收到数据的时间信息
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time) {
    // 将buffer缓冲区收到的数据存入字符串
    string buf = buffer->retrieveAllAsString();
    
    std::cout<<"buf: "<<buf.c_str()<<std::endl;
    // 数据的反序列化
    json js = json::parse(buf);
    // 达到的目的:完全解耦网络模块的代码和业务模块的代码
    // 通过js["msgid"] 获取 => 业务handler => conn js time
    auto msghandler = ChatService::getInstance()->getHandler(js["msgid"].get<int>());
    // 回调消息绑定好的事件处理器,来执行相应的业务处理
    msghandler(conn,js,time);
}

chatservice.hpp

#ifndef CHATSERVICE_H
#define CHATSERVICE_H

#include <muduo/net/TcpConnection.h>
#include <unordered_map>
#include <functional>
#include <mutex>

using namespace std;
using namespace muduo;
using namespace muduo::net;

#include "json.hpp"
using json = nlohmann::json;

#include "usermodel.hpp"
#include "offlinemessagemodel.hpp"
#include "friendmodel.hpp"
#include "groupmodel.hpp"

#include "redis.hpp"
#include "ConnPool.h"

// 表示处理消息的事件回调方法类型
using MsgHandler = std::function<void(const TcpConnectionPtr& conn,json& js,Timestamp)>;

// 聊天服务器业务类,设计为单例模式:给msgid映射事件回调(一个消息id映射一个事件处理)
class ChatService {
public:
    // 获取单例对象的接口函数
    static ChatService* getInstance();
    // 处理登录业务
    void login(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 处理注册业务(register)
    void reg(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 处理一对一聊天业务
    void oneChat(const TcpConnectionPtr& conn,json& js,Timestamp time);
    // 添加好友业务
    // void addFriend(const TcpConnectionPtr& conn,json& js,Timestamp time);
    // 添加好友业务请求
    void addFriendRequest(const TcpConnectionPtr& conn,json& js,Timestamp time);
    // 添加好友业务响应
    void addFriendResponse(const TcpConnectionPtr& conn,json& js,Timestamp time);

    // 获取消息msgid对应的处理器
    MsgHandler getHandler(int msgid);
    // 处理客户端异常退出
    void clientCloseException(const TcpConnectionPtr& conn);
    // 服务器异常,业务重置方法
    void reset();
    // 创建群组业务
    void createGroup(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 加入群组业务
    void joinGroup(const TcpConnectionPtr& conn,json& js,Timestamp time);   
    // 群组聊天业务
    void groupChat(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 处理注销业务
    void loginOut(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 从redis消息队列中获取订阅的消息:通道号 + 消息
    void handleRedisSubscribeMessage(int userid, string msg);

    ChatService(const ChatService&) = delete;
    ChatService& operator=(const ChatService&) = delete;

    ConnPool* getConnPool() const { return m_connPool;}
private:
    // 注册消息以及对应的Handler回调操作
    ChatService();
    // 存储消息id和其对应的业务处理方法
    unordered_map<int,MsgHandler> m_msgHandlerMap;
    // 存储在线用户的通信连接
    unordered_map<int,TcpConnectionPtr> m_userConnMap; // 消息处理器map表 每一个msgid对应一个业务处理方法
    // 定义互斥锁,保证m_userConnMap的线程安全
    mutex m_connMutex;
    // 数据操作类对象
    UserModel m_userModel;              // 存储在线用户的通信连接map表
    OfflineMsgModel m_offlineMsgModel;  // 离线消息表的数据操作类对象
    FriendModel m_friendModel;          // 好友表的数据操作类对象
    GroupModel m_groupModel;
    Redis m_redis;                      // redis操作对象
    ConnPool* m_connPool;                   // 数据库连接池
};

#endif // CHATSERVICE_H

/*
3.1 用户注册业务:
    我们业务层与数据层分离,需要操作数据层数据对象即可,因此需要在
    ChatService类中实例化一个数据操作类对象进行业务开发
    UserModel m_userModel;// 数据操作类对象

服务器注册业务流程:
1.客户端注册的消息过来后,网络模块将json数据反序列化后上报到注册业务中,
因为User表中id字段为自增的,state字段是默认的,因此注册业务只需要获取
name与password字段即可
2.实例化User表对应的对象user,将获取到的name与password设置进去,再向
UserModel数据操作类对象进行新用户user的注册
3.注册完成后,服务器返回相应json数据给客户端:若注册成功,返回注册响应消息
REG_MSG_ACK,错误标识errno(0:成功,1:失败),用户id等组装好的json数据;
若注册失败,返回注册响应消息REG_MSG_ACK,错误标识

3.2 用户登录业务
3.2.1 基础登录业务实现
用户登录:服务器反序列化数据后,依据id,密码字段后判断账号是否正确,依据是否
登陆成功给客户端返回响应消息
服务器登录业务流程:
1.服务器获取输入用户id,密码字段
2.查询id对应的数据,判断用户id与密码是否正确,分为以下三种情况返回相应json数据给客户端:
(1)若用户名/密码正确且未重复登录,及时更新登录状态为在线,,返回登录响应消息
   LOGIN_MSG_ACK,错误标识errno(0:成功,1:失败,2:重复登录),用户id,用户名等信息
(2)若用户名/密码正确但重复登录,返回登录响应消息、错误标识、错误提示信息;
(3)若用户不存在或密码错误,返回登录响应消息,错误标识,错误提示信息;

3.2.2 记录用户连接信息处理
用户连接信息处理:假设此时用户1向用户2发送消息(源id, 目的id,消息内容),
此时服务器收到用户1的数据了,要主动向用户2推送该条消息,那么如何知道用户2
是那条连接呢。因此我们需要专门处理下,用户一旦登录成功,就会建立一条连接,
我们便要将该条连接存储下来,方便后续消息收发的处理.

3.2.3 客户端异常退出处理
客户端异常退出处理:假设用户客户端直接通过Ctrl+C中断,并没有给服务器发送合法的json过来,
我们必须及时修改用户登录状态,否则后续再想登录时为"online"状态,便无法登录了。

客户端异常退出处理流程:
1.通过conn连接去m_userConnMap表中查找,删除conn键值对记录;
2.将conn连接对应用户数据库的状态从"online"改为"offline";

3.2.4 服务器异常退出处理
服务器异常退出处理:假设用户服务器直接通过Ctrl+C中断,并没有给客户端发送
合法的json过去,我们必须及时修改所有用户登录状态未"offline",否则后续再
想登录时为"online"状态,便无法登录了。
服务器异常退出处理流程:主动截获Ctcl+c信号(SIGINT),在信号处理函数中将
数据库中用户状态重置为"offline"。

3.3 点对点聊天业务
点对点聊天:源用户向目的用户发送消息,目的用户若在线则将消息发出,
目的用户若不在线将消息存储至离线消息表中,待目的用户上线后离线
消息发出

在进行点对点聊天业务处理前,需要提前处理好以下几点:
在EnMsgType中增加一个聊天消息类型,给客户端标识此时是一个聊天消息.
将点对点业务的消息id与对应的事件处理器提前在聊天服务器业务类的构造
函数里绑定好

服务器点对点聊天业务流程
1.源id向目的id发送消息时候,消息里会包含消息类型,源id,源用户名,
目的id,消息内容,服务器解析到这些数据后,先获取到目的id字段
2.找到id判断是否在线,若在线则服务器将源id的消息中转给目的id;若
不在线则将消息内容存入离线消息表中,待目的id上线后离线消息发出

3.4 离线消息业务
离线消息业务:当用户一旦登录成功,我们查询用户是否有离线消息要发送,
若有则发送相应数据,发送完后删除本次存储的离线数据,防止数据重复发送
在进行点对点聊天业务处理前,我们需要提前处理好以下几点:
1、建立与离线消息表的映射OfflineMsgModel类:我们数据库中有创建的
OfflineMessage离线消息表,因为我们数据层与业务层要分离开来,所以
这里与前面一样提供离线消息表的数据操作类,提供给业务层对应的操作接口。

服务器离线消息业务流程:
1.无论是一对一聊天,还是群聊,若接收方用户不在线,则将发送方消息先存储至离线消息表里
2.一旦接收方用户登录成功,检查该用户是否有离线消息(可能有多条),若有则服务器
将离线消息发送给接收方用户
3.服务器发送完成后删除本次存储的离线消息,保证接收方不会每次登录都收到重复的离线消息

3.5 添加好友业务
添加好友业务:源用户id、目的用户id发送给服务器,服务器在数据库中进行好友关系的添加。
添加完成用户登录后,服务器返回好友列表信息给用户,用户可以依据好友列表进行聊天,这里实现的比较简单,后续可扩充更细化的业务。
在进行添加好友业务处理前,我们需要提前处理好以下几点:
1、我们需要在消息类型EnMsgType中增加一个聊天消息类型,给客户端标识此时是一个添加好友消息:
2、将添加好友业务的消息id与对应的事件处理器提前在聊天服务器业务类的构造函数里绑定好。
3、建立好友表与类的映射FriendModel类:表中userid与friendid关系只需要存储一次即可,因此为联合主键。这里与前面一样提供好友表的数据操作类,提供给业务层对应的操作接口。

服务器添加好友业务流程:
1.服务器获取当前用户id,要添加好友的id;
2.业务层调用数据层接口往数据库中添加相应好友信息;
用户登录成功时,查询该用户的好友信息并返回

3.6 群组业务
群组业务:群组业务分为三块,群管理员创建群组,组员加入群组与群组聊天功能
在进行群组业务处理前,我们需要提前处理好以下几点:
1.我们需要在消息类型EnMsgType中增加不同的消息类型,创建群组,
加入群组、群组聊天三种类型消息,给客户端标识此时要做什么事情:

3.6.1 创建群组
服务器创建群组业务,业务流程:
1.服务器获取创建群的用户id,要创建群名称,群功能等信息
2.业务层创建数据层对象,调用数据层方法进行群组创建,创建成功保存群组创建人信息;

3.6.2 加入群组
服务器组员加入群组业务流程:
1、服务器获取要加入群用户的id、要加入的群组id;
2、业务层调用数据层方法将普通用户加入;

3.6.3 群组聊天
服务器群组聊天业务流程:
1、获取要发送消息的用户id、要发送的群组id;
2、查询该群组其它用户id;
3、查询同组用户id,若用户在线则发送消息;若用户不在线则存储离线消息;

3.7 注销业务
注销业务: 客户端用户正常退出,更新其在线状态。

在进行注销业务处理前,我们需要提前处理好以下几点:
1、我们需要在消息类型EnMsgType中增加一个注销业务类型,给客户端标识此时是一个注销业务消息:
2、将注销业务的消息id与对应的事件处理器提前在聊天服务器业务类的构造函数里绑定好。

服务器注销业务业务流程:
1、服务器获取要注销用户的id,删除其对应的连接。
2、更新用户状态信息,从在线更新为离线。


四 服务器支持跨服务器通信功能
redis主要业务流程:
1.用户登录成功后相应的服务器需要向redis上依据用户id订阅相应通道的消息
2.当服务器上用户之间跨服务器发送消息时,需要向通道上发送消息
3、redis接收到消息通知相应服务器进行处理
*/

chatservice.cpp

#include "chatservice.hpp"
#include "public.hpp"
#include <muduo/base/Logging.h>
#include <vector>
#include <map>
#include <string>
#include <string.h>
#include <iostream>
using namespace std;
using namespace muduo;

// 获取单例对象的接口函数 线程安全的单例对象
ChatService* ChatService::getInstance() {
    static ChatService service;
    return &service;
}

// 构造函数:注册消息以及对应的Handler回调操作 实现网络模块与业务模块解耦的核心
// 将群组业务的消息id分别与对应的事件处理器提前在聊天服务器业务类的构造函数里绑定好
ChatService::ChatService() {
    m_msgHandlerMap.insert({LOGIN_MSG,std::bind(&ChatService::login, this, _1, _2, _3)});  
    m_msgHandlerMap.insert({REG_MSG,std::bind(&ChatService::reg, this, _1, _2, _3)});  
    m_msgHandlerMap.insert({ONE_CHAT_MSG,std::bind(&ChatService::oneChat, this, _1, _2, _3)});
    // m_msgHandlerMap.insert({ADD_FRIEND_MSG,std::bind(&ChatService::addFriend, this, _1, _2, _3)}); 
    m_msgHandlerMap.insert({ADD_FRIEND_REQ_MSG,std::bind(&ChatService::addFriendRequest, this, _1, _2, _3)});  
    m_msgHandlerMap.insert({ADD_FRIEND_MSG_ACK,std::bind(&ChatService::addFriendResponse, this, _1, _2, _3)});

    m_msgHandlerMap.insert({LOGIN_OUT_MSG, std::bind(&ChatService::loginOut, this, _1, _2, _3)});
    m_msgHandlerMap.insert({CREATE_GROUP_MSG, std::bind(&ChatService::createGroup, this, _1, _2, _3)});
    m_msgHandlerMap.insert({ADD_GROUP_MSG, std::bind(&ChatService::joinGroup, this, _1, _2, _3)});
    m_msgHandlerMap.insert({GROUP_CHAT_MSG, std::bind(&ChatService::groupChat, this, _1, _2, _3)});
    // 连接redis服务器
    if(m_redis.connect()) {
        // 设置上报消息的回调 
        m_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));  
    }
    // 初始化数据库
    m_connPool = ConnPool::getConnPool();
}

// 处理登录业务  user表:id password字段
void ChatService::login(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取ids,password字段
    int id = js["id"].get<int>();
    string pwd = js["password"];

    // 传入用户id,返回相应数据
    ConnPool* connPool = this->getConnPool();
    User user = m_userModel.query(connPool,id);
    if(user.getId() == id && user.getPwd() == pwd) { // 登录成功
        if(user.getState() == "online") {
            //该用户已经登录,不允许重复登录
            json response;
            response["msgid"] = LOGIN_MSG_ACK;
            response["errno"] = 2; // 重复登录
            // response["errmsg"] = "该账号已经登录,请重新输入新账号";
            response["errmsg"] = "this account has logined, please input a new account";    
            conn->send(response.dump());
        }
        else{ // 用户未登录,此时登录成功
            // 登录成功,记录用户连接信息
            /*
            在用户登录成功时便将用户id与连接信息记录在一个map映射表里,方便后续查找与使用
            线程安全问题:上述我们虽然建立了用户id与连接的映射,但是在多线程环境下,不同的用户
            可能会在不同的工作线程中调用同一个业务,可能同时有多个用户上线,下线操作,因此要
            保证map表的线程安全
            */
            {
                lock_guard<mutex> lock(m_connMutex);
                m_userConnMap.insert({id, conn}); // 登录成功记录用户连接信息
            }
            // id用户登录成功后,向redis订阅channel(id)通道的事件
            m_redis.subscribe(id);

            // 登录成功,更新用户状态信息 state: offline => online
            user.setState("online");
            m_userModel.updateState(connPool,user); // 更新用户状态信息

            json response;
            response["msgid"] = LOGIN_MSG_ACK;
            response["errno"] = 0;
            response["id"] = user.getId();
            response["name"] = user.getName();
            
            // 查询该用户是否有离线消息
            vector<string> vec = m_offlineMsgModel.query(connPool,id);
            if(!vec.empty()) {
                response["offlinemsg"] = vec;// 查询到离线消息,发送给用户
                cout<<"查询到离线消息,发送给用户 :" <<response["offlinemsg"]<<endl;
                // 读取该用户的离线消息后,把该用户的所有离线消息删除掉
                m_offlineMsgModel.remove(connPool,id);
            }
            // 登录成功,查询该用户的好友信息并返回
            vector<User>userVec = m_friendModel.query(connPool,id);
            if(!userVec.empty()) {
                vector<string> vec2;
                for(User &user : userVec) {
                    json js;
                    js["id"] = user.getId();
                    js["name"] = user.getName();
                    js["state"] = user.getState();
                    vec2.push_back(js.dump());
                }
                response["friends"] = vec2;
            }

            vector<Group> groupVec = m_groupModel.queryGroups(connPool,id);
            if(groupVec.size() > 0) {
                // cout<<"................sdsdfasas................."<<endl;
                vector<string> vec3;
                for(Group& group:groupVec) {
                    vector<GroupUser> users = group.getUsers();
                    json js;
                    js["id"] = group.getId();
                    js["groupname"] = group.getName();
                    js["groupdesc"] = group.getDesc();

                    vector<string> userVec;
                    for(GroupUser& user:users) {
                        json js_tmp;
                        js_tmp["id"] = user.getId();
                        js_tmp["name"] = user.getName();
                        js_tmp["state"] = user.getState();
                        js_tmp["role"] = user.getRole();
                        userVec.push_back(js_tmp.dump());
                    }
                    js["users"] = userVec;
                    vec3.push_back(js.dump());
                    // cout<<"js.dump() = "<<js.dump()<<endl;
                }
                response["groups"] = vec3;
            }
            conn->send(response.dump());
        }
    }
    else {
        // 该用户不存在/用户存在但是密码错误,登录失败
        json response;
        response["msgid"] = LOGIN_MSG_ACK;
        response["errno"] = 1;
        // response["errmsg"] = "该用户不存在,您输入用户名或者密码可能错误!";
        response["errmsg"] = "This user does not exist, or the password you entered may be incorrect!"; 
        conn->send(response.dump());
    }
}

// 处理注册业务 user表:name password
void ChatService::reg(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取name,password字段
    string name = js["name"];
    string pwd = js["password"];

    // 处理业务,操作的都是数据对象
    // 2.创建User对象,进行注册
    User user;
    user.setName(name);
    user.setPwd(pwd);
    // 新用户的插入
    ConnPool* connPool = this->getConnPool();
    bool state = m_userModel.insert(connPool,user);
    if(state) { // 注册成功
        json response;
        response["msgid"] = REG_MSG_ACK; // 注册响应消息
        response["errno"] = 0;           // 错误标识 0:成功 1:失败
        response["id"] = user.getId();
        conn->send(response.dump());
    }
    else { // 注册失败
        json response;
        response["msgid"] = REG_MSG_ACK;
        response["errno"] = 1;
        conn->send(response.dump());
    }
}

// 处理一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.先获取目的id
    int toid = js["toid"].get<int>();
    {
        lock_guard<mutex> lock(m_connMutex);
        auto it = m_userConnMap.find(toid);
        // 2.目的id在线 进行消息转发,服务器将源id发送的消息中转给目的id
        if(it != m_userConnMap.end()) {
            // toid在线,转发消息  服务器主动推送消息给toid用户
            it->second->send(js.dump());
            return;
        }
    }
    
    // 查询toid是否在线
    /*
     * A向B说话,在map表中未找到B,B可能不在本台服务器上但通过
     * 数据库查找在线,要发送的消息直接发送以B用户为id的通道上;
     * 也可能是离线状态,发送离线消息
     */

    cout<<"发送消息 :" <<js.dump()<<endl;

    ConnPool* connPool = this->getConnPool();
    User user = m_userModel.query(connPool,toid);
    if(user.getState() == "online") {
        m_redis.publish(toid, js.dump());
        return;
    }

    // 目的id不在线,将消息存储到离线消息里
    m_offlineMsgModel.insert(connPool,toid, js.dump());
}

// 添加好友业务 msgid id friendid
// void ChatService::addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time) {
//     std::cout<<"添加好友业务 msgid id friendid"<<std::endl;
//     // 1.获取当前用户id,要添加好友id
//     int userid = js["id"].get<int>();
//     int friendid = js["friendid"].get<int>();
//     std::cout<<"打印当前用户id:"<<userid<<std::endl;
//     std::cout<<"打印要添加好友id:"<<friendid<<std::endl;
//     // 2.数据库中存储要添加好友的信息
//     ConnPool* connPool = this->getConnPool();
//     m_friendModel.insert(connPool,userid, friendid);
// }
// 添加好友业务请求
void ChatService::addFriendRequest(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    int userid = js["id"].get<int>();
    int friendid = js["friendid"].get<int>();
    json response;
    response["msgid"] = ADD_FRIEND_REQ_MSG;

    string msgStr = "用户ID: "+to_string(userid)+" ,请求添加您为好友"+to_string(friendid);
    response["msg"] = msgStr;
    response["from"] = userid;
    response["toid"] = friendid;
    std::cout<<"来到这里了:"<<response.dump()<<std::endl;
    oneChat(conn,response,time);
}
 
// 添加好友业务 msgid id friendid
void ChatService::addFriendResponse(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    int userid = js["id"].get<int>();
    int friendid = js["friendid"].get<int>();
    bool flag = js["flag"].get<bool>();
    json response;
    response["msgid"] = ADD_FRIEND_MSG_ACK;
    response["from"] = userid;
    response["toid"] = friendid;
    if(flag) {
        response["msg"] = "I very happy to make friends with you!!!";
        ConnPool* connPool = this->getConnPool();
        m_friendModel.insert(connPool,userid, friendid);
    }
    else{
        response["msg"] = "I am very sorry, you are not my friend!!!";
    }
    cout<<"response.dump() : "<<response.dump()<<endl;
    oneChat(conn,response,time);
}

// 获取消息msgid对应的处理器
MsgHandler ChatService::getHandler(int msgid) {
    // 记录错误日志,msgid没有对应的事件处理回调
    auto it = m_msgHandlerMap.find(msgid);
    if(it == m_msgHandlerMap.end()) {
        // 返回一个默认的处理器,空操作
        return [=](const TcpConnectionPtr &conn, json &js, Timestamp) {
            LOG_ERROR << "msgid:" << msgid << " can not find handler!";
        };//msgid没有对应处理器,打印日志,返回一个默认处理器,空操作
    }
    else {
        return m_msgHandlerMap[msgid];
    }
}

// 处理客户端异常退出
void ChatService::clientCloseException(const TcpConnectionPtr &conn) {
    User user;
    {
        lock_guard<mutex> lock(m_connMutex);   
        // 1.从map表删除用户的连接信息
        for(auto it = m_userConnMap.begin();it!=m_userConnMap.end();++it) {
            if(it->second == conn) {
                // 从map表删除用户的链接信息
                user.setId(it->first);
                m_userConnMap.erase(it);
                break;
            }
        }
    }

    // 用户注销,相当于就是下线,在redis中取消订阅通道
    m_redis.unsubscribe(user.getId());

    // 2.更新用户的状态信息
    if(user.getId() != -1) {
        user.setState("offline");
        ConnPool* connPool = this->getConnPool();
        m_userModel.updateState(connPool,user);
    }
   
}

// 服务器异常,业务重置方法
void ChatService::reset() {
    // 把online状态的用户,设置成offline
    ConnPool* connPool = this->getConnPool();
    m_userModel.resetState(connPool);
}

// 创建群组业务
void ChatService::createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取创建群的用户id,群名称,群功能
    int userid = js["id"].get<int>();
    string name = js["groupname"];
    string desc = js["groupdesc"];
    // 2.存储新创建的群组信息
    ConnPool* connPool = this->getConnPool();