Linux 并发服务器雏形总结

时间:2023-03-10 02:59:28
Linux 并发服务器雏形总结

如下介绍一个并发回射客户端/服务器的雏形,所谓回射:就是客户端输入一条数据,服务器端读取并显示,然后服务器端再把刚读取的信息发送回客户端进行显示。示意图如下:

Linux 并发服务器雏形总结

所谓并发服务器:就是一个服务器可以同时为多个连入的客户端提供服务,示意图如下:

Linux 并发服务器雏形总结

如下主要介绍两种实现并发回射服务器的方式,一种是通过子进程方式实现并发,一种是通过I/O多路转接实现并发。

并发服务器(1)[子进程方式]

 [root@benxintuzi tcp]# cat echoserv_childprocess.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while() /* 回射服务 */
void echoserv(int conn)
{
char recvbuf[];
while ()
{
memset(recvbuf, , sizeof(recvbuf));
int ret;
if ((ret = read(conn, recvbuf, sizeof(recvbuf))) == -)
ERR_EXIT("read");
if (ret == ) /* client closed */
{
printf("client closed.\n");
break;
} fputs(recvbuf, stdout);
if (write(conn, recvbuf, ret) != ret)
ERR_EXIT("write");
}
exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); /* 初始化一个客户端地址用于保存接入的客户端 */
struct sockaddr_in clit_addr;
memset(&clit_addr, , sizeof(clit_addr));
socklen_t clit_len = sizeof(clit_addr); pid_t pid;
int conn;
while ()
{
/* 从已连接队列(保存已完成三次握手的连接)中返回第一个连接 */
/* 将返回的客户端连接保存在clit_addr中 */
if ((conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len)) == -)
ERR_EXIT("accept");
printf("client(ip = %s, port = %d) connected.\n", inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port)); /* 创建子进程用于回射服务 */
if (( pid = fork()) == -)
ERR_EXIT("fork");
if (pid == ) /* 子进程,每接入一个客户端,就创建一个子进程进行回射服务,这样就可以实现并发处理了 */
{
/* 子进程只负责回射服务,不负责连接客户端,因此需要关闭监听套接字 */
close(listen_fd); /* 进行回射服务 */
echoserv(conn);
}
else /* 父进程 */
close(conn);
} return ;
}

并发客户端(1)

 [root@benxintuzi tcp]# cat echoclit.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () void echoclit(int sock_fd)
{
/* 创建一个发送缓冲区和一个接收缓冲区 */
char sendbuf[], recvbuf[];
/* 从标准输入读取数据,存入发送缓冲区 */
while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
/* 将发送缓冲区的数据写到套接字指定的服务器 */
write(sock_fd, sendbuf, sizeof(sendbuf));
/* 将服务器返回的数据存入接收缓冲区 */
read(sock_fd, recvbuf, sizeof(recvbuf));
/* 将接收缓冲区的数据打印到标准输出 */
fputs(recvbuf, stdout);
/* 清空数据缓冲区 */
memset(sendbuf, , sizeof(sendbuf));
memset(recvbuf, , sizeof(recvbuf));
}
} int main(void)
{
/* 创建连接套接字 */
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化要连接的服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* 将套接字连接至指定服务器 */
if (connect(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("connect"); echoclit(sock_fd);
close(sock_fd); return ;
}

(UDPSOCK_SEQPACKET套接字提供了基于报文的服务,这意味着接收的数据量和发送的数据量完全一致,因此应用程序可以很容易区分出报文的边界。而TCPSOCK_STREAM套接字提供了字节流服务,应用程序不能分辨出报文的边界,这样就很容易导致粘包问题。具体解决方案主要是靠应用层维护消息与消息之间的边界。有如下几种:

  1. 发送/接收定长包
  2. 在包尾加上\r\n(如ftp就是这样做的)
  3. 在包头封装数据的长度
  4. 依赖于更复杂的应用层协议

如下为封装好的发送/接收定长数据包的函数:

 /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
}

并发服务器(2)[子进程方式][利用发送定长包解决粘包问题]

 [root@benxintuzi tcp]# cat echoserv_childprocess_n.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} void echoserv(int conn)
{
char recvbuf[];
while ()
{
memset(recvbuf, , sizeof(recvbuf));
int ret;
if ((ret = readn(conn, recvbuf, sizeof(recvbuf))) == -)
ERR_EXIT("readn"); fputs(recvbuf, stdout);
if (writen(conn, recvbuf, ret) == -)
ERR_EXIT("writen");
} exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
* serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
* inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); /* 初始化一个客户端地址用于保存接入的客户端 */
struct sockaddr_in clit_addr;
memset(&clit_addr, , sizeof(clit_addr));
socklen_t clit_len = sizeof(clit_addr); pid_t pid;
int conn;
while ()
{
/* 从已连接队列(保存已完成三次握手的连接)中返回第一个连接 */
/* 将返回的客户端连接保存在clit_addr中 */
if ((conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len)) == -)
ERR_EXIT("accept");
printf("client(ip = %s, port = %d) connected.\n", inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port)); /* 创建子进程用于回射服务 */
if (( pid = fork()) == -)
ERR_EXIT("fork");
if (pid == ) /* 子进程,每接入一个客户端,就创建一个子进程进行回射服务,这样就可以实现并发处理了 */
{
/* 子进程只负责回射服务,不负责连接客户端,因此需要关闭监听套接字 */
close(listen_fd); /* 进行回射服务 */
echoserv(conn);
}
else /* 父进程 */
close(conn);
} return ;
}

