linux tcp server

Posted: 2023-03-09 07:40:52

This post looks at two server models:

A: taken from the web (http://bbs.chinaunix.net/thread-4067753-1-1.html), claimed to handle 500,000 QPS.

B: written by me; I believe it performs better than the model above.

——————————————————————————————————————————

A:

#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sched.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>

typedef struct connection_st {
    int sock;
    int index;   /* which epoll fd this conn belongs to */
    int using;   /* 1 while the slot is associated with a live connection */
#define BUF_SIZE 4096
    int roff;
    char rbuf[BUF_SIZE];
    int woff;
    char wbuf[BUF_SIZE];
} *connection_t;

#define CONN_MAXFD 65536
struct connection_st g_conn_table[CONN_MAXFD] = {0};

static sig_atomic_t shut_server = 0;

void shut_server_handler(int signo) {
    shut_server = 1;
}

#define EPOLL_NUM 8
int epfd[EPOLL_NUM];
int lisSock;

#define WORKER_PER_GROUP 1
#define NUM_WORKER (EPOLL_NUM * WORKER_PER_GROUP)
pthread_t worker[NUM_WORKER]; /* each epoll group has WORKER_PER_GROUP worker threads */

int sendData(connection_t conn, char *data, int len) {
    if (conn->woff) {
        /* output already pending: just append to the write buffer */
        if (conn->woff + len > BUF_SIZE) {
            return -1;
        }
        memcpy(conn->wbuf + conn->woff, data, len);
        conn->woff += len;
        return 0;
    } else {
        int ret = write(conn->sock, data, len);
        if (ret > 0) {
            if (ret == len) {
                return 0;
            }
            /* partial write: buffer the remainder */
            int left = len - ret;
            if (left > BUF_SIZE) return -1;
            memcpy(conn->wbuf, data + ret, left);
            conn->woff = left;
        } else {
            if (errno != EINTR && errno != EAGAIN) {
                return -1;
            }
            /* nothing written yet: buffer the whole message */
            if (len > BUF_SIZE) {
                return -1;
            }
            memcpy(conn->wbuf, data, len);
            conn->woff = len;
        }
    }
    return 0;
}

int handleReadEvent(connection_t conn) {
    if (conn->roff == BUF_SIZE) {
        return -1;
    }
    int ret = read(conn->sock, conn->rbuf + conn->roff, BUF_SIZE - conn->roff);
    if (ret > 0) {
        conn->roff += ret;
        int beg, end, len;
        beg = end = 0;
        while (beg < conn->roff) {
            char *endPos = (char *)memchr(conn->rbuf + beg, '\n', conn->roff - beg);
            if (!endPos) break;
            end = endPos - conn->rbuf;
            len = end - beg + 1;
            /* echo */
            if (sendData(conn, conn->rbuf + beg, len) == -1) return -1;
            beg = end + 1;
            printf("request_finish_time=%ld\n", time(NULL));
        }
        int left = conn->roff - beg;
        if (beg != 0 && left > 0) {
            memmove(conn->rbuf, conn->rbuf + beg, left);
        }
        conn->roff = left;
    } else if (ret == 0) {
        return -1;
    } else {
        if (errno != EINTR && errno != EAGAIN) {
            return -1;
        }
    }
    return 0;
}

int handleWriteEvent(connection_t conn) {
    if (conn->woff == 0) return 0;
    int ret = write(conn->sock, conn->wbuf, conn->woff);
    if (ret == -1) {
        if (errno != EINTR && errno != EAGAIN) {
            return -1;
        }
    } else {
        int left = conn->woff - ret;
        if (left > 0) {
            memmove(conn->wbuf, conn->wbuf + ret, left);
        }
        conn->woff = left;
    }
    return 0;
}

void closeConnection(connection_t conn) {
    struct epoll_event evReg;
    conn->using = 0;
    conn->woff = conn->roff = 0;
    epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);
    close(conn->sock);
}

void *workerThread(void *arg) {
    int epfd = *(int *)arg;
    struct epoll_event event;
    struct epoll_event evReg;

    /* handles connected sockets only */
    while (!shut_server) {
        int numEvents = epoll_wait(epfd, &event, 1, 1000);
        if (numEvents > 0) {
            int sock = event.data.fd;
            connection_t conn = &g_conn_table[sock];
            if (event.events & EPOLLOUT) {
                if (handleWriteEvent(conn) == -1) {
                    closeConnection(conn);
                    continue;
                }
            }
            if (event.events & EPOLLIN) {
                if (handleReadEvent(conn) == -1) {
                    closeConnection(conn);
                    continue;
                }
            }
            /* re-arm the one-shot registration */
            evReg.events = EPOLLIN | EPOLLONESHOT;
            if (conn->woff > 0) evReg.events |= EPOLLOUT;
            evReg.data.fd = sock;
            epoll_ctl(epfd, EPOLL_CTL_MOD, conn->sock, &evReg);
        }
    }
    return NULL;
}

void *listenThread(void *arg) {
    int lisEpfd = epoll_create(5);
    struct epoll_event evReg;
    evReg.events  = EPOLLIN;
    evReg.data.fd = lisSock;
    epoll_ctl(lisEpfd, EPOLL_CTL_ADD, lisSock, &evReg);

    struct epoll_event event;
    int rrIndex = 0; /* round-robin index */

    /* handles the listen socket only */
    while (!shut_server) {
        int numEvent = epoll_wait(lisEpfd, &event, 1, 1000);
        if (numEvent > 0) {
            int sock = accept(lisSock, NULL, NULL);
            if (sock > 0) {
                g_conn_table[sock].using = 1;
                int flag;
                flag = fcntl(sock, F_GETFL);
                fcntl(sock, F_SETFL, flag | O_NONBLOCK);
                evReg.data.fd = sock;
                evReg.events = EPOLLIN | EPOLLONESHOT;
                /* register with one of the worker-pool epolls (round robin),
                 * not with the listen epoll */
                g_conn_table[sock].index = rrIndex;
                epoll_ctl(epfd[rrIndex], EPOLL_CTL_ADD, sock, &evReg);
                rrIndex = (rrIndex + 1) % EPOLL_NUM;
            }
        }
    }
    close(lisEpfd);
    return NULL;
}

int main(int argc, char *const argv[]) {
    int c;
    for (c = 0; c < CONN_MAXFD; ++c) {
        g_conn_table[c].sock = c;
    }

    struct sigaction act;
    memset(&act, 0, sizeof(act));
    act.sa_handler = shut_server_handler;
    sigaction(SIGINT, &act, NULL);
    sigaction(SIGTERM, &act, NULL);

    /* set up the listening socket */
    lisSock = socket(AF_INET, SOCK_STREAM, 0);
    int reuse = 1;
    setsockopt(lisSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    int flag;
    flag = fcntl(lisSock, F_GETFL);
    fcntl(lisSock, F_SETFL, flag | O_NONBLOCK);

    struct sockaddr_in lisAddr;
    lisAddr.sin_family = AF_INET;
    lisAddr.sin_port = htons(9876);
    lisAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(lisSock, (struct sockaddr *)&lisAddr, sizeof(lisAddr)) == -1) {
        perror("bind");
        return -1;
    }
    listen(lisSock, 4096);

    /* create the worker epoll instances before starting the listen thread,
     * which hands accepted sockets to them */
    int epi;
    for (epi = 0; epi < EPOLL_NUM; ++epi) {
        epfd[epi] = epoll_create(20);
    }

    pthread_t lisTid;
    pthread_create(&lisTid, NULL, listenThread, NULL);

    int i;
    cpu_set_t mask;
    for (i = 0; i < EPOLL_NUM; ++i) {
        int j;
        for (j = 0; j < WORKER_PER_GROUP; ++j) {
            pthread_create(worker + (i * WORKER_PER_GROUP + j), NULL, workerThread, epfd + i);
            /* pin each worker group to its own core */
            CPU_ZERO(&mask);
            CPU_SET(i, &mask);
            if (pthread_setaffinity_np(*(worker + (i * WORKER_PER_GROUP + j)), sizeof(mask), &mask) < 0) {
                fprintf(stderr, "set thread affinity failed\n");
            }
        }
    }

    for (i = 0; i < NUM_WORKER; ++i) {
        pthread_join(worker[i], NULL);
    }
    pthread_join(lisTid, NULL);

    /* close any connections that are still open */
    struct epoll_event evReg;
    for (c = 0; c < CONN_MAXFD; ++c) {
        connection_t conn = g_conn_table + c;
        if (conn->using) {
            epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);
            close(conn->sock);
        }
    }
    for (epi = 0; epi < EPOLL_NUM; ++epi) {
        close(epfd[epi]);
    }
    close(lisSock);
    return 0;
}
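
For a quick manual check of model A outside of LoadRunner, a minimal blocking client along the lines of the sketch below could be used. This is not part of the original post: the port 9876 and the newline-terminated echo protocol come from the server code above, while the host address (127.0.0.1) and the message are illustrative assumptions.

/* minimal test client (sketch): sends one newline-terminated line to
 * 127.0.0.1:9876 and prints the echoed reply */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) { perror("socket"); return 1; }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9876);                 /* port used by the server above */
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }

    const char *msg = "hello\n";                 /* requests are newline-terminated */
    write(fd, msg, strlen(msg));

    char buf[4096];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);  /* blocking read of the echo */
    if (n > 0) {
        buf[n] = '\0';
        printf("echo: %s", buf);
    }
    close(fd);
    return 0;
}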

B:

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <error.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <pthread.h> /* for pthread_setaffinity_np / pthread_getaffinity_np */
#include "thread-pool.h"

#define MAXEVENTS 64

static int make_socket_non_blocking(int sfd)
{
    int flags, s;

    flags = fcntl(sfd, F_GETFL, 0);
    if (-1 == flags)
    {
        perror("fcntl");
        return -1;
    }

    flags |= O_NONBLOCK;
    s = fcntl(sfd, F_SETFL, flags);
    if (-1 == s)
    {
        perror("fcntl");
        return -1;
    }
    return 0;
}

static int create_and_bind(char *port)
{
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int s, sfd;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;     // return IPv4 and IPv6 choices
    hints.ai_socktype = SOCK_STREAM; // we want a TCP socket
    hints.ai_flags = AI_PASSIVE;     // all interfaces

    s = getaddrinfo(NULL, port, &hints, &result);
    if (0 != s)
    {
        fprintf(stderr, "getaddrinfo:%s\n", gai_strerror(s));
        return -1;
    }

    for (rp = result; NULL != rp; rp = rp->ai_next)
    {
        sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (-1 == sfd)
        {
            continue;
        }

        s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
        if (0 == s)
        {
            // we managed to bind successfully
            break;
        }
        close(sfd);
    }

    if (NULL == rp)
    {
        fprintf(stderr, "could not bind");
        return -1;
    }

    freeaddrinfo(result);
    return sfd;
}

volatile sig_atomic_t run = 1; /* cleared by the signal handler */

void SignalHandler(int iSignNum)
{
    printf("capture signal number:%d\n", iSignNum);
    run = 0;
}

void *handler(void *arg)
{
    int s;
    int fd = *((int *)arg);

    /* We have data on the fd waiting to be read. Read and display it.
       We must read whatever data is available completely, as we are
       running in edge-triggered mode and won't get a notification
       again for the same data. */
    int done = 0;
    while (1)
    {
        ssize_t count;
        char buf[512];

        count = read(fd, buf, sizeof(buf));
        if (-1 == count)
        {
            /* if errno == EAGAIN, we have read all available data;
               go back to the main loop */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                done = 1;
            }
            else
            {
                fprintf(stderr, "fd:%d ", fd);
                perror("read client data");
                done = 1; /* hard read error: close the fd below instead of leaking it */
            }
            break;
        }
        else if (0 == count)
        {
            /* end of file: the remote end has closed the connection */
            done = 1;
            break;
        }

        // write the buffer to standard output
        s = write(1, buf, count);
        if (-1 == s)
        {
            perror("write");
            abort();
        }
    }

    if (done)
    {
        write(fd, "fine, thank you", strlen("fine, thank you") + 1);
        printf("closed connection on descriptor %d\n", fd);
        /* closing the descriptor makes epoll remove it from the set of
           monitored descriptors */
        close(fd);
    }
    return NULL;
}

int main(int argc, char *argv[])
{
    int sfd, s;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;

    if (2 != argc)
    {
        fprintf(stderr, "Usage:%s [port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    // init the thread pool with one worker per online CPU
    unsigned count = 1;
    count = sysconf(_SC_NPROCESSORS_ONLN);
    pool_init(count);
    thread_pool *pool = (thread_pool *)pool_instance();

    // give the worker threads a moment to start
    sleep(5);

    // pin each worker thread to its own CPU core
    cpu_set_t mask;
    cpu_set_t get;
    int thread_ccore = 0;
    for (thread_ccore = 0; thread_ccore < count; thread_ccore++)
    {
        CPU_ZERO(&mask);
        CPU_SET(thread_ccore, &mask);
        if (pthread_setaffinity_np(pool->threadid[thread_ccore], sizeof(mask), &mask) < 0)
        {
            fprintf(stderr, "set thread affinity failed\n");
        }

        CPU_ZERO(&get);
        if (pthread_getaffinity_np(pool->threadid[thread_ccore], sizeof(get), &get) < 0)
        {
            fprintf(stderr, "get thread affinity failed\n");
        }
        if (CPU_ISSET(thread_ccore, &get))
        {
            printf("thread %lu is running on processor %d\n",
                   (unsigned long)pool->threadid[thread_ccore], thread_ccore);
        }
    }

    // listen
    sfd = create_and_bind(argv[1]);
    if (-1 == sfd)
    {
        abort();
    }

    s = make_socket_non_blocking(sfd);
    if (-1 == s)
    {
        abort();
    }

    s = listen(sfd, SOMAXCONN);
    if (-1 == s)
    {
        perror("listen");
        abort();
    }

    efd = epoll_create1(0);
    if (-1 == efd)
    {
        perror("epoll_create");
        abort();
    }

    event.data.fd = sfd;
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
    if (-1 == s)
    {
        perror("epoll_ctl");
        abort();
    }

    // buffer where events are returned
    events = calloc(MAXEVENTS, sizeof event);

    // the event loop
    while (1)
    {
        signal(SIGINT, SignalHandler);
        if (!run)
        {
            break;
        }

        int n, i;
        n = epoll_wait(efd, events, MAXEVENTS, -1);
        for (i = 0; i < n; i++)
        {
            if ((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!(events[i].events & EPOLLIN)))
            {
                /* an error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                fprintf(stderr, "epoll error\n");
                close(events[i].data.fd);
                continue;
            }
            else if (sfd != events[i].data.fd)
            {
                /* hand the connection over to the thread pool; note that
                   this passes a pointer into the shared events array, so it
                   relies on the job being picked up before the next
                   epoll_wait() overwrites that slot */
                pool_add_job(handler, (void *)&(events[i].data.fd));
            }
            else
            {
                /* we have a notification on the listening socket, which
                   means one or more incoming connections */
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept(sfd, &in_addr, &in_len);
                    if (-1 == infd)
                    {
                        if ((errno == EAGAIN) ||
                            (errno == EWOULDBLOCK))
                        {
                            // we have processed all incoming connections
                            break;
                        }
                        else
                        {
                            perror("accept");
                            break;
                        }
                    }

                    s = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf,
                                    sbuf, sizeof sbuf,
                                    NI_NUMERICHOST | NI_NUMERICSERV);
                    if (0 == s)
                    {
                        printf("accepted connection on descriptor %d "
                               "(host=%s, port=%s)\n", infd, hbuf, sbuf);
                    }

                    /* make the incoming socket non-blocking and add it to
                       the list of fds to monitor */
                    s = make_socket_non_blocking(infd);
                    if (-1 == s)
                    {
                        abort();
                    }

                    event.data.fd = infd;
                    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
                    s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
                    if (-1 == s)
                    {
                        perror("epoll_ctl");
                        abort();
                    }
                } // accept loop
            }
        }
    }

    free(events);
    close(sfd);
    close(efd);
    pool_destroy();

    printf("process exit\n");
    return 0;
}
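
The post does not include thread-pool.h. From the calls used in main() above (pool_init, pool_instance, pool_add_job, pool_destroy, and the threadid array), the interface it assumes looks roughly like the sketch below. This is a reconstruction for readability only, not the author's actual header, and the real implementation may differ.

/* thread-pool.h (assumed interface, inferred from the calls in main()) */
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <pthread.h>

typedef struct thread_pool {
    pthread_t *threadid;   /* worker thread ids, indexed 0..thread_num-1 */
    unsigned   thread_num; /* number of worker threads */
    /* ... queue of pending jobs, mutex, condition variable, etc. ... */
} thread_pool;

/* create the singleton pool with `count` worker threads */
int   pool_init(unsigned count);

/* return a pointer to the singleton pool (used above to reach threadid[]) */
void *pool_instance(void);

/* enqueue a job; a worker thread eventually calls func(arg) */
int   pool_add_job(void *(*func)(void *), void *arg);

/* stop the workers and free the pool */
void  pool_destroy(void);

#endif /* THREAD_POOL_H */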

————————————————————————————————————————————————————————————————————

Test environment: 8-core CPU (virtual machine).

8 worker threads.

A: test

// LoadRunner
#include "lrs.h"

Action()
{
    lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:9876", LrsLastArg);
    lr_think_time(7);
    lrs_send("socket0", "buf0", LrsLastArg);
    lrs_receive("socket0", "buf1", LrsLastArg);
    lrs_close_socket("socket0");
    return 0;
}

[Model A LoadRunner result screenshots omitted]

B: test

#include "lrs.h"

Action()
{
    lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:8093", LrsLastArg);
    lr_think_time(6);
    lrs_send("socket0", "buf0", LrsLastArg);
    lrs_receive("socket0", "buf1", LrsLastArg);
    lrs_close_socket("socket0");
    return 0;
}

[Model B LoadRunner result screenshots omitted]

Finally:

Model A: multiple epoll instances, multiple threads. After accept(), each new socket fd is handed round-robin to one of the epoll fds, and every worker thread calls epoll_wait() on its own epoll fd, with no locking at all.

Most developers would expect this lock-free multi-threaded design to be efficient. In practice it is not.

First, in this model the worker threads never really sleep; second, the connection structures take up a lot of memory (the static g_conn_table alone is about 65536 × 8 KB ≈ 512 MB).

The result: the system responds sluggishly, shuts down slowly, and network throughput is not high.


Model B: a single epoll instance plus a worker thread pool.

Most developers see that the thread pool uses a lock and conclude it must be inefficient. That is not the case either.

As others have analyzed, kernel lock overhead is not the main obstacle to application performance (see the short micro-benchmark sketch below).

First, in this model CPU and memory usage are very low, and the time goes where it should go: into I/O.

The result: the system responds very quickly, shuts down cleanly, and network throughput is about 2.5 times that of model A.
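
To put the claim about lock overhead in perspective, the following stand-alone micro-benchmark (illustrative only, not from the original post) times an uncontended pthread_mutex lock/unlock pair, which is the price a thread pool's job queue pays in the common case. On typical modern x86 hardware this is on the order of tens of nanoseconds, far below the cost of a single read()/write() round trip.

/* rough illustration: average cost of an uncontended lock/unlock pair
 * compile with: gcc -O2 lock_bench.c -o lock_bench -lpthread */
#include <stdio.h>
#include <time.h>
#include <pthread.h>

int main(void) {
    pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    const long iters = 10 * 1000 * 1000;
    struct timespec t0, t1;

    clock_gettime(CLOCK_MONOTONIC, &t0);
    for (long i = 0; i < iters; ++i) {
        pthread_mutex_lock(&m);
        pthread_mutex_unlock(&m);
    }
    clock_gettime(CLOCK_MONOTONIC, &t1);

    double ns = (t1.tv_sec - t0.tv_sec) * 1e9 + (t1.tv_nsec - t0.tv_nsec);
    printf("avg lock+unlock: %.1f ns\n", ns / iters);
    return 0;
}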


Sometimes the newer design really is far ahead of the older one; model A, after all, dates from 2013.