《c++并发编程实战》之第3章 线程共享

时间:2022-12-05 07:30:44

3.1 共享问题

只读数据无问题。数据改动可能引发问题。

3.1.1 条件竞争

线程执行各自操作的结果取决于执行的相对次序。

3.1.2 防止恶性条件竞争


  • 采取保护措施包装数据结构,确保不变量被破坏时,中间状态只对执行改动的线程可见。

  • 无锁
    修改数据结构的设计及其不变量,由一连串不开拆分的改动完成数据变更,每个改动都维持不变量不被破坏。

  • 修改数据结构当作事务(transaction,类似数据库,本书不考虑)处理。

3.2 互斥保护共享数据

访问数据结构钱,锁住互斥;访问结束后,解锁互斥。

3.2.1 使用互斥

3.1.cpp

#include <list>
#include <mutex>
#include <algorithm>
#include <iostream>

std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}

int main()
{
    add_to_list(42);
    std::cout << "contains(1)=" << list_contains(1) << ", contains(42)=" << list_contains(42) << std::endl;
    return 0;
}

3.2.2 组织和编排代码保护共享数据

不得向锁所在的作用域之外传递指针和引用,指向受保护的共享数据,无论是通过函数返回值将它们保存到对外可见的内存,还是将它们作为参数传递给使用者提供的函数。

3.2.cpp

#include <string>
#include <mutex>

class some_data
{
    int a;
    std::string b;

public:
    void do_something() {}
};

class data_wrapper
{
private:
    some_data data;
    std::mutex m;

public:
    template <typename Function>
    void process_data(Function func)
    {
        std::lock_guard<std::mutex> l(m);
        func(data);//向malicious_function函数传递了some_data引用
    }
};

some_data *unprotected;

void malicious_function(some_data &protected_data)
{
    unprotected = &protected_data;
}

data_wrapper x;

void foo()
{
    x.process_data(malicious_function);//unprotected指向了x中data
    unprotected->do_something();//不可控
}

int main()
{
    foo();
    return 0;
}

3.2.3 接口固有的条件竞争

3.3.cpp

#include <deque>
#include <cstddef>

template <typename T, typename Container = std::deque<T>>
class stack
{
public:
    explicit stack(const Container &);
    explicit stack(Container && = Container());
    template <class Alloc> explicit stack(const Alloc &);
    template <class Alloc> stack(const Container &, const Alloc &);
    template <class Alloc> stack(Container &&, const Alloc &);
    template <class Alloc> stack(stack &&, const Alloc &);

    bool empty() const;
    size_t size() const;
    T &top();
    T const &top() const;
    void push(T const &);
    void push(T &&);
    void pop();
    void swap(stack &&);
    template<class... Args> void emplace(Args&&... args);
};

int main()
{
    return 0;
}
stact<int> s;
if(!s.empty()) {	//1
	int const value = s.top();	//2
	s.pop();	//3
	do_something(value);
}

多线程情况下,1,2,3之间交替运行会出错。

消除条件竞争的方法:

  1. 传入引用
std::vector<int> result;
some_stack.pop(result);

短处:构造result需要代价,栈容器存储类别要可赋值。

  1. 提供不抛出异常的拷贝构造函数或移动构造函数

  2. 返回指针,指向弹出的元素

指针可*的复制,建议使用std::shared_ptr。

  1. 结合方法1和方法2,或结合方法1和方法3
template <typename T>
class stack {
public:
	bool pop(T &t) {
        lock_guard<mutex> l(m);
        if(data.empty())
        	return false;
		t = std::move(data.top()); //T需要移动构造函数
		data.pop();
		return true;
	}
private:
	deque<T> data;
	mutex m;
}
  1. 线程安全的栈容器类

简要定义

3.4.cpp

#include <exception>
#include <memory>

struct empty_stack : std::exception
{
    const char *what() const throw();
};

template <typename T>
class threadsafe_stack
{
public:
    threadsafe_stack();
    threadsafe_stack(const threadsafe_stack &);
    threadsafe_stack &operator=(const threadsafe_stack &) = delete;

    void push(T new_value);
    std::shared_ptr<T> pop();
    void pop(T &value);
    bool empty() const;
};

int main()
{
    return 0;
}

详尽定义

3.5.cpp

#include <exception>
#include <stack>
#include <mutex>
#include <memory>
#include <iostream>
using namespace std;

struct empty_stack : std::exception
{
    const char *what() const throw()
    {
        return "empty stack";
    }
};

template <typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;