并发客户端(2)[利用发送定长包解决粘包问题]

 [root@benxintuzi tcp]# cat echoclit_n.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while() /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} void echoclit(int sock_fd)
{
/* 创建一个发送缓冲区和一个接收缓冲区 */
char sendbuf[], recvbuf[];
/* 从标准输入读取数据,存入发送缓冲区 */
while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
/* 将发送缓冲区的数据写到套接字指定的服务器 */
int ret;
if ((writen(sock_fd, sendbuf, sizeof(sendbuf))) == -)
ERR_EXIT("writen"); /* 将服务器返回的数据存入接收缓冲区 */
if ((readn(sock_fd, recvbuf, sizeof(recvbuf))) == -)
ERR_EXIT("recvbuf"); /* 将接收缓冲区的数据打印到标准输出 */
fputs(recvbuf, stdout);
/* 清空数据缓冲区 */
memset(sendbuf, , sizeof(sendbuf));
memset(recvbuf, , sizeof(recvbuf));
}
} int main(void)
{
/* 创建连接套接字 */
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化要连接的服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* 将套接字连接至指定服务器 */
if (connect(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("connect"); echoclit(sock_fd);
close(sock_fd); return ;
}

如下解决粘包问题采用自定义数据包头,在包头中封装一个数据的长度与存放数据的缓存区:

struct packet      /* 包头 */

{

  int len;      /* 表示数据的长度 */

  char buf[1024];  /* 数据缓存区 */

};

并发服务器(3)[子进程方式][利用自定义包头解决粘包问题]

 [root@benxintuzi tcp]# cat echoserv_childprocess_packet.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 自定义包头 */
struct packet
{
int len; /* 数据长度 */
char buf[]; /* 数据缓存区 */
}; /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} void echoserv(int conn)
{
struct packet recvbuf;
int n;
while ()
{
memset(&recvbuf, , sizeof(recvbuf)); /* 先接收包头中的数据长度字段 */
int ret;
if ((ret = readn(conn, &recvbuf.len, )) == -)
ERR_EXIT("readn");
else if (ret < )
{
printf("client closed.\n");
break;
}
else
{
n = ntohl(recvbuf.len); /* 取出数据长度 */
if ((ret = readn(conn, recvbuf.buf, n)) == -)
ERR_EXIT("readn");
else if (ret < n)
{
printf("client closed.\n");
break;
} fputs(recvbuf.buf, stdout); /* 服务器端输出 */
if ((ret = writen(conn, &recvbuf, + n)) == -) /* 回射到客户端 */
ERR_EXIT("writen");
}
} exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
* * serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
* * inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); /* 初始化一个客户端地址用于保存接入的客户端 */
struct sockaddr_in clit_addr;
memset(&clit_addr, , sizeof(clit_addr));
socklen_t clit_len = sizeof(clit_addr); pid_t pid;
int conn;
while ()
{
/* 从已连接队列(保存已完成三次握手的连接)中返回第一个连接 */
/* 将返回的客户端连接保存在clit_addr中 */
if ((conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len)) == -)
ERR_EXIT("accept");
printf("client(ip = %s, port = %d) connected.\n", inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port)); /* 创建子进程用于回射服务 */
if (( pid = fork()) == -)
ERR_EXIT("fork");
if (pid == ) /* 子进程,每接入一个客户端,就创建一个子进程进行回射服务,这样就可以实现并发处理了 */
{
/* 子进程只负责回射服务,不负责连接客户端,因此需要关闭监听套接字 */
close(listen_fd); /* 进行回射服务 */
echoserv(conn);
}
else /* 父进程 */
close(conn);
} return ;
}

