C++消息队列的实现

时间:2022-12-24 17:40:33

(1)消息队列的实现

#ifndef NET_FRAME_CONCURRENT_QUEUE_H
#define NET_FRAME_CONCURRENT_QUEUE_H

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>



/**
 * Thread-safe FIFO message queue.
 *
 * Every access to the underlying std::queue is made while holding _mutex.
 * Consumers may block in Pop() until a producer calls Push().
 * The queue is neither copyable nor assignable.
 *
 * @tparam Type element type; it must be move- or copy-assignable because
 *              Pop() assigns the front element into the caller's object.
 */
template<class Type>
class ConcurrentQueue {
public:
    ConcurrentQueue() : _queue(), _mutex(), _condition() { }

    virtual ~ConcurrentQueue() { }

    ConcurrentQueue(const ConcurrentQueue& other) = delete;

    ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;

    /**
     * Appends a record to the back of the queue and wakes one waiting consumer.
     *
     * @param record value to enqueue; taken by value and moved into the queue.
     */
    void Push(Type record) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(std::move(record));
        _condition.notify_one();
    }

    /**
     * Removes the front element and stores it in `record`.
     *
     * @param record    receives the removed element; left untouched when the
     *                  call returns false.
     * @param isBlocked true (default): wait until an element is available;
     *                  false: return immediately if the queue is empty.
     * @return true if an element was removed, false only in non-blocking mode
     *         when the queue was empty.
     */
    bool Pop(Type& record, bool isBlocked = true) {
        // One lock covers both the emptiness check and the removal, so no
        // other thread can touch the queue between the two steps.
        std::unique_lock<std::mutex> lock(_mutex);
        if (isBlocked) {
            // The predicate form re-checks after every wake-up, which also
            // handles spurious wake-ups.
            _condition.wait(lock, [this] { return !_queue.empty(); });
        } else if (_queue.empty()) {
            return false;
        }

        record = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    /**
     * @return the number of queued elements at the time of the call,
     *         converted to int32_t.
     */
    int32_t Size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return static_cast<int32_t>(_queue.size());
    }

    /** @return true if the queue held no elements at the time of the call. */
    bool Empty() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

private:
    std::queue<Type> _queue;
    mutable std::mutex _mutex;           // mutable so const observers can lock
    std::condition_variable _condition;  // signalled by Push() for each new element
};


#endif //NET_FRAME_CONCURRENT_QUEUE_H

(2)拥有消息队列的线程池的实现

.h文件如下

#ifndef NET_FRAME_THREAD_POOL_H
#define NET_FRAME_THREAD_POOL_H

#include "ConcurrentQueue.h"

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

// Minimum number of worker threads; ThreadPool's constructor raises any smaller request to this value.
#define MIN_THREADS 10


template<class Type>
class ThreadPool {
ThreadPool& operator=(const ThreadPool&) = delete;

ThreadPool(const ThreadPool& other) = delete;

public:
ThreadPool(int32_t threads, std::function<void(Type& record)> handler);

virtual ~ThreadPool();

void Submit(Type record);

private:

private:
bool _shutdown;
int32_t _threads;
std::function<void(Type& record)> _handler;
std::vector <std::thread> _workers;
ConcurrentQueue <Type> _tasks;
};




    template<class Type>
    ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
            : _shutdown(false),
              _threads(threads),
              _handler(handler),
              _workers(),
              _tasks() {
        if (_threads < MIN_THREADS)
            _threads = MIN_THREADS;


        for (int32_t i = 0; i < _threads; ++i)
            _workers.emplace_back(
                    [this] {
                        while (!_shutdown) {
                            Type record;
                            _tasks.Pop(record, true);
                            _handler(record);
                        }
                    }
            );
    }


    template<class Type>
    ThreadPool<Type>::~ThreadPool() {
        for (std::thread &worker: _workers)
            worker.join();
    }


    template<class Type>
    void ThreadPool<Type>::Submit(Type record) {
        _tasks.Push(record);
    }




#endif //NET_FRAME_THREAD_POOL_H