public:
    threadsafe_stack() {}
    threadsafe_stack(const threadsafe_stack &other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }
    threadsafe_stack &operator=(const threadsafe_stack &) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(new_value);
    }
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
            throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
    }
    void pop(T &value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
            throw empty_stack();
        value = data.top();
        data.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

int main()
{
    threadsafe_stack<int> si;
    si.push(5);
    si.push(6);
    auto data = si.pop();
    cout << *data << endl;
    if (!si.empty())
    {
        int x;
        si.pop(x);
        cout << x << endl;
    }

    //error
    //si.pop();

    return 0;
}

3.2.4 死锁问题和解决方法

为了某项操作而对多个互斥加锁,都锁住一个互斥,等着给另一个互斥加锁,双方苦等对方解锁互斥,形成死锁。
常见解决方法,始终按相同顺序对两个互斥加锁;同时锁住多个互斥。

3.6.cpp

#include <mutex>

class some_big_object
{
};

void swap(some_big_object &lhs, some_big_object &rhs)
{
}

class X
{
private:
    some_big_object some_detail;
    mutable std::mutex m;

public:
    X(some_big_object const &sd) : some_detail(sd) {}

    friend void swap(X &lhs, X &rhs)
    {
        if (&lhs == &rhs)
            return;

        // c++11
        std::lock(lhs.m, rhs.m);
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);

        // c++17
        // std::scoped_lock guard(lhs.m, rhs.m);
        // std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);

        swap(lhs.some_detail, rhs.some_detail);
    }
};

int main()
{
    return 0;
}

3.2.5 防范死锁的补充原则

只要另一线程有可能正在等待当前线程,那么当前线程千万不能反过来等待它。

  1. 避免嵌套锁

已持有锁,则不用试图获取第二个锁(每个线程最多一个锁)。需获取多个锁,调用std::lock()函数,一次同时获取全部锁。

  1. 持锁后,避免调用用户接口

用户程序接口可能随意操作,包括试图获取锁。

  1. 固定顺序获取锁

  2. 按层级加锁

3.7.cpp

#include <mutex>

class hierarchical_mutex
{
public:
    explicit hierarchical_mutex(unsigned level)
    {
    }

    void lock()
    {
    }
    void unlock()
    {
    }
};

hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);

int do_low_level_stuff()
{
    return 42;
}

int low_level_func()
{
    std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
    return do_low_level_stuff();
}

void high_level_stuff(int some_param)
{
}

void high_level_func()
{
    std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
    high_level_stuff(low_level_func());
}

//锁住高层high_level_mutex,后锁住低层low_level_mutex
//先调用低层函数low_level_func,后调用高层函数high_level_stuff(int)
void thread_a()
{
    high_level_func();
}

hierarchical_mutex other_mutex(100);
void do_other_stuff()
{
}

void other_stuff()
{
    high_level_func();
    do_other_stuff();
}

//无视规则,先锁住最低层other_mutex
void thread_b()
{
    std::lock_guard<hierarchical_mutex> lk(other_mutex);
    other_stuff();
}

int main()
{
    return 0;
}

3.8.cpp

#include <mutex>
#include <stdexcept>
#include <climits>

class hierarchical_mutex
{
    std::mutex internal_mutex;
    unsigned long const hierarchy_value;

    //临时保存线程值this_thread_hierarchy_value
    unsigned long previous_hierarchy_value;

    //thread_local线程值
    static thread_local unsigned long this_thread_hierarchy_value;

    //未按照从大到小的层级加锁时,会报错
    void check_for_hierarchy_violation()
    {
        if (this_thread_hierarchy_value <= hierarchy_value)
        {
            throw std::logic_error("mutex hierarchy violated");
        }
    }
    void update_hierarchy_value()
    {
        previous_hierarchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_value;
    }

public:
    explicit hierarchical_mutex(unsigned long value) : hierarchy_value(value),
                                                       previous_hierarchy_value(0)
    {
    }
    void lock()
    {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        update_hierarchy_value();
    }
    void unlock()
    {
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();
    }
    bool try_lock()
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        update_hierarchy_value();
        return true;
    }
};

thread_local unsigned long
    hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

int main()
{
    hierarchical_mutex m1(42);
    hierarchical_mutex m2(2000);

    return 0;
}
  1. 将准则推广到操作以外

死锁现象并不单单因加锁操作而发生,任何同步机制导致的循环等待都会导致死锁出现。

3.2.6 std::unique_lock<>灵活加锁

可完全替代std::lock_guard(优先使用lock_guard),有性能损失。

mutex mtx;
mtx.lock();

//std::adopt_lock表明mtx已调用lock(),lock构造函数不再调用lock()
std::unique_lock<std::mutex> lock(mtx, std::adopt_lock);
mutex mtx;

//std::defer_lock表明lock构造函数不调用lock(),后面会调用
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);

3.9.cpp

#include <mutex>

class some_big_object
{
};

void swap(some_big_object &lhs, some_big_object &rhs)
{
}

class X
{
private:
    some_big_object some_detail;
    mutable std::mutex m;

public:
    X(some_big_object const &sd) : some_detail(sd) {}

