多线程、多进程TCP服务器、进程池和线程池

时间:2022-07-20 10:15:27

一、多线程TCP服务器
代码如下:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<string.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
int startup(char* _ip, int _port)
{
//创建套接字
int sock = socket(AF_INET,SOCK_STREAM,0);
if(sock < 0){
perror("socket");
return 2;
}

int flag = 1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
//绑定套接字
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = inet_addr(_ip);
if(bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0){
perror("bind");
return 3;
}

//listen socket
if(listen(sock,10)<0){
perror("listen");
return 4;
}
return sock;
}
void* handler(void* _sock)
{
int sock = *(int*)_sock;
while(1){
char buf[1024];
int _r = read(sock,buf,sizeof(buf));
if(_r > 0){
buf[_r] = 0;
printf("%s\n",buf);
char* msg = "HTTP/ 1.0 200 ok\r\n<html><h1>hello bit</h1></html>\r\n";
write(sock,msg,strlen(msg));
break;
}
else if(_r == 0){
printf("client quit\n");
break;
}
else{
perror("read");
break;
}
}
return NULL;
}
int main(int argc,char* argv[])
{
if(argc < 3){
printf("%s [_ip] [_port]\n",argv[0]);
return 1;
}

int listen_sock = startup(argv[1],atoi(argv[0]));

while(1){
struct sockaddr_in server;
socklen_t len = sizeof(server);
printf("hehe2\n");
int new_sock = accept(listen_sock,(struct sockaddr*)&server,&len);
if(new_sock < 0){
perror("accept");
continue;
}
printf("hehe1\n");
pthread_t new_pthread = 0;
pthread_create(&new_pthread,NULL,handler,(void*)&new_sock);
pthread_detach(new_pthread);
}

return 0;

二、多进程TCP服务器
代码如下:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
int startup(char* _ip, int _port)
{
//创建套接字
int sock = socket(AF_INET,SOCK_STREAM,0);
if(sock < 0){
perror("socket");
return 2;
}

int flag = 1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
//绑定套接字
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = inet_addr(_ip);
if(bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0){
perror("bind");
return 3;
}

//listen socket
if(listen(sock,10)<0){
perror("listen");
return 4;
}
return sock;
}

int main(int argc,char* argv[])
{
if(argc < 3){
printf("%s [_ip] [_port]\n",argv[0]);
return 1;
}

int listen_sock = startup(argv[1],atoi(argv[0]));

while(1){
struct sockaddr_in server;
socklen_t len = sizeof(server);
printf("hehe2\n");
int new_sock = accept(listen_sock,(struct sockaddr*)&server,&len);
if(new_sock < 0){
perror("accept");
continue;
}

pid_t _f = fock();
if(_f == 0){ //child
if(fock()>0){
exit(2);
}
//处理相关事情
}
else if(_f > 0){//father

}
else {
perror("fock");
return 1;
}
}

return 0;
}

三、进程池和线程池

池的概念

由于服务器的硬件资源“充裕”,那么提高服务器性能的一个很直接的方法就是以空间换时间,即“浪费”服务器的硬件资源,以换取其运行效率。这就是池的概念。池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配。当服务器进入正是运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配。很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的。当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用来释放资源。从最终效果来看,池相当于服务器管理系统资源的应用设施,它避免了服务器对内核的频繁访问。

池可以分为多种,常见的有内存池、进程池、线程池和连接池。
进程池和线程池概述

进程池和线程池相似,所以这里我们以进程池为例进行介绍。如没有特殊声明,下面对进程池的讨论完全是用于线程池。

进程池是由服务器预先创建的一组子进程,这些子进程的数目在 3~10 个之间(当然这只是典型情况)。线程池中的线程数量应该和 CPU 数量差不多。

进程池中的所有子进程都运行着相同的代码,并具有相同的属性,比如优先级、 PGID 等。

当有新的任务来到时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显得小得多。至于主进程选择哪个子进程来为新任务服务,则有两种方法:

主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和 Round Robin (轮流算法)。

主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。

当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信。在父线程和子线程之间传递数据就要简单得多,因为我们可以把这些数据定义为全局,那么它们本身就是被所有线程共享的。

多线程、多进程TCP服务器、进程池和线程池
代码:


//主接收网络消息进程
int main()
{
int iReLen;
char szBuffer[64];
...
//初始化进程池
InitProcessPoll();
....
while(1)
{
iRelen = recv(iSock, szBuffer, sizeof(szbuffer), 0);
if (iRelen > 0)
{
SubmitWork(szBuffer, iRelen); //提交工作到资源进程
}
}
...
}

//
int SubmitWork(void *pBuffer, int iMsgLen)
{
int iMsgQueID;

iMsgQueID = GetMsgQue(key, 0); //取得跟管理进程通信的消息队更句柄;

msgsnd(iMsgQueID, pBuffer, iMsgLen, 0);
}

int InitProcessPoll(const int iProcessNum)
{
int iPid;

//创建管理进程
iPid = fork();
if (iPid == 0)
{
InitMngProcess(iProcessNum);
}
return 0;
}

typedef struct
{
int pid;
int iFlag;
} T_ProcessStatus;

//指向资源进程管理结构
T_ProcessStatus *pProcessMng = NULL

//记录有总共有多少个资源进程
INT32 iMaxProcessNum = 0;

//初始管理进程管理结构并创建资源子进程,最后接收外部工作请求并分发到资源进行进行处理
InitMngProcess(const int iProcessNum)
{
int i;
int iPid;
int iRtn;
int iMsgLen;
int iMsgQue1, iMsgQue2;
char szBuffer[64];

//创建管理进程结构
pProcessMng = calloc(iProcessNum, sizeof(T_ProcessStatus))

for (i = 0; i < iProcessNum; i++)
{
iPid = fork();
if (iPid == 0);
{
//资源进程;
ResourceProcess();
}
pProcessMng[i].pid = iPid; //记录资源进程的进程号
pProcessMng[i].iFlag = 0; //把资源进程置为空闲
}

iMaxProcessNum = iProcessNum;

//创建外部跟管理进程通信的消息队列;
iMsgQue1 = CreateMsgQue();

//创建管理进程跟资源进程通信的消息队列;
iMsgQue2 = CreateMsgQue();

//安装资源进程回收信号处理函数
signal(SIGUSR1, ReleaseAProcess);

//开始接收外部传入的任务
while(1)
{
//接收外部的工作请求
iMsgLen = msgrcv(iMsgQue1, szBuffer, sizeof(szBuffer), 0);

//转发工作请求到资源进程处理消息队列
iRtn = msgsnd(iMsgQue2, szBuffer, iMsgLen, 0);

//通知其中的一个空闲资源进程进行处理
NoticeAIdleProcess();
}
}

//通知一个空闲资源进程进行处理
int NoticeAIdleProcess()
{
int i;

for (i = 0; i < iMaxProcessNum; i++)
{
if (pProcessMng[i].iFlag == 0)
{
pProessMng[i].Flag = 1;
kill(processMng[i].pid, SIGUSR1);
return 0;
}
}

return -1;
}

//回收一个处理完成的资源进程
void ReleaseAProcess(int iPid)
{
int i;

for (i = 0; i < iMaxProcessNum; i++)
{
if (pProessMng[i].pid == iPid)
{
pProcessMng[i].iFlag = 0;
return;
}
}

return;
}

//资源进程的处理
void ResourceProcess()
{
//安装有工作通知信号处理
signal(SIGUSR1, SIG_IGN);

//设置只对SIGUSR1信号感兴趣
sigprocmask()
while(1)
{
//没有消息处理时挂起
pause();

//处理工作
void StartWork()

//通知管理进程工作处理完成
NoticeMngProcessFinishedWork();
}
}

//处理消息
void StartWork()
{
char szBuffer[64];
int iMsgID;
int iRtn;

iMsgID = msgget();
iRtn = msgrcv(iMsgID, szBuffer, sizeof(szBuffer), 0);

//现在在子进程里取得了需要处理的消息,可以开始处理该消息直到消息处理完成

return;
}

//通知管理进程回收资源进程
void NoticeMngProcessFinishedWork();
{
kill(MngProcessPid, SIGUSR1);

return;
}

[cpp] view plain copy

//主接收网络消息进程
int main()
{
int iReLen;
char szBuffer[64];
...
//初始化进程池
InitProcessPoll();
....
while(1)
{
iRelen = recv(iSock, szBuffer, sizeof(szbuffer), 0);
if (iRelen > 0)
{
SubmitWork(szBuffer, iRelen); //提交工作到资源进程
}
}
...
}

//
int SubmitWork(void *pBuffer, int iMsgLen)
{
int iMsgQueID;

iMsgQueID = GetMsgQue(key, 0); //取得跟管理进程通信的消息队更句柄;

msgsnd(iMsgQueID, pBuffer, iMsgLen, 0);
}

int InitProcessPoll(const int iProcessNum)
{
int iPid;

//创建管理进程
iPid = fork();
if (iPid == 0)
{
InitMngProcess(iProcessNum);
}
return 0;
}

typedef struct
{
int pid;
int iFlag;
} T_ProcessStatus;

//指向资源进程管理结构
T_ProcessStatus *pProcessMng = NULL

//记录有总共有多少个资源进程
INT32 iMaxProcessNum = 0;

//初始管理进程管理结构并创建资源子进程,最后接收外部工作请求并分发到资源进行进行处理
InitMngProcess(const int iProcessNum)
{
int i;
int iPid;
int iRtn;
int iMsgLen;
int iMsgQue1, iMsgQue2;
char szBuffer[64];

//创建管理进程结构
pProcessMng = calloc(iProcessNum, sizeof(T_ProcessStatus))

for (i = 0; i < iProcessNum; i++)
{
iPid = fork();
if (iPid == 0);
{
//资源进程;
ResourceProcess();
}
pProcessMng[i].pid = iPid; //记录资源进程的进程号
pProcessMng[i].iFlag = 0; //把资源进程置为空闲
}

iMaxProcessNum = iProcessNum;

//创建外部跟管理进程通信的消息队列;
iMsgQue1 = CreateMsgQue();

//创建管理进程跟资源进程通信的消息队列;
iMsgQue2 = CreateMsgQue();

//安装资源进程回收信号处理函数
signal(SIGUSR1, ReleaseAProcess);

//开始接收外部传入的任务
while(1)
{
//接收外部的工作请求
iMsgLen = msgrcv(iMsgQue1, szBuffer, sizeof(szBuffer), 0);

//转发工作请求到资源进程处理消息队列
iRtn = msgsnd(iMsgQue2, szBuffer, iMsgLen, 0);

//通知其中的一个空闲资源进程进行处理
NoticeAIdleProcess();
}
}

//通知一个空闲资源进程进行处理
int NoticeAIdleProcess()
{
int i;

for (i = 0; i < iMaxProcessNum; i++)
{
if (pProcessMng[i].iFlag == 0)
{
pProessMng[i].Flag = 1;
kill(processMng[i].pid, SIGUSR1);
return 0;
}
}

return -1;
}

//回收一个处理完成的资源进程
void ReleaseAProcess(int iPid)
{
int i;

for (i = 0; i < iMaxProcessNum; i++)
{
if (pProessMng[i].pid == iPid)
{
pProcessMng[i].iFlag = 0;
return;
}
}

return;
}

//资源进程的处理
void ResourceProcess()
{
//安装有工作通知信号处理
signal(SIGUSR1, SIG_IGN);

//设置只对SIGUSR1信号感兴趣
sigprocmask()
while(1)
{
//没有消息处理时挂起
pause();

//处理工作
void StartWork()

//通知管理进程工作处理完成
NoticeMngProcessFinishedWork();
}
}

//处理消息
void StartWork()
{
char szBuffer[64];
int iMsgID;
int iRtn;

iMsgID = msgget();
iRtn = msgrcv(iMsgID, szBuffer, sizeof(szBuffer), 0);

//现在在子进程里取得了需要处理的消息,可以开始处理该消息直到消息处理完成

return;
}

//通知管理进程回收资源进程
void NoticeMngProcessFinishedWork();
{
kill(MngProcessPid, SIGUSR1);

return;
}