Boost学习笔记 -- thread

时间:2022-12-27 08:02:13

thread库为C++增加了线程处理的能力,提供了简明清晰的线程、互斥量等概念。

使用thread库

#include <boost/thread>
using namespace boost;

链接时,需要使用-lpthread选项来链接POSIX线程库。

时间功能

在多线程编程时,经常要用到超时处理,thread库V3使用了chrono库提供对时间的支持。
chrono库的时间概念与date_time库类似,在名称空间boost::chrono里提供了时分秒等概念,可以使用microseconds、milliseconds、seconds、minutes、hours等时间长度类表示超时的时间,用time_point表示某个确定时间点。

this_thread::sleep_for( chrono::seconds( 2 ) );
this_thread::sleep_until( chrono::system_clock::now() + chrono::seconds( 1 ) );

互斥量

mutex对象在创建后就表示了一个互斥量,成员lock()用于线程阻塞等待直至获得互斥量的所有权;当线程使用完共享资源后应该及时使用unlock()解除对互斥量的锁定

mutex mu;
try {
mu.lock();
cout << "some operations" << endl;
mu.unlock();
} catch( ... ) {
mu.unlock();
}

直接使用mutex的成员函数来进行锁定互斥量不够方便,而且在发生异常导致退出作用域等情况下很可能会忘记解除锁定;因此thread库又提供了一系列RAII型的lock_gurad类,辅助锁定互斥量。它们在构造时锁定互斥量,在析构时自动解锁。

mutex mu;
mutex::scoped_lock lock(mu);
cout << "some operations" << end;

线程对象

注意:thread对象是不可拷贝的。

  • 创建线程
    线程可以认为是在进程的另一个空间里运行的一个函数;在传递参数时需要注意,thread使用的是参数的拷贝,因此要求可调用物和参数类型都支持拷贝构造。如果希望传递给线程引用值就需要使用ref库进行包装,同时必须保证被引用的对象在线程执行期间一直存在。

  • 启动线程
    当成功创建了一个thread对象后,线程就立刻开始执行,thread不提供类似start(),begin()那样的方法。

mutex io_mu;

void printing( atomic_int& x, const string& str ) {
for( int i = 0; i < 5; ++i ) {
mutex::scoped_lock lock( io_mu );
cout << str << ++x << endl;
}
}

int main() {
atomic_int x;

thread( printing, ref( x ), "hello" );
thread( printing, ref( x ), "boost" );
this_thread::sleep_for( chrono::seconds( 2 ) );
}
  • join()
    thread的成员函数joinable()可以判断thread对象是否标识了一个可执行的线程体,如果joinable()返回true,我们就可以调用成员函数join()或者try_join_for()/try_join_until()来阻塞当前线程:
    join()一直阻塞等待,直到线程结束;
    try_join_for()/try_join_until()阻塞线程,或者阻塞等待一定的时间,然后不管线程是否结束都返回。

  • detach()
    可以使用detach()将thread与线程执行体手动分离,此后thread对象不代表任何线程体,失去对线程体的控制。

thread t1( printing, ref( x ), "hello" );
t1.detach();
assert( t1.get_id() == thread::id() ); // t1不再标识任何线程
  • bind()和function
    有时在thread的构造函数中写传递给调用函数的参数很麻烦,尤其是在使用大量线程对象的时候,这时可以使用Boost的bind和function库。