并发客户端(3)[利用自定义包头解决粘包问题]

 [root@benxintuzi tcp]# cat echoclit_packet.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 自定义包头 */
struct packet
{
int len; /* 数据长度 */
char buf[]; /* 数据缓存区 */
}; /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} void echoclit(int sock_fd)
{
struct packet sendbuf;
struct packet recvbuf;
memset(&sendbuf, , sizeof(sendbuf));
memset(&recvbuf, , sizeof(recvbuf)); int n;
while (fgets(sendbuf.buf, sizeof(sendbuf.buf), stdin) != NULL)
{
n = sizeof(sendbuf.buf);
sendbuf.len = htonl(n);
int ret;
if ((ret = writen(sock_fd, &sendbuf, + n)) == -)
ERR_EXIT("writen"); if ((ret = readn(sock_fd, &recvbuf.len, )) == -)
ERR_EXIT("readn");
else if (ret < )
break;
else
{
n = ntohl(recvbuf.len);
if ((ret = readn(sock_fd, &recvbuf.buf, n)) == -)
ERR_EXIT("readn");
else if (ret < n)
break; fputs(recvbuf.buf, stdout);
memset(&sendbuf, , sizeof(sendbuf));
memset(&recvbuf, , sizeof(recvbuf));
}
}
} int main(void)
{
/* 创建连接套接字 */
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化要连接的服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* 将套接字连接至指定服务器 */
if (connect(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("connect"); echoclit(sock_fd);
close(sock_fd); return ;
}

用recv代替read时,如果指定MSG_PEEK标志,那么recv函数在返回缓冲区数据的同时仍然保留该部分数据,并不从缓冲区队列中删除,这样下一次读取时,将依然返回同样的数据。如下封装一个recv_peek函数:

 size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
}

用recv_peek来实现readline函数的功能:

 ssize_t readline(int listen_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = buf;
int nleft = maxline; while ()
{
ret = recv_peek(listen_fd, pbuf, nleft);
if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(listen_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(listen_fd, pbuf, nread);
if (ret != nread)
exit(EXIT_FAILURE);
pbuf += nread;
} return (-);
}

并发服务器(4)[子进程方式][利用readline函数实现]

 [root@benxintuzi tcp]# cat echoserv_childprocess_readline.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
} ssize_t readline(int listen_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = buf;
int nleft = maxline; while ()
{
ret = recv_peek(listen_fd, pbuf, nleft);
if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(listen_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(listen_fd, pbuf, nread);
if (ret != nread)
exit(EXIT_FAILURE);
pbuf += nread;
}
return (-);
} void echoserv(int conn)
{
char recvbuf[];
while ()
{
memset(&recvbuf, , sizeof(recvbuf)); int ret;
if ((ret = readline(conn, recvbuf, )) == -)
ERR_EXIT("readline");
else if (ret == )
{
printf("client closed.\n");
break;
}
else
{
fputs(recvbuf, stdout); /* 服务器端输出 */
if ((ret = writen(conn, recvbuf, strlen(recvbuf))) == -) /* 回射到客户端 */
ERR_EXIT("writen");
}
} exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
* * * serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
* * * inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); /* 初始化一个客户端地址用于保存接入的客户端 */
struct sockaddr_in clit_addr;
memset(&clit_addr, , sizeof(clit_addr));
socklen_t clit_len = sizeof(clit_addr); pid_t pid;
int conn;
while ()
{
/* 从已连接队列(保存已完成三次握手的连接)中返回第一个连接 */
/* 将返回的客户端连接保存在clit_addr中 */
if ((conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len)) == -)
ERR_EXIT("accept");
printf("client(ip = %s, port = %d) connected.\n", inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port)); /* 创建子进程用于回射服务 */
if (( pid = fork()) == -)
ERR_EXIT("fork");
if (pid == ) /* 子进程,每接入一个客户端,就创建一个子进程进行回射服务,这样就可以实现并发处理了 */
{
/* 子进程只负责回射服务,不负责连接客户端,因此需要关闭监听套接字 */
close(listen_fd); /* 进行回射服务 */
echoserv(conn);
}
else /* 父进程 */
close(conn);
} return ();
}

