[IPC基础]03-通过共享内存和互斥锁、条件变量实现进程同步

时间:2023-01-13 14:57:36

目的:
使用 pthread_mutex_t 和 pthread_cond_t 以及共享内存、内存映射 达到跨进程通信的目的

步骤:

  1. 通过 shm_open 、ftrancate、mmap将共享内存进行内存映射
  2. 将 pthread_mutex_t 和 pthread_cond_t 保存在共享内存中,并且设置 PTHREAD_SHARED 属性,以达到跨进程使用的目的
  3. 消费者在没有通知的情况下超时等待10s后输出共享内存数据
  4. 生产者产生数据后,可以选择是否进行 notify

结论:

  1. 通过设置 PTHREAD_SHARED 属性, pthread_mutex_t 和 pthread_cond_t 可以跨进程使用
  2. 如果生产者不进行notify,消费者将等到超时之后才能被唤醒
  3. 由于消费者进行wait的时候,并不拥有锁,因此如果在有一个进程/线程进行wait的时候,另外一个进程是可以获取锁并且进行条件唤醒的。此时,wait 的线程/进程将被唤醒,但是在 notify 到 wait 之间的时间间隔中,其他线程/进程也可以获取这个锁。因此,如果我们将 wait 和 nofity 方在独立进程的循环中,此时可以看到 notify 和 wait 并不是一对一交替执行的
  4. (补充)mmap 返回的指针,是可以被有亲缘关系的进程使用的
  5. (补充)对相同名字的共享内存文件进行内存映射,跨进程得到的内存地址也是相同的
#include <sys/time.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <stdlib.h>
#include <stdio.h>
#include "sys.h"
#include <unistd.h>

#include <sys/mman.h> // for shm_open
#include <sys/stat.h> /* For mode constants */
#include <fcntl.h> /* For O_* constants */
#include <pthread.h> // for pthread_xx
#include <assert.h> // for assert

struct SHM_MUTEX
{
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};


class SHM_SYNC_COND
{
public:
SHM_SYNC_COND() = default;
bool init(const char *shm_name,size_t elm_size,size_t eml_count)
{
assert(_data == nullptr);
assert(_shm_mutex == nullptr);
size_t s = sizeof(SHM_MUTEX) + elm_size * eml_count;// 计算内存大小
int shm_fd = shm_open(shm_name,O_CREAT | O_RDWR,0666);// 创建/打开共享内存文件
exit_on_error(shm_fd < 0,"shm_open failed!");


int ret = ftruncate(shm_fd,s); // 截断共享文件大小
exit_on_error(ret < 0,"ftruncate failed!");


void *addr = mmap(NULL,s,PROT_WRITE | PROT_READ,MAP_SHARED,shm_fd,0);// 将共享内存文件进行内存映射
exit_on_error(addr == (void *)-1,"mmap failed");


_shm_mutex = (SHM_MUTEX *)addr;// 获取共享内存锁
_data = (char *)addr + sizeof(SHM_MUTEX);


pthread_mutexattr_t mutexattr;// 设置 mutex 的 PTHREAD_PROCESS_SHARED 属性
pthread_mutexattr_setpshared(&mutexattr,PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&_shm_mutex->_mutex,&mutexattr);
pthread_mutexattr_destroy(&mutexattr);



pthread_condattr_t condattr;// 设置 cond 的 PTHREAD_PROCESS_SHARED 属性
pthread_condattr_setpshared(&condattr,PTHREAD_PROCESS_SHARED);
pthread_cond_init(&_shm_mutex->_cond,&condattr);
pthread_condattr_destroy(&condattr);
return true;
}


void notify()// 跨进程进行条件变量通知
{
assert(_data != nullptr);
assert(_shm_mutex != nullptr);
pthread_mutex_lock(&_shm_mutex->_mutex);
printf("notify..\r\n");
pthread_cond_broadcast(&_shm_mutex->_cond);
pthread_mutex_unlock(&_shm_mutex->_mutex);
}


void wait(int32_t wait_ms)// 跨进程进行条件变量等待
{
assert(_data != nullptr);
assert(_shm_mutex != nullptr);



struct timeval now;
struct timespec abstime;
gettimeofday(&now,NULL);
printf("wait sec:%ld,nsec:%ld,wait_ms:%d\r\n",now.tv_sec,now.tv_usec*1000,wait_ms);
abstime.tv_nsec = (now.tv_usec + wait_ms) * 1000;
abstime.tv_sec = now.tv_sec + (abstime.tv_nsec) / 1000000000;
abstime.tv_nsec = abstime.tv_nsec % 1000000000;


pthread_mutex_lock(&_shm_mutex->_mutex);
printf("wait sec:%ld,nsec:%ld\r\n",abstime.tv_sec,abstime.tv_nsec);
//pthread_cond_wait(&_shm_mutex->_cond,&_shm_mutex->_mutex);
pthread_cond_timedwait(&_shm_mutex->_cond,&_shm_mutex->_mutex,&abstime);
pthread_mutex_unlock(&_shm_mutex->_mutex);
}


void *data_buf() const
{
assert(_data != nullptr);
return _data;
}


private:
SHM_MUTEX * _shm_mutex{nullptr};
void *_data{nullptr};
};