thread t3( bind( printing, ref( x ), "thread" );
function<void>()> f = bind( printing, ref( x ), "mutex" );
thread( f );
  • other functions
    get_id():返回线程id对象;
    native_handle():获取线程handle的函数,返回类型是系统平台相关的;
    yield():指示当前线程放弃时间片,允许其他的线程运行;
    sleep_for()/sleep_until():让线程睡眠等待一小段时间;

  • 线程中断
    interrupt():要求线程中断执行;
    interruption_requested():检查县城是否被要求中断;
    被中断的线程会抛出一个thread_interrupted异常,这个异常应该在线程执行函数里捕获并处理,如果线程不处理这个异常,那么默认的动作是终止线程。

void to_interrupt( atomic_int& x, const string& str ) {
try {
for( int i = 0; i < 5; ++i ) {
this_thread::sleep_for( chrono::seconds( 1 ) );
mutex::scoped_lock lock( io_mu );
cout << str << ++x << endl;
}
} catch( thread_interrupted& ) {
cout << "thread_interrupted" << endl;
}
}

int main() {
atomic_int x;

thread t( to_interrupt, ref( x ), "hello" );
this_thread::sleep_for( chrono::seconds( 2 ) );

t.interrupt();
assert( t.interruption_requested() );

t.join();
}

程序运行的结果可能如下:
hello1
hello2
thread_interrupted

线程组

thread库提供类thread_group用于管理一组线程。
成员函数create_thread()是一个工厂函数,可以创建thread对象并运行线程;也可以在外部创建线程,使用add_thread()加入到线程组;如果不需要某个线程,使用remove_thread()进行删除。
join_all()和interrupt_all()用来对所有线程进行等待或者中断。

thread_group tg;
tg.create_thread( bind( printing, ref( x ), "C++" );
tg.create_thread( bind( printing, ref( x ), "boost" );
tg.join_all();

使用thread_group,我们可以为程序建立一个类似于全局的线程池

typedef singleton_default<thread_group> thread_pool;
thread_pool::instance().create_thread( ... );

条件变量

条件变量是thread库提供的另一种用于等待的同步机制,可以实现线程间的通信;必须要与互斥量配合使用;等待另一个线程中的某个事件发生,然后线程才能继续执行。

  • condition_variable
  • condition_variable_any

使用方法:
1. 拥有条件变量的线程先锁定互斥量,然后循环检查某个条件,如果条件不满足,那么就调用条件变量的成员函数wait()等待直至条件满足;
2. 其他线程处理条件变量要求的条件,当条件满足时调用它的成员函数notify_one()或notify_all(),以通知一个或者所有正在等待条件变量的线程继续执行。

#include <stack>
class buffer {
private:
mutex mu;
condition_variable_any cond_ put;
condition_variable_any cond_get;
stack<int> stk;
int un_read, capacity;

bool is_full() {
return un_read == capacity;
}
bool is_empty() {
return un_read == 0;
}

public:
buffer( size_t n ):un_read( 0 ), capacity( n ) {}

void put( int x ) {
{
mutex::scoped_lock lock( mu );
while( is_full() ) {
{
mutex::scoped_lock lock( io_mu );
cout << "full waiting..." << endl;
}
cond_put.wait( mu );
}
stk.push( x );
++un_read;
}
cond_get.notify_one();
}

void get( int* x ) {
{
mutex::scoped_lock lock( mu );
while( is_empty() ) {
{
mutex::scoped_lock lock( io_mu );
cout << "empty waiting..." << endl;
}
cond_get.wait( mu );
}
--un_read;
*x = stk.pop();
}
cond_put.notify_one();
}
}
buffer buf( 5 );
void producer( int ) {
for( int i = 0; i < n; ++i ) {
{
mutex::scoped_lock lock( io_mu );
cout << "put " << i << endl;
}
buf.put( i );
}
}

void consumer( int n ) {
int x;
for( int i = 0; i < n; ++ i ) {
buf.get( &x );
mutex::scoped_lock lock( io_mu );
cout << "get " << x << endl;
}
}
int main() {
thread t1( producer, 20 );
thread t2( consumer, 10 );
thread t3( consumer, 10 );

t1.join();
t2.join();
t3.join();
}

共享互斥量

共享互斥量shared_mutex允许线程获取多个共享所有权和一个专享所有权,实现了读写锁的机制,即多个读线程和一个写线程。
shared_mutex读锁定时使用shared_lock<shared_mutex>,写锁定时使用unique_lock<shared_mutex>

class rw_data {
private:
int m_x;
shared_mutex rw_mu;

public:
rw_data():m_x( 0 ) {}
void write() {
unique_lock<shared_mutex> ul( rw_mu );
++m_x;
}
void read( int* x ) {
shared_lock<shared_mutex> sl( rw_mu );
*x = m_x;
}
}
void writer( rw_data& d ) {
for( int i = 0; i < 20; ++i ) {
this_thread::sleep_for( chrono::milliseconds( 10 ) );
d.write();
}
}
void reader( rw_data& d ) {
int x;
for( int i = 0; i < 10; ++i) {
this_thread::sleep_for( chrono::milliseconds( 5 ) );
d.read( &x );
mutex::scoped_lock lock( io_mu );
cout << "reader: " << x << endl;
}
}
int main() {
rw_data d;
thread_group pool;

pool.create_thread( bind( writer, ref( d ) ) );
pool.create_thread( bind( writer, ref( d ) ) );

pool.create_thread( bind( reader, ref( d ) ) );
pool.create_thread( bind( reader, ref( d ) ) );
pool.create_thread( bind( reader, ref( d ) ) );
pool.create_thread( bind( reader, ref( d ) ) );

pool.join_all();
}

future

很多情况下线程不仅仅要执行一些工作,它还可能要返回一些计算结果,thread库使用future范式提供了一种异步操作线程返回值的方法。

int fab( int n ) {
if( n == 0 || n == 1 ) {
return 1;
}
return fab( n - 1 ) + fab( n - 2 );
}

int main() {
// package_task只接受无参函数,因此需要使用bind
packaged_task<int> pt( bind( fab, 10 ) );

future<int> f = pt.get_future();

// 启动线程计算,必须使用boost::move()来转移packaged_task对象
// 因为packaged_task是不可拷贝的
thread( boost::move( pt ) );
f.wait();

assert( f.is_ready() && f.has_value() );
cout << f.get();
}