并发客户端(4)[利用readline函数实现]

 [root@benxintuzi tcp]# cat echoclit_readline.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * * * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
} ssize_t readline(int sock_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = buf;
int nleft = maxline; while ()
{
ret = recv_peek(sock_fd, pbuf, nleft); if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(sock_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(sock_fd, pbuf, nread);
if (ret != nread) exit(EXIT_FAILURE); pbuf += nread;
}
return (-);
} void echoclit(int sock_fd)
{
char sendbuf[];
char recvbuf[];
memset(&sendbuf, , sizeof(sendbuf));
memset(&recvbuf, , sizeof(recvbuf)); while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
writen(sock_fd, sendbuf, strlen(sendbuf)); int ret;
ret = readline(sock_fd, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
else if (ret == )
break; fputs(recvbuf, stdout);
memset(sendbuf, , sizeof(sendbuf));
memset(recvbuf, , sizeof(recvbuf));
}
exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建连接套接字 */
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化要连接的服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* 将套接字连接至指定服务器 */
if (connect(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("connect"); echoclit(sock_fd);
close(sock_fd); return ();
}

当从一个文件描述符读,然后又写到另一个文件描述符时,一般在下列形式的循环中使用阻塞I/O:

while ((n = read(STDIN_FILENO, buf, BUFSIZ)) > 0)

  if (write(STDOUT_FILENO, buf, n) != n)

    printf(“write error.\n”);

这种形式的阻塞I/O随处可见,但是如果必须从两个文件描述符读,该如何处理呢?在这种情况下,我们不能在任一个描述符上进行阻塞read,因为可能会因为被阻塞在一个描述符的read操作上而导致另一个描述符即使有数据也无法处理。

一种比较好的技术是使用I/O多路转接(I/O multiplexing)。在这种技术中,先构造一张我们感兴趣的描述符列表,然后调用一个函数,直到这些描述符中的一个已准备好进行I/O时,该函数才返回。如下介绍多路转接函数select、pselect、poll。

传给select的参数告诉内核:

我们所关心的描述符、关于每个描述符我们所关心的条件、愿意等待多长时间;

内核返回给我们如下信息:

已准备好的描述符的总数量、对于读、写或异常这3个条件中的每一个,哪些描述符已做好准备。

使用这种返回信息,就可调用相应的I/O函数(一般是read或write),并且确知不会发生阻塞情况。

一般使得这3种事件发生的条件如下:

(1)可读

套接字接收缓冲区有数据可读;

连接的读端关闭,即接收到了FIN段,读操作将返回0;

如果是监听套接字,则已完成三次握手的连接队列不为空;

套接字上发生了一个错误待处理,错误可以通过setsockopt指定的SO_ERROR选项来获取;

(2)可写

套接字发送缓冲区中有空间容纳数据;

连接的写端关闭,即收到RST段,再次调用write操作;

套接字上发生了一个错误待处理,错误可以通过setsockopt指定的SO_ERROR选项来获取;

(3)异常

套接字存在带外数据;

与select不同,poll并非为每个条件都构造一个描述符集,而是构造一个pollfd结构的数组,每个数组元素指定一个描述符编号以及我们对该描述符感兴趣的条件。

比较:

用select实现的并发服务器,存在两个方面的限制:

(1)   一个进程可以打开的最大文件描述符限制,当然可以通过调整内核参数参数解决(通过ulimit –n命令或者通过getrlimit/setrlimit函数实现);

(2)   select中的fd_set集合容量存在限制(FD_SETSIZE),可以修改内核,但是需要重新编译内核才能生效。

如果用poll实现并发服务器,则不存在select的第二个限制。

与select和poll相比,epoll的最大优势在于其不会随着监听fd数目的增多而降低效率。原因如下:select与poll中,内核采用轮询来处理,轮询的fd数目越多越耗时。而epoll是基于回调实现的,如果某个fd有预期事件发生,立即通过回调函数将其加入epoll就绪队列中,因此其仅关心“活跃”的fd,与fd的总数关系不大。再者,在内核空间与用户空间通信方面,select与poll采用内存拷贝方式,而epoll采用共享内存方式,效率优于前者。最后,epoll不仅会告诉应用程序有I/O事件到来,而且还会告诉应用程序关于事件相关的信息。根据这些信息,应用程序就可以直接定位事件而不必遍历整个fd集合。

epoll执行一个与poll相似的任务:监控多个文件描述符,从而判断它们中的一个或多个是否可以进行I/O操作。

如下系统调用用于创建和管理epoll实例:

(1)   epoll_create: 创建一个epoll实例,返回一个该实例的文件描述符;

(2)   epoll_ctl: 注册感兴趣事件对于某个文件描述符。注册感兴趣事件后的文件描述符集合有时也被称为epoll集合;

(3)   epoll_wait: 等待I/O事件。如果没有事件发生,则阻塞调用线程。

使用epoll的两种模式:

Level-triggered(LT) and edge-triggered(ET)

应用程序使用EPOLLET标志时,应该同时使用非阻塞的文件描述符来避免阻塞读或者阻塞写。

建议如下情况使用epoll时指定EPOLLET标志:

1.  使用非阻塞的文件描述符;

2.  read或者write操作返回EAGIN后等待事件发生;

此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关注这个fd的任何状态信息了(将该fd从epoll队列中移除),直到应用程序通过读写操作触发EAGAIN状态,此时epoll认为这个fd又变为了空闲状态,那么epoll将重新关注该fd的状态变化(将其重新加入epoll队列中)。随着epoll_wait的返回,epoll队列中的fds在逐渐减少,因此在大并发处理中,ET模式更有优势。

相反地,当时用LT模式时,epoll就是一个比较快的poll。此模式下,应用程序只需处理从epoll_wait返回的fds,这些fds我们认为其处于就绪状态。

如下分别采用select、poll、epoll实现并发服务器:

并发服务器(5)[select方式]

 [root@benxintuzi tcp]# cat echoserv_select.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * * * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
} ssize_t readline(int listen_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = buf;
int nleft = maxline; while ()
{
ret = recv_peek(listen_fd, pbuf, nleft);
if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(listen_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(listen_fd, pbuf, nread);
if (ret != nread)
exit(EXIT_FAILURE);
pbuf += nread;
}
return (-);
} void echoserv(int listen_fd)
{
/** using select to realize a concurrent server */ struct sockaddr_in clit_addr;
memset(&clit_addr, , sizeof(clit_addr));
socklen_t clit_len = sizeof(clit_addr);
int conn;
int client[FD_SETSIZE];
int i;
for (i = ; i < FD_SETSIZE; i++)
client[i] = -; int maxi = ;
int nready;
int maxfd = listen_fd;
fd_set rset;
fd_set allset;
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listen_fd, &allset); while ()
{
rset = allset;
nready = select(maxfd + , &rset, NULL, NULL, NULL);
if (nready == -)
{
if (errno == EINTR)
continue;
else
ERR_EXIT("select");
}
if (nready == )
continue; if (FD_ISSET(listen_fd, &rset))
{
conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len);
if (conn == -)
ERR_EXIT("accept");
for (i = ; i < FD_SETSIZE; i++)
{
if (client[i] < )
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == FD_SETSIZE)
{
fprintf(stderr, "too many clients.\n");
exit(EXIT_FAILURE);
} printf("client(ip = %s, port = %d) connected.\n",inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port)); FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn;
if (--nready <= )
continue;
} for (i = ; i <= maxi; i++)
{
conn = client[i];
if (conn == -)
continue;
if (FD_ISSET(conn, &rset))
{
char recvbuf[] = {};
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close.\n");
FD_CLR(conn, &allset);
client[i] = -;
close(conn);
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= )
break;
}
} }
exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
* * * * serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
* * * * inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); echoserv(listen_fd); close(listen_fd); return ();
}

