通过c++11的condition_variable实现的有最大缓存限制的队列

时间:2023-03-10 05:02:47
通过c++11的condition_variable实现的有最大缓存限制的队列

之前曾写过一个通过C++11的condition_variable实现的有最大缓存限制的队列,底层使用std::queue来实现,如果想要提升性能的话,可以考虑改用固定的长度环形数组。环形数组实现如下:

#include <cassert>
#include <type_traits>
#include <stdexcept> /*
* 文件名: circle_buffer
* 实现说明:底层使用数组来实现循环buffer
* (1) 当m_begIdx和m_endIdx相同时,表示数组为空,否则标识数组存在值
* (2) 通过预先多分配一个节点的方式,来实现存储count个元素的目的
*/ class empty_error : public std::logic_error {
explicit empty_error(const std::string& what_arg)
: logic_error(what_arg)
{} explicit empty_error(const char* what_arg)
: logic_error(what_arg)
{} }; class full_error : public std::logic_error {
explicit full_error(const std::string& what_arg)
: logic_error(what_arg)
{} explicit full_error(const char* what_arg)
: logic_error(what_arg)
{} }; template <typename T>
class circle_buffer {
public:
using size_type = size_t;
public:
explicit circle_buffer(size_type count)
: m_bufSize(count+),
m_buf(static_cast<T*>(std::malloc(sizeof(T)*m_bufSize))),
m_begIdx(),
m_endIdx()
{
assert(count >= );
if (m_buf == nullptr) {
throw std::bad_alloc();
}
} ~circle_buffer() {
clear(typename std::is_trivially_destructible<T>::type());
} size_t size() const noexcept {
if (m_endIdx < m_begIdx) {
return m_endIdx + m_bufSize - m_begIdx;
}
return m_endIdx - m_begIdx;
} bool empty() const noexcept {
return m_begIdx == m_endIdx;
} bool full() const noexcept {
return ((m_endIdx+) == m_begIdx) ||
(m_begIdx == && m_endIdx == getMaxIdx());
} // buffer最后插入一个值,这里会检查是否存在空间,如果不存在,则抛出异常
template <typename... Args>
void pushCheck(Args&&... args) {
if (full()) {
throw full_error("pushCheck invoked when buffer is full");
} push(std::forward<Args>(args)...);
} // buffer最后插入一个值,这里不做检查是否存在空间
template <typename... Args>
void push(Args&&... args) {
new (&m_buf[m_endIdx]) T(std::forward<Args>(args)...);
advanceIdx(m_endIdx);
} // buffer最前面取出一个值,这里会检查是否存在元素可以取出,如果不存在,则抛出异常
T popCheck() {
if (empty()) {
throw empty_error("popCheck invoked when buffer is empty");
} return pop();
} // buffer最前面取出一个值
T pop() {
auto val = std::move(m_buf[m_begIdx]);
clearOne(typename std::is_trivially_destructible<T>::type());
advanceIdx(m_begIdx);
return val;
} private:
// 将指示位置的序号前进一格
void advanceIdx(size_t& idx) noexcept {
if (idx == getMaxIdx()) {
idx = ;
} else {
++idx;
}
} // 非trivially析构函数类型
void clear(std::false_type) {
while (m_begIdx != m_endIdx) {
m_buf[m_begIdx].~T();
advanceIdx(m_begIdx);
}
std::free(m_buf);
} // trivially析构函数类型
void clear(std::true_type) {
std::free(m_buf);
} // 非trivially析构函数类型
void clearOne(std::false_type) {
m_buf[m_begIdx].~T();
} // trivially 析构函数类型
void clearOne(std::true_type) {
} size_t getMaxIdx() const noexcept {
return m_bufSize-;
} private:
size_type m_bufSize;
T* m_buf;
size_type m_begIdx;
size_type m_endIdx;
};

关于上面的环形数组,简单的单元测试代码如下,这里使用了catch2,如下代码需要放在.cpp文件中。

#define CATCH_CONFIG_MAIN
// This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include "circle_buffer.h" TEST_CASE("circle buffer manipulation", "[circle_buffer]") {
circle_buffer<int> cb(); REQUIRE( cb.size() == );
REQUIRE( cb.empty() == true);
REQUIRE( cb.full() == false); cb.push();
cb.push(); REQUIRE( cb.size() == );
REQUIRE( cb.empty() == false );
REQUIRE( cb.full() == true ); auto dropFirst = cb.pop(); REQUIRE( dropFirst == );
REQUIRE( cb.size() == );
REQUIRE( cb.empty() == false );
REQUIRE( cb.full() == false ); cb.push(); REQUIRE( cb.size() == );
REQUIRE( cb.empty() == false );
REQUIRE( cb.full() == true); auto dropSecond = cb.pop(); REQUIRE( dropSecond == );
REQUIRE( cb.size() == );
REQUIRE( cb.empty() == false );
REQUIRE( cb.full() == false ); auto dropThird = cb.pop(); REQUIRE( dropThird == );
REQUIRE( cb.size() == );
REQUIRE( cb.empty() == true );
REQUIRE( cb.full() == false );
}

下面是基于环形数组实现的有最大长度限制的生产者消费者队列,注意一点,在使用下面队列时,编译选项要加上-std=c++11。

#include <condition_variable>
#include <chrono>
#include "circle_buffer.h" template <typename T>
class producer_consumer_queue {
public:
producer_consumer_queue(int maxSize): m_buffer(maxSize) { } // 处理数据线程
T readQueue() {
T data;
// 取出数据,然后处理数据
{
std::unique_lock<std::mutex> lock(m_queueMtx);
m_consumeCv.wait(lock, [this] { return !m_buffer.empty(); }); data = m_buffer.pop();
}
m_produceCv.notify_one(); return data;
} // 生产数据线程,返回值表示是否生产成功,如果超时就不会生产成功
template <typename Rep, typename Period, typename ...Args>
bool writeQueue(const std::chrono::duration<Rep, Period>& wait_time, Args&& ...args) {
// 预设一个消费者处理这个数据
{
std::unique_lock<std::mutex> lock(m_queueMtx);
auto success = m_produceCv.wait_for(lock, wait_time, [this] { return !m_buffer.full(); });
if (!success) {
return false;
}
m_buffer.push(std::forward<Args>(args)...);
}
m_consumeCv.notify_one();
return true;
} private:
// 用来缓存数据
circle_buffer<T> m_buffer;
// 用来保护数据
std::mutex m_queueMtx;
// 用来提醒当前可以消费
std::condition_variable m_consumeCv;
// 用来提醒当前可以生产
std::condition_variable m_produceCv;
};

以上就是这个队列的具体实现。之后,考虑写一些关于中间件的知识,可能会从grpc开始吧。

相关文章