struct IPC_DATA // 跨进程通信时使用的结构体
{
pid_t _pid{0};
char _msg[256];
};



int main()
{
const char *shm_name = "shm_cond_name"; // 共享内存文件名
SHM_SYNC_COND shm_cond;
size_t elm_size = sizeof(IPC_DATA);
size_t elm_count = 10;
shm_cond.init(shm_name, elm_size,elm_count);// 创建共享锁以及内存映射
IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf();// 获取开辟的内存映射数据地址


pid_t pid = fork();// 创建子进程
if (pid == -1)
{
exit_on_error(true,"fork failed");
}


if (pid == 0) // 子进程,向 IPC 数据写入数据,然后进行 notify
{
usleep(100);
printf("child to write msg\r\n");
for (int i = 0; i < elm_count;++i)
{
datas[i]._pid = pid;
snprintf(datas[i]._msg,sizeof(datas[i]._msg),"msg from child,pid:%d,index:%d\r\n",getpid(),i);
}


shm_cond.notify();
printf("child finished!\r\n");
usleep(10);
return 0;
}
else // 父进程,等待子进程进行条件变量通知,或者等待10s超时,最后输出 IPC_DATA 内容
{
int index = 0;
printf("parent wait for msg\r\n");
shm_cond.wait(1000*1000*10); // 如果没有生产者进行notify,则需要等待10s超时,此处在10s内就已经被唤醒,说明条件变量跨进程生效
while (index < elm_count)
{
printf("wait for index:%d!msg:%s",index,datas[index++]._msg);
}


printf("parent to exit!\r\n");
usleep(100);
return 0;
}
return 0;
}


// 输出:
// parent wait for msg
// wait sec:1673590541,nsec:138637000,wait_ms:10000000
// wait sec:1673590551,nsec:138637000
// child to write msg
// notify..
// child finished!
// wait for index:1!msg:msg from child,pid:7529,index:0
// wait for index:2!msg:msg from child,pid:7529,index:1
// wait for index:3!msg:msg from child,pid:7529,index:2
// wait for index:4!msg:msg from child,pid:7529,index:3
// wait for index:5!msg:msg from child,pid:7529,index:4
// wait for index:6!msg:msg from child,pid:7529,index:5
// wait for index:7!msg:msg from child,pid:7529,index:6
// wait for index:8!msg:msg from child,pid:7529,index:7
// wait for index:9!msg:msg from child,pid:7529,index:8
// wait for index:10!msg:msg from child,pid:7529,index:9
// parent to exit!