并发客户端(5)

 [root@benxintuzi tcp]# cat echoclit_select.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * * * * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
} ssize_t readline(int sock_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = buf;
int nleft = maxline; while ()
{
ret = recv_peek(sock_fd, pbuf, nleft); if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(sock_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(sock_fd, pbuf, nread);
if (ret != nread) exit(EXIT_FAILURE); pbuf += nread;
}
return (-);
} void echoclit(int sock_fd)
{
fd_set rset;
FD_ZERO(&rset); int nready;
int maxfd;
int fd_stdin = fileno(stdin);
if (fd_stdin > sock_fd)
maxfd = fd_stdin;
else
maxfd = sock_fd; char sendbuf[];
char recvbuf[]; while ()
{
FD_SET(fd_stdin, &rset);
FD_SET(sock_fd, &rset);
nready = select(maxfd + , &rset, NULL, NULL, NULL);
if (nready == -)
ERR_EXIT("select");
if (nready == )
continue; if (FD_ISSET(sock_fd, &rset))
{
int ret = readline(sock_fd, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
else if (ret == )
break; fputs(recvbuf, stdout);
memset(recvbuf, , sizeof(recvbuf));
}
if (FD_ISSET(fd_stdin, &rset))
{
if (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
break;
writen(sock_fd, sendbuf, strlen(sendbuf));
memset(sendbuf, , sizeof(sendbuf));
}
}
exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建连接套接字 */
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化要连接的服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* 将套接字连接至指定服务器 */
if (connect(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("connect"); echoclit(sock_fd);
close(sock_fd); return ();
}

并发服务器(6)[poll方式]

 [root@benxintuzi tcp]# cat echoserv_poll.c
#include <poll.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * * * * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
} ssize_t readline(int listen_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = buf;
int nleft = maxline; while ()
{
ret = recv_peek(listen_fd, pbuf, nleft);
if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(listen_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(listen_fd, pbuf, nread);
if (ret != nread)
exit(EXIT_FAILURE);
pbuf += nread;
} return (-);
} void echoserv(int listen_fd)
{
/** using poll to realize a concurrent server */
struct sockaddr_in clit_addr;
memset(&clit_addr, , sizeof(clit_addr));
socklen_t clit_len = sizeof(clit_addr); struct pollfd client[];
int maxi = ; int i;
for (i = ; i < ; i++)
client[i].fd = -; int nready;
client[].fd = listen_fd;
client[].events = POLLIN; /* 对监听套接字的可读事件感兴趣 */ int conn;
while ()
{
nready = poll(client, maxi + , -);
if (nready == -)
{
if (errno == EINTR)
continue;
else
ERR_EXIT("poll");
}
if (nready == )
continue; if (client[].revents & POLLIN)
{
conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len);
if (conn == -)
ERR_EXIT("accept");
for (i = ; i < ; i++)
{
if (client[i].fd < ) /* 寻找空闲位置保存连接 */
{
client[i].fd = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == )
{
fprintf(stderr, "too many clients.\n");
exit(EXIT_FAILURE);
} printf("client(ip = %s, port = %d) connected.\n",inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port)); client[i].events = POLLIN; if (--nready <= )
continue; } for (i = ; i <= maxi; i++)
{
conn = client[i].fd;
if (conn == -)
continue;
if (client[i].events & POLLIN)
{
char recvbuf[] = {};
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close.\n");
client[i].fd = -;
close(conn);
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= )
break;
}
} } exit(EXIT_SUCCESS);
} int main(void)
{
/* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
* * * * * serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
* * * * * inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); echoserv(listen_fd);
close(listen_fd); return ();
}

并发客户端(6)---同(5)

并发服务器(7)[epoll方式]

 [root@benxintuzi tcp]# cat echoserv_epoll.cpp
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdio.h> #include <vector>
#include <algorithm> typedef std::vector<struct epoll_event> EventList; #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while () /* 接收定长数据包 */
size_t readn(int fd, void* buf, size_t count)
{
/* nleft:剩下未接收的数据量
* * * * nread:每次接收的数据量 */
size_t nleft = count;
ssize_t nread; while (nleft > ) /* 如果还有未接收的数据 */
{
if ((nread = read(fd, buf, nleft)) == -)
{
if (nleft == count) /* 剩下的数据量与初始数据量相等,说明未成功接收任何数据,失败返回 */
return (-);
else /* 本次read操作失败,返回到目前为止成功接收的数据量 */
break;
}
else if (nread == ) /* EOF,说明没有数据可供接收了 */
break;
else /* 接收数据后更新变量 */
{
buf += nread;
nleft -= nread;
}
} return (count - nleft); /* 返回成功接收的数据量 */
} /* 发送定长数据包 */
ssize_t writen(int fd, const void* buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten; while (nleft > )
{
if ((nwritten = write(fd, buf, nleft)) == -)
{
if (nleft == count)
return (-);
else
break;
}
else if (nwritten == )
break;
else
{
buf += nwritten;
nleft -= nwritten;
}
} return (count - nleft); /* 返回成功发送的数据量 */
} size_t recv_peek(int listen_fd, void* buf, size_t len)
{
while ()
{
int ret;
ret = recv(listen_fd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return (ret);
}
} ssize_t readline(int listen_fd, void* buf, size_t maxline)
{
int ret;
int nread;
char* pbuf = (char*)buf;
int nleft = maxline; while ()
{
ret = recv_peek(listen_fd, pbuf, nleft);
if (ret < )
return (ret);
else if (ret == )
return (ret); nread = ret; int i;
for (i = ; i < nread; i++)
{
if (pbuf[i] == '\n')
{
ret = readn(listen_fd, pbuf, i + );
if (ret != i + )
exit(EXIT_FAILURE);
return (ret);
}
} if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread; ret = readn(listen_fd, pbuf, nread);
if (ret != nread)
exit(EXIT_FAILURE);
pbuf += nread;
}
return (-);
} void activate_nonblock(int fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL)) == -)
ERR_EXIT("fcntl");
flags |= O_NONBLOCK; int ret;
if ((ret = fcntl(fd, F_SETFL, flags)) == -)
ERR_EXIT("fcntl");
} void handle_sigchld(int sig)
{
/* wait(NULL); */
while (waitpid(-, NULL, WNOHANG) > )
;
} void handle_sigpipe(int sig)
{
printf("recv a sig = %d.\n", sig);
} void echoserv(int listen_fd, int conn)
{
std::vector<int> clients;
int epoll_fd;
epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (epoll_fd == -)
ERR_EXIT("epoll_create1"); struct epoll_event event;
event.data.fd = listen_fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event); EventList events();
struct sockaddr_in clit_addr;
socklen_t clit_len = sizeof(clit_addr);
int nready;
while ()
{
nready = epoll_wait(epoll_fd, &*events.begin(), static_cast<int>(events.size()), -);
if (nready == -)
{
if (errno == EINTR)
continue;
ERR_EXIT("epoll_wait");
}
if (nready == )
continue;
if ((size_t)nready == events.size()) /* 如果存储空间已满,则扩充容量 */
events.resize(events.size() * ); int i;
for (i = ; i < nready; i++)
{
if (events[i].data.fd == listen_fd)
{
conn = accept(listen_fd, (struct sockaddr*)&clit_addr, &clit_len);
if (conn == -)
ERR_EXIT("accept");
printf("client(ip = %s, port = %d) connected.\n", inet_ntoa(clit_addr.sin_addr), ntohs(clit_addr.sin_port));
clients.push_back(conn);
activate_nonblock(conn); /* 设置当前连接为非阻塞模式 */
event.data.fd = conn;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn, &event);
}
else if (events[i].events & EPOLLIN)
{
conn = events[i].data.fd;
if (conn < )
continue; char recvbuf[] = {};
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client closed.\n");
close(conn); event = events[i];
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, conn, &event);
clients.erase(std::remove(clients.begin(), clients.end(), conn), clients.end());
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
}
}
} int main(void)
{
/* 产生如下信号时,我们可以捕捉并处理,但是通常忽略即可 */
// signal(SIGPIPE, handle_sigpipe); /* 在收到RST段后,再次调用write操作就会产生SIGPIPE信号 */
// signal(SIGCHLD, handle_sigchld); /* 避免僵尸进程 */
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, SIG_IGN); /* 创建一个监听套接字 */
int listen_fd;
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -)
ERR_EXIT("socket"); /* 初始化服务器地址 */
struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/**
* * * * serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
* * * * inet_aton("127.0.0.0", &serv_addr.sin_addr); */ /* 设置地址重复利用,使得服务器不必等待TIME_WAIT状态消失就可以重启 */
int on = ;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -)
ERR_EXIT("setsockopt"); /* 将服务器地址绑定到监听套接字上 */
if (bind(listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("bind"); /* 监听进入的连接 */
if (listen(listen_fd, SOMAXCONN) == -)
ERR_EXIT("listen"); int conn;
echoserv(listen_fd, conn); close(listen_fd); return ();
}

并发客户端(7)---同(5)

利用线程也可以实现并发功能,如下使用POSIX线程库实现,编译时要加入-pthread选项。传统函数的返回值在失败时一般会返回-1,并设置errno变量。pthreads函数出错时不会设置全局变量errno,而是返回错误码,然后用strerror函数打印与该错误码相关的信息。

并发服务器(8)[线程方式]

 [root@benxintuzi thread]# cat echoserv_thread.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <stdio.h> #define ERR_EXIT(message) \
do \
{ \
perror(message); \
exit(EXIT_FAILURE); \
} while() void echoserv(int conn)
{
char recvbuf[];
while ()
{
memset(recvbuf, , sizeof(recvbuf));
int ret;
if ((ret = read(conn, recvbuf, sizeof(recvbuf))) < )
ERR_EXIT("read");
if (ret == ) /* client closed */
{
printf("client closed.\n");
break;
}
fputs(recvbuf, stdout);
if (write(conn, recvbuf, ret) != ret)
ERR_EXIT("write");
} } void* thread_routine(void* arg)
{
int conn = (int)arg;
echoserv(conn);
printf("exit thread.\n"); return NULL;
} int main(void)
{
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < )
ERR_EXIT("socket"); struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); int on = ;
if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < )
ERR_EXIT("setsockopt"); if (bind(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < )
ERR_EXIT("bind"); if (listen(sock_fd, SOMAXCONN) < )
ERR_EXIT("listen");
struct sockaddr_in peer_addr;
socklen_t peer_len = sizeof(peer_addr);
int conn;
while ()
{
if ((conn = accept(sock_fd, (struct sockaddr*)&peer_addr, &peer_len)) < )
ERR_EXIT("accept");
printf("client(ip = %s, port = %d) connected.\n", inet_ntoa(peer_addr.sin_addr), ntohs(peer_addr.sin_port)); pthread_t tid;
int ret;
ret = pthread_create(&tid, NULL, thread_routine, (void*)conn);
if (ret != )
{
fprintf(stderr, "pthread_create: %s.\n", strerror(ret));
exit(EXIT_FAILURE);
}
} return ;
}

并发客户端(8)---同(1)

与tcp相比,udp有时显得更为高效。但是udp报文可能会丢失、重复、乱序,其缺乏流量控制,upd编程中,recvfrom返回0并不代表连接关闭,因为udp是无连接的。

并发服务器(9)[udp]

 [root@benxintuzi udp]# cat echoserv.c
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h> #define ERR_EXIT(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while () void echoserv(int sock_fd)
{
char recv_buf[] = {};
struct sockaddr_in peer_addr;
socklen_t peer_len;
int n;
while ()
{
peer_len = sizeof(peer_addr);
memset(recv_buf, , sizeof(recv_buf)); if ((n = recvfrom(sock_fd, recv_buf, sizeof(recv_buf), , (struct sockaddr*)&peer_addr, &peer_len)) == -)
{
if (errno == EINTR)
continue;
else
ERR_EXIT("recvfrom");
}
else if (n == )
{
/* recvfrom返回0不代表连接关闭,因为udp是无连接的 */
}
else
{
fputs(recv_buf, stdout);
sendto(sock_fd, recv_buf, n, , (struct sockaddr*)&peer_addr, peer_len);
}
} close(sock_fd);
} int main(void)
{
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_DGRAM, )) == -)
ERR_EXIT("socket"); struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < )
ERR_EXIT("bind"); echoserv(sock_fd); return ;
}

并发客户端(9)[udp]

 [root@benxintuzi udp]# cat echoclit.c
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h> #define ERR_EXIT(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while () void echoclit(int sock_fd)
{ struct sockaddr_in serv_addr;
memset(&serv_addr, , sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons();
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -)
ERR_EXIT("connect"); char send_buf[] = {};
char recv_buf[] = {};
while (fgets(send_buf, sizeof(send_buf), stdin) != NULL)
{
/** sendto(sock_fd, send_buf, sizeof(send_buf), 0, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); */
/** 使用connect后,可以使用send代替sendto发送地址,因为connect已经指定了地址,就不再需要sendto中的地址了 */
send(sock_fd, send_buf, sizeof(send_buf), ); /** recvfrom(sock_fd, recv_buf, sizeof(recv_buf), 0, NULL, NULL); */
recv(sock_fd, recv_buf, sizeof(recv_buf), );
fputs(recv_buf, stdout);
memset(send_buf, , sizeof(send_buf));
memset(recv_buf, , sizeof(recv_buf));
} exit(EXIT_SUCCESS);
} int main(void)
{
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_DGRAM, )) < )
ERR_EXIT("socket"); echoclit(sock_fd);
close(sock_fd); return ;
}