Network Programming: Client/Server Design, Thundering Herd Analysis (Part 5)

Date: 2022-12-17 22:23:13

Preface

  The previous article touched on the thundering herd phenomenon. In this one I will record, as rigorously as I can, a series of questions around the thundering herd, again following UNP's arrangement.

1. thundering herd

  • When the program starts, N children are created, and all N call accept and all are put to sleep by the kernel (line 140, p. 458 of TCPv2). When the first client connection arrives, all N children are awakened.
    • This is because all N have gone to sleep on the same ‘‘wait channel,’’ the so_timeo member of the socket structure, because all N share the same listening descriptor, which points to the same socket structure. Even though all N are awakened, the first of the N to run will obtain the connection and the remaining N − 1 will all go back to sleep, because when each of the remaining N − 1 execute the statement on line 135 of p. 458 of TCPv2, the queue length will be 0 since the first child to run already took the connection.
  • This is sometimes called the thundering herd problem because all N are awakened even though only one will obtain the connection. Nevertheless, the code works, with the performance side effect of waking up too many processes each time a connection is ready to be accepted.

  The quotation above is from UNP and even includes a kernel-level explanation. In short: when several processes are blocked in accept and a client connection arrives, every process blocked in that call is woken up, yet only one of them gets the connection. The rest is wasted, unnecessary process scheduling.
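
  To make the quoted scenario concrete, here is a minimal, self-contained sketch of that prefork pattern (the port 12345 and N = 5 children are arbitrary values chosen for illustration, not from UNP's listing): every child inherits the same listening descriptor and blocks in accept on it, which is exactly the situation in which older kernels woke all N children for each connection.

/* sketch: the prefork / blocking-accept pattern described above.
 * Port 12345 and N = 5 children are arbitrary values for illustration. */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(12345);
    if (listenfd == -1 ||
        bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) == -1 ||
        listen(listenfd, 10) == -1) {
        perror("socket/bind/listen");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < 5; i++) {
        if (fork() == 0) {                       /* child */
            for (;;) {
                /* all N children sleep here on the same listening socket */
                int connfd = accept(listenfd, NULL, NULL);
                if (connfd == -1)
                    continue;
                printf("child %ld accepted a connection\n", (long)getpid());
                close(connfd);
            }
        }
    }
    for (;;)
        pause();                                 /* parent just waits */
}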

2. Current situation

  Since Linux 2.6 the kernel has fixed this: accept by itself no longer produces the thundering herd. For details see the paper "accept() Scalability on Linux". This article still pursues the topic, though, because when select, poll, or epoll is used on the listening sockfd, the thundering herd can still occur.
  Also, UNP uses the older APIs throughout, while most servers today use epoll, so this article follows UNP's approach but replaces the original select with epoll.
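
  Before moving on, here is a rough sketch of how the herd reappears with epoll. The assumptions are mine: a listening socket listenfd has already been created, bound, set nonblocking and passed to listen(), and each forked child calls child_loop on it. Every child builds its own epoll instance around the same inherited descriptor, so one incoming connection can wake several children even though only one accept succeeds; the losers just see EAGAIN.

/* sketch: each child owns a private epoll instance watching the same
 * inherited, nonblocking listenfd, so one connection may wake several
 * children; all but one then lose the accept() race. */
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

static void child_loop(int listenfd)             /* called after fork() */
{
    int efd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listenfd };
    struct epoll_event out;

    if (efd == -1 || epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &ev) == -1)
        return;
    for (;;) {
        if (epoll_wait(efd, &out, 1, -1) <= 0)
            continue;
        int connfd = accept(listenfd, NULL, NULL);
        if (connfd == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                printf("child %ld woken, but lost the race\n", (long)getpid());
            continue;
        }
        printf("child %ld accepted\n", (long)getpid());
        close(connfd);
    }
}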

3. epoll review

  I already went over how to use epoll back in the APUE series, but never separated ET from LT mode, so this article digs a little deeper. The following excerpt is from the man page:

The epoll event distribution interface is able to behave both as edge-triggered (ET) and as level-triggered (LT). The difference between the two mechanisms can be described as follows. Suppose that this scenario happens: (anyone who has programmed microcontrollers will find the edge-triggered vs. level-triggered distinction familiar.)

  • The file descriptor that represents the read side of a pipe (rfd) is registered on the epoll instance.
  • A pipe writer writes 2 kB of data on the write side of the pipe.
  • A call to epoll_wait(2) is done that will return rfd as a ready file descriptor.
  • The pipe reader reads 1 kB of data from rfd.
  • A call to epoll_wait(2) is done.

  The man page explains the issue through this scenario, which makes it very clear and easy to follow:

Edge-triggered case:

  • If the rfd file descriptor has been added to the epoll interface using the EPOLLET (edge-triggered) flag, the call to epoll_wait(2) done in step 5 will probably hang despite the available data still present in the file input buffer; meanwhile the remote peer might be expecting a response based on the data it already sent. The reason for this is that edge-triggered mode delivers events only when changes occur on the monitored file descriptor. So, in step 5 the caller might end up waiting for some data that is already present inside the input buffer.
    • In the above example, an event on rfd will be generated because of the write done in 2 and the event is consumed in 3. Since the read operation done in 4 does not consume the whole buffer data, the call to epoll_wait(2) done in step 5 might block indefinitely. This pins down the defining trait of edge triggering and the root of the problem: the input buffer was not fully drained after the first epoll_wait returned.
  • An application that employs the EPOLLET flag should use nonblocking file descriptors to avoid having a blocking read or write starve a task that is handling multiple file descriptors.
    The suggested way to use epoll as an edge-triggered (EPOLLET) interface is as follows:
    • with nonblocking file descriptors; and (with edge triggering, the monitored fd must first be made nonblocking)
    • by waiting for an event only after read(2) or write(2) return EAGAIN. (Keep reading or writing until EAGAIN appears before making the next epoll_wait call; see the sketch after this list.)
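
  Below is a minimal sketch of that drain-until-EAGAIN pattern, assuming the descriptor was registered with EPOLLET and already set to O_NONBLOCK; the 4 KB buffer and the helper name drain_fd are my own choices, not from the man page.

/* sketch: drain a nonblocking descriptor after an EPOLLET notification;
 * only after read() reports EAGAIN is it safe to call epoll_wait again. */
#include <errno.h>
#include <unistd.h>

static int drain_fd(int fd)                      /* hypothetical helper */
{
    char buf[4096];

    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            /* process buf[0..n-1] here, then keep reading */
            continue;
        }
        if (n == 0)
            return -1;                           /* peer closed the connection */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return 0;                            /* drained: back to epoll_wait */
        if (errno == EINTR)
            continue;                            /* interrupted, just retry */
        return -1;                               /* real error */
    }
}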

Level-triggered case:
Here the man page is brief: the behaviour is the same as poll, i.e. epoll_wait keeps reporting the fd as ready as long as the read buffer still holds data (or the write buffer still has space).
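
  For comparison, the only difference at registration time is the EPOLLET flag; the server later in this article sticks with the default level-triggered behaviour. A small sketch (efd and the two descriptors are assumed to exist already):

/* sketch: registering one fd level-triggered and another edge-triggered */
#include <sys/epoll.h>

static void register_lt_and_et(int efd, int lt_fd, int et_fd)
{
    /* level-triggered (default): reported as long as data remains readable */
    struct epoll_event ev_lt = { .events = EPOLLIN, .data.fd = lt_fd };
    epoll_ctl(efd, EPOLL_CTL_ADD, lt_fd, &ev_lt);

    /* edge-triggered: only state changes are reported, so et_fd must be
     * nonblocking and drained until EAGAIN after every wakeup */
    struct epoll_event ev_et = { .events = EPOLLIN | EPOLLET, .data.fd = et_fd };
    epoll_ctl(efd, EPOLL_CTL_ADD, et_fd, &ev_et);
}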

4. Ways to avoid the thundering herd

4.1 Overview

  UNP gives three ways to avoid the thundering herd: file locking, thread locking, and descriptor passing. They were aimed at accept back then; here I apply the idea to the epoll model. To keep the article from ballooning, only one of them is implemented, a thread mutex shared between processes, and at the end it is briefly contrasted with the way Nginx does it.
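
  For reference, the file-locking variant maps onto fcntl record locks roughly as in the sketch below. The my_lock_* helper names follow UNP; the exit-on-error handling, the buffer size, and the lock-file template are my simplifications. Each child would call my_lock_wait() before epoll_wait/accept and my_lock_release() afterwards, in the same places where the pthread mutex is locked and unlocked in the program that follows.

/* sketch: UNP-style file locking (fcntl record lock) around the critical
 * section; an alternative to the process-shared mutex used below. */
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static int lock_fd = -1;
static struct flock lock_it, unlock_it;

void my_lock_init(const char *pathname_template)   /* e.g. "/tmp/lock.XXXXXX" */
{
    char path[64];

    strncpy(path, pathname_template, sizeof(path) - 1);
    path[sizeof(path) - 1] = '\0';
    if ((lock_fd = mkstemp(path)) == -1)            /* called in the parent, before fork() */
        exit(EXIT_FAILURE);
    unlink(path);                                   /* keep the fd, drop the name */

    lock_it.l_type = F_WRLCK;                       /* write-lock the whole file */
    lock_it.l_whence = SEEK_SET;
    unlock_it.l_type = F_UNLCK;
    unlock_it.l_whence = SEEK_SET;
}

void my_lock_wait(void)
{
    while (fcntl(lock_fd, F_SETLKW, &lock_it) == -1)
        if (errno != EINTR)                         /* retry only when interrupted */
            exit(EXIT_FAILURE);
}

void my_lock_release(void)
{
    if (fcntl(lock_fd, F_SETLK, &unlock_it) == -1)
        exit(EXIT_FAILURE);
}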

4.2 Inter-process mutex

  I documented how to use a mutex across processes in detail back in Multithreading: Mutex (Part 2), so the coding goes quickly now. The program below is adapted from my earlier Network Programming: Client/Server Design Prefork Server (Part 3).

//30_lock_preser.c
#include <sys/socket.h>
#include <sys/un.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <err.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <sys/epoll.h>
#define KEY 0x1 /* key for first message queue */
#define LISTENQ 10
#define MAXLINE 20
#define MAXN 16384
#define handle_error(msg) \
          do { perror(msg); exit(EXIT_FAILURE); } while (0)
#define handle_error_en(en, msg) \
          do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

typedef struct {
    long mtype;
    char mtext[MAXLINE];
} Mymsg;

static int read_cnt;
static char *read_ptr;
static char read_buf[MAXLINE];
char flag;


ssize_t readline(int fd, void *vptr, size_t maxlen);
static ssize_t my_read(int fd, char *ptr) ;
ssize_t writen(int fd, const void *vptr, size_t n);
void pr_cpu_time(void);
pid_t child_make(int, long *, int listenfd);
void child_main(int,long *, int listenfd);



static pthread_mutex_t *mptr;

int main(int argc, char **argv){
    int listenfd, connfd,ident,flags;
    socklen_t clilen;
    int nchildren;
    in_port_t port;
    pid_t *pids;
    long * ptr;
    Mymsg msg;
    int i=0;
    int on=1;
    struct sockaddr_in cliaddr, servaddr;
    pthread_mutexattr_t mattr;
    int err;


    if (argc != 4)
        errx(1,"tcp_fork_server <addr> <port> <childnum>\n");

    nchildren = atoi(argv[3]);
    if((pids = calloc(nchildren, sizeof(pid_t))) == NULL)
        handle_error("calloc");

    port = atoi(argv[2]);
    if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        handle_error("socket");
    if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
        handle_error("setsockopt");
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    if(inet_pton(AF_INET, argv[1], &servaddr.sin_addr) != 1) /* returns 0 for a bad address string, -1 for a bad family */
       handle_error("inet_pton");
    //an anonymous shared mapping like this can only be used for IPC between related (fork'ed) processes
    if((ptr = mmap(0, nchildren * sizeof(long), PROT_READ | PROT_WRITE,MAP_ANON | MAP_SHARED, -1, 0)) == MAP_FAILED)
       handle_error("mmap");
    //a single process-shared mutex is enough; all children contend for the same one
    if((mptr = mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,MAP_ANON | MAP_SHARED, -1, 0)) == MAP_FAILED)
       handle_error("mmap");

    if((err = pthread_mutexattr_init(&mattr)) != 0)
        handle_error_en(err,"pthread_mutexattr_init");
    if((err = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) != 0)
        handle_error_en(err,"pthread_mutexattr_setpshared");
    if((err = pthread_mutex_init(mptr, &mattr)) != 0)
        handle_error_en(err,"pthread_mutex_init");

    //the listening socket is made nonblocking before the children inherit it
    if((flags = fcntl(listenfd, F_GETFL, 0)) == -1)
        handle_error("fcntl");
    if(fcntl(listenfd, F_SETFL, flags | O_NONBLOCK) == -1)
        handle_error("fcntl");

    if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) == -1)
        handle_error("bind");
    if(listen(listenfd, LISTENQ) == -1)
        handle_error("listen");

    for (i = 0; i < nchildren; i++)
        pids[i] = child_make(i,ptr,listenfd); /* parent returns */

    for(;;){
        //compared with the previous article, the message-queue IPC check has to move into this parent loop
        if(!flag){
            if((ident=msgget(KEY,0660)) == -1 )
                continue;
            flag=1;
        }
        if(flag)
            //on every pass, check whether the testbench client has sent a control message to the server
            if (msgrcv(ident,&msg,MAXLINE,1,IPC_NOWAIT) ==  -1){
                if(errno != ENOMSG)
                    handle_error("msgrcv");
            }
            else{
                for (i = 0; i < nchildren; i++)
                    kill(pids[i], SIGTERM);

                while (wait(NULL) > 0)
                    ;
                if (errno != ECHILD)
                    errx(1,"wait error");
                pr_cpu_time();
                for(i = 0; i < nchildren; i++)
                    printf("child %d connected number:%ld\n", i, ptr[i]);
                msg.mtype = 2;
                memcpy(msg.mtext, "done", 5);
                if (msgsnd(ident, &msg, MAXLINE, 0) == -1)
                    handle_error("msgsnd");
                return 0;
            }
    }
}


void pr_cpu_time(void){
    double user, sys;
    struct rusage myusage, childusage;
    if (getrusage(RUSAGE_SELF, &myusage) < 0)
        handle_error("getrusage error");
    if (getrusage(RUSAGE_CHILDREN, &childusage) < 0)
        handle_error("getrusage error");
    user = (double) myusage.ru_utime.tv_sec +myusage.ru_utime.tv_usec / 1000000.0;
    user += (double) childusage.ru_utime.tv_sec +childusage.ru_utime.tv_usec / 1000000.0;
    sys = (double) myusage.ru_stime.tv_sec + myusage.ru_stime.tv_usec / 1000000.0;
    sys += (double) childusage.ru_stime.tv_sec + childusage.ru_stime.tv_usec / 1000000.0;
    printf("\nuser time = %g, sys time = %g\n", user, sys);

}



pid_t child_make(int i, long * ptr, int listenfd){
    pid_t pid;
    if ((pid = fork()) <0)
        handle_error("fork");
    else if(pid > 0)
        return (pid); /* parent */
    else
        child_main(i, ptr,listenfd); /* never returns */
}


void child_main(int j, long * ptr, int listenfd){
    int connfd = -1;    /* no client connected yet */
    socklen_t clilen;
    struct sockaddr *cliaddr;
    int err,efd,m;
    struct epoll_event even1,events[2];
    int ntowrite;
    ssize_t nread;
    char line[MAXLINE], result[MAXN];
    even1.events=EPOLLIN;
    even1.data.fd=listenfd;

    if((efd=epoll_create(2)) == -1)
            handle_error("epoll_create");
    if(epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&even1) == -1)
            handle_error("epoll_ctl");


    if((cliaddr = malloc(sizeof(struct sockaddr_in))) == NULL)
        handle_error("malloc");
    printf("child %ld starting\n", (long) getpid());
    for ( ; ; ) {

        if((err = pthread_mutex_lock(mptr)) != 0)
            handle_error_en(err,"pthread_mutex_lock");
        //debug
        //printf("hold the mutex : %d child\n",j);
        if((m=epoll_wait(efd,events,2,-1)) == -1)
            handle_error("epoll_wait");
        for(int i=0;i<m;i++)
            if(events[i].events & EPOLLIN){
                if(events[i].data.fd == listenfd){
                      clilen = sizeof(struct sockaddr_in);
                      if((connfd = accept(listenfd, cliaddr, &clilen)) == -1 )
                            handle_error("accept");
                      even1.data.fd=connfd;

                      if(epoll_ctl(efd,EPOLL_CTL_DEL,listenfd,NULL) == -1)
                          handle_error("epoll_ctl");
                      if(epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&even1) == -1)
                          handle_error("epoll_ctl");

                      break;
                 }
                 if(events[i].data.fd == connfd){
                      if((nread=readline(connfd, line, MAXLINE)) == -1)
                           handle_error("readline");
                      else if(nread == 0){
                           even1.data.fd=listenfd;
                           if(epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL) == -1)
                               handle_error("epoll_ctl");
                           if(epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&even1) == -1)
                               handle_error("epoll_ctl");
                           //one long slot per child in the shared mapping: count the connections this child served
                           ptr[j]++;
                           if(close(connfd) == -1)
                               handle_error("close"); /* child closes the connected socket */
                           break;
                      }
                      ntowrite = atol(line);
                      if ((ntowrite <= 0) || (ntowrite > MAXN))
                           errx(1,"client request for %d bytes,max size is %d\n", ntowrite,MAXN);
                      if(writen(connfd, result, ntowrite) == -1)
                           handle_error("writen");
                      break;
                 }
            }
        if((err = pthread_mutex_unlock(mptr)) != 0)
            handle_error_en(err,"pthread_mutex_unlock");
    }
}



ssize_t writen(int fd, const void *vptr, size_t n){
    size_t nleft;
    ssize_t nwritten;
    const char *ptr;
    ptr = vptr;
    nleft = n;
    while (nleft > 0){
        if ( (nwritten = write(fd, ptr, nleft)) <= 0){
            if (nwritten < 0 && errno == EINTR)
                nwritten = 0; /* and call write() again */
            else
                return (-1); /* error */
        }
        nleft -= nwritten;
        ptr += nwritten;
    }
    return (n);
}


static ssize_t my_read(int fd, char *ptr){
    if (read_cnt <= 0) {
        again:
            if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {
                if (errno == EINTR)
                    goto again;
                return (-1);
            } else if (read_cnt == 0)
                return (0);
            read_ptr = read_buf;
    }
    read_cnt--;
    *ptr = *read_ptr++;
    return (1);
}

ssize_t readline(int fd, void *vptr, size_t maxlen){
    ssize_t n, rc;
    char c, *ptr;
    ptr = vptr;
    for (n = 1; n < maxlen; n++) {
        if ( (rc = my_read(fd, &c)) == 1) {
            *ptr++ = c;
            if (c == '\n')
                break; /* newline is stored, like fgets() */
        } else if (rc == 0) {
            *ptr = 0;
            return (n - 1); /* EOF, n - 1 bytes were read */
        } else
            return (-1); /* error, errno set by read() */
    }
    *ptr = 0; /* null terminate like fgets() */
    return (n);
}

  Run results:

//testbench side
[root@localhost ~]# ./30_testbench 127.0.0.1 12345 5 100 2000
[root@localhost ~]# ./30_testbench 127.0.0.1 12345 5 100 2000

[root@localhost ~]# ./30_lock_preser 127.0.0.1 12345 5 
child 1710 starting
child 1709 starting
child 1708 starting
child 1707 starting
child 1706 starting

user time = 1.77946, sys time = 3.42755
child 0 connected number:0
child 1 connected number:0
child 2 connected number:0
child 3 connected number:0
child 4 connected number:500
[root@localhost ~]# ./30_lock_preser 127.0.0.1 12345 5
child 1722 starting
child 1721 starting
child 1720 starting
child 1719 starting
child 1718 starting

user time = 0.930036, sys time = 1.72412
child 0 connected number:0
child 1 connected number:107
child 2 connected number:0
child 3 connected number:393
child 4 connected number:0

  Measured by code size and complexity, this server already leaves Tinyhttp far behind. Still, the results show the load is anything but balanced, which means the advantage of multiple processes is not being fully exploited. In particular, if the 500 connection requests were spread evenly across the 5 processes, responses should come back faster, so the server needs another tweak:

for ( ; ; ) {
        //crude, hard-coded load balancing: once this child has handled 100 connections it stops competing for the mutex
        if( ptr[j] > 100)
            continue;
        if((err = pthread_mutex_lock(mptr)) != 0)
            handle_error_en(err,"pthread_mutex_lock");
            ....

  Result:

[root@localhost ~]# ./30_lock_preser 127.0.0.1 12345 5 
child 2118 starting
child 2117 starting
child 2116 starting
child 2115 starting
child 2114 starting

user time = 1.50234, sys time = 2.46653
child 0 connected number:101
child 1 connected number:101
child 2 connected number:101
child 3 connected number:96
child 4 connected number:101

  The load is now balanced, but the total run time does not seem to improve. To keep this article from growing any longer, the next part will compare all of this with the way Nginx handles the problem.