    friend void swap(X &lhs, X &rhs)
    {
        if (&lhs == &rhs)
            return;
        std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
        std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
        std::lock(lock_a, lock_b);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

int main()
{
    return 0;
}

3.2.7 不同作用域间转移互斥归属权

互斥归属权可在多个std::unique_lock实例间转移。

std::unique_lock<std::mutex> get_lock(){
	extern std::mutex some_mutex:
	std::unique_lock<std::mutex> lk(some_mutex);
	prepare_data();
	return lk;
}

void process_data(){
	std::unique_lock<std::mutex> lk(get_lock());
	do_something();
}

3.2.8 按合适粒度加锁

仅仅在访问共享数据期间才锁住互斥,让数据处理尽可能不用锁保护。
持锁期间,避免耗时操作,如读写文件。

void get_and_process_data(){
	std::unique_lock<std::mutex> my_lock(the_mutex);
	some_class data_to_process = get_next_data_chunk();
	my_lock.unlock();

	result_type result = process(data_to_process);
	my_lock.lock();
	write_result(data_to_process, result);
}

3.10.cpp

#include <mutex>

class Y
{
private:
    int some_detail;
    mutable std::mutex m;

    int get_detail() const
    {
        std::lock_guard<std::mutex> lock_a(m);
        return some_detail;
    }

public:
    Y(int sd) : some_detail(sd) {}

    friend bool operator==(Y const &lhs, Y const &rhs)
    {
        if (&lhs == &rhs)
            return true;
        int const lhs_value = lhs.get_detail();
        int const rhs_value = rhs.get_detail();
        return lhs_value == rhs_value;
    }
};

int main()
{
    return 0;
}

3.3 其它工具保护共享数据

共享数据仅初始化(创建)过程中受到保护,之后无需保护(只读)。

3.3.1 初始化过程中保护共享数据

创建共享数据开销不菲(建立数据库连接、分配大量内存等),等到必要时(使用)才创建,延迟初始化(lazy initialization),常见于单线程。

std::shared_ptr<some_resource> resource_ptr;
void foo() {
	if(!resource_ptr){
		resource_ptr.reset(new some_resource());
	}
	resource_ptr->do_something();
}

互斥实现线程安全的延迟初始化

3.11.cpp

#include <memory>
#include <mutex>

struct some_resource
{
    void do_something()
    {
    }
};

std::shared_ptr<some_resource> resource_ptr = nullptr;
std::mutex resource_mutex;

void foo()
{
    resource_mutex.lock();
    if (!resource_ptr)
        resource_ptr.reset(new some_resource);
    resource_mutex.unlock();

    resource_ptr->do_something();
}

//双重检验锁定模式(double-checked locking pattern)
void undefined_behaviour_with_double_checked_locking()
{
    if (!resource_ptr)
    {
        std::lock_guard<std::mutex> lk(resource_mutex);
        if (!resource_ptr)
            resource_ptr.reset(new some_resource);
    }
    resource_ptr->do_something();
}

int main()
{
    foo();
    undefined_behaviour_with_double_checked_locking();
    return 0;
}
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
void init_resource(){
	resource_ptr.reset(new some_resource);
}
void foo(){
	std::call_once(resource_flag, init_resource);
	resource_ptr->do_something();
}

once_flag和call_once实现延迟初始化

3.12.cpp

#include <mutex>

struct connection_info
{
};

struct data_packet
{
};

struct connection_handle
{
    void send_data(data_packet const &)
    {
    }
    data_packet receive_data()
    {
        return data_packet();
    }
};

struct remote_connection_manager
{
    connection_handle open(connection_info const &)
    {
        return connection_handle();
    }
} connection_manager;

class X
{
private:
    connection_info connection_details;
    connection_handle connection;
    std::once_flag connection_init_flag;

    void open_connection()
    {
        connection = connection_manager.open(connection_details);
    }

public:
    X(connection_info const &connection_details_) : connection_details(connection_details_)
    {
    }
    void send_data(data_packet const &data)
    {
        std::call_once(connection_init_flag, &X::open_connection, this);
        connection.send_data(data);
    }
    data_packet receive_data()
    {
        std::call_once(connection_init_flag, &X::open_connection, this);
        return connection.receive_data();
    }
};

int main()
{
    X x(connection_info{});
    x.send_data(data_packet{});
    auto data = x.receive_data();
    
    return 0;
}

代替std::call_once()

class my_class{
};
my_class& get_my_class_instance(){
	static my_class instance;
	return instance;
}

3.3.2 保护甚少更新的数据结构(读写锁)

C++11无,C++14增加std::shared_timed_mutex,C++17增加std::shared_mutex。
读锁(共享锁)std::shared_lockstd::shared_mutex
写锁(排他锁)std::lock_guardstd::shared_mutex和std::unique_guardstd::shared_mutex

3.13.cpp

#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>

class dns_entry
{
};

class dns_cache
{
    std::map<std::string, dns_entry> entries;
    std::shared_mutex entry_mutex;

public:
    dns_entry find_entry(std::string const &domain)
    {
        std::shared_lock<std::shared_mutex> lk(entry_mutex);
        std::map<std::string, dns_entry>::const_iterator const it =  entries.find(domain);
        return (it == entries.end()) ? dns_entry() : it->second;
    }
    
    void update_or_add_entry(std::string const &domain, dns_entry const &dns_details)
    {
        std::lock_guard<std::shared_mutex> lk(entry_mutex);
        entries[domain] = dns_details;
    }
};

int main()
{
    return 0;
}

3.3.3 递归加锁

互斥的同一实例多次加锁,一般是设计需要修改。

std::lock_guard<std::recursive_mutex>