6.2 socket 流协议与粘包

时间:2022-07-20 06:40:06

  TCP/IP协议族中的TCP是字节流协议,对上层协议来讲,消息之间是没有边界的。假设主机A发送两个消息M1和M2,如下图所示:

6.2  socket  流协议与粘包

主机A发送了M1和M2,主机B在接收时有4种情况:

1、先收了M1,又收了M2

2、M1、M2一起收到了

3、M1和M2的一部分一起收到的,又收到了M2的一部分

4、先收到了M1的一部分,然后M1的下一部分和M2一起收到

说明:

  tcp字节流无边界

  udp消息是基于数据报的,是有边界的,可以不处理

  对等方一次读操作,不能保证完全把消息读完

  对方接收数据包的个数是不确定的

应用程序发数据时,先把数据写到socket的发送缓冲区里面,缓冲区的大小也是有规定的,当缓冲区写到一定程度,这时候TCP/IP协议栈开始往对等方发数据。TCP层有MSS(最大报文段长度)限制,如果要发送的数据大于MSS,TCP会把它拆成多个报文段发送。在链路层有MTU最大传输单元限制,如果IP数据报大于MTU,则IP层会对数据分片,到对等方再进行重组。

产生粘包的原因:

  1、套接字本身有缓冲区(发送缓冲区、接收缓冲区)

  2、tcp传送端的mss大小限制

  3、链路层的MTU限制,如果数据包大于MTU,则要在IP层进行分片,导致消息分割

  4、tcp的流量控制和拥塞控制,也可能导致粘包

  5、tcp延迟发送机制

我们前几篇博客中对read函数的用法是有bug的,但是我们的实验都是在局域网(甚至在同一台机器上)进行的,包传输较快,所以没有凸显出来。也就是说,在局域网上传输较快,先发送的包也先被完整地接收到了,没有出现粘包的现象。但是在公网传输时,延迟较大,如果我们不对流式数据进行处理,这时可能就会出现我们上面说的粘包现象了。真正的商用软件一定会进行粘包处理。

  包之间没有边界,我们可以人为的造边界。

  目前有以下处理方法:

  1、在包之间加\r\n,ftp就是这样处理的。

  2、在包之间加自定义报文。例如,在报文头之前加4个字节,指示后面的报文大小。

  3、定长包

  4、更复杂的应用层协议

我们使用在包头加上四字节自定义报文的方式解决粘包问题,直接给出如下的程序:

服务器端:

 #include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h> /* superset of previous */

/*
 * Echo server using length-prefixed framing: every message on the wire is a
 * 4-byte length (network byte order) followed by that many payload bytes.
 *
 * NOTE(review): the numeric literals of the published listing were lost.
 * SERVER_PORT and PAYLOAD_MAX below are reconstructed placeholders; they only
 * have to agree with the values compiled into the client.
 */
enum {
    SERVER_PORT = 8001,   /* placeholder, must match the client */
    PAYLOAD_MAX = 1024    /* capacity of packet.buf */
};

/*
 * Read exactly `count` bytes from `fd` unless EOF or an error intervenes.
 *
 * Returns `count` on success, a smaller non-negative value if the peer closed
 * the connection first (bytes read so far), or -1 on error (errno set by
 * read). Reads interrupted by a signal (EINTR) are restarted.
 */
ssize_t readn(int fd, void *buf, size_t count)
{
    size_t nleft = count;
    char *bufp = buf;

    while (nleft > 0) {
        ssize_t nread = read(fd, bufp, nleft);

        if (nread < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nread == 0) {
            /* EOF: report how much was actually received. */
            return (ssize_t)(count - nleft);
        }
        bufp += nread;
        nleft -= (size_t)nread;
    }
    return (ssize_t)count;
}

/*
 * Write exactly `count` bytes from `buf` to `fd`.
 *
 * Returns `count` on success or -1 on error (errno set by write). Writes
 * interrupted by a signal (EINTR) and writes that report 0 bytes are retried.
 */
ssize_t writen(int fd, const void *buf, size_t count)
{
    size_t nleft = count;
    const char *bufp = buf;

    while (nleft > 0) {
        ssize_t nwritten = write(fd, bufp, nleft);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nwritten == 0) {
            continue;
        }
        bufp += nwritten;
        nleft -= (size_t)nwritten;
    }
    return (ssize_t)count;
}

/* One framed message. `len` travels in network byte order. */
struct packet
{
    uint32_t len;
    char buf[PAYLOAD_MAX];
};

/*
 * Serve one connected client: read framed packets and echo each one back
 * until the peer closes the connection or an error occurs.
 */
static void echo_session(int conn)
{
    struct packet recvbuf;

    for (;;) {
        memset(&recvbuf, 0, sizeof(recvbuf));

        /* Header first: it tells us how many payload bytes follow. */
        ssize_t ret = readn(conn, &recvbuf.len, sizeof(recvbuf.len));
        if (ret == -1) {
            perror("read error");
            return;
        }
        if ((size_t)ret < sizeof(recvbuf.len)) {
            printf("client closed \n");
            return;
        }

        uint32_t n = ntohl(recvbuf.len);
        if (n > sizeof(recvbuf.buf)) {
            /* Never trust a length from the network: it would overflow buf. */
            fprintf(stderr, "packet too large: %u bytes\n", (unsigned)n);
            return;
        }

        ret = readn(conn, recvbuf.buf, n);
        if (ret == -1) {
            perror("readn error");
            return;
        }
        if ((size_t)ret < n) {
            printf("client closed\n");
            return;
        }

        /* The payload is not guaranteed to be NUL-terminated; print by length. */
        fwrite(recvbuf.buf, 1, n, stdout);

        /* recvbuf.len is still in network byte order, so echo it unchanged. */
        if (writen(conn, &recvbuf, sizeof(recvbuf.len) + n) == -1) {
            perror("writen error");
            return;
        }
    }
}

int main(void)
{
    /* Let the kernel reap finished children so they do not become zombies. */
    signal(SIGCHLD, SIG_IGN);
    /* A write to a closed peer should fail with EPIPE, not kill the process. */
    signal(SIGPIPE, SIG_IGN);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket error");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SERVER_PORT);
    /* To listen on every interface use addr.sin_addr.s_addr = htonl(INADDR_ANY). */
    if (inet_pton(AF_INET, "192.168.31.128", &addr.sin_addr) != 1) {
        fprintf(stderr, "invalid listen address\n");
        exit(EXIT_FAILURE);
    }

    int optval = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        perror("setsockopt error");
        exit(EXIT_FAILURE);
    }

    if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind error");
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, SOMAXCONN) < 0) {
        perror("listen error");
        exit(EXIT_FAILURE);
    }

    for (;;) {
        struct sockaddr_in peeraddr;
        /* accept() treats the length as value-result: it must be initialised. */
        socklen_t peerlen = sizeof(peeraddr);

        int conn = accept(sockfd, (struct sockaddr *)&peeraddr, &peerlen);
        if (conn == -1) {
            if (errno == EINTR) {
                continue;
            }
            perror("accept error");
            exit(EXIT_FAILURE);
        }

        printf("peeraddr = %s\n peerport = %d\n",
               inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
        /* Flush before fork so buffered output is not duplicated in the child. */
        fflush(stdout);

        pid_t pid = fork();
        if (pid == -1) {
            perror("fork error");
            exit(EXIT_FAILURE);
        }

        if (pid == 0) {
            /* Child: serve this client only, then terminate. */
            close(sockfd);
            echo_session(conn);
            close(conn);
            exit(EXIT_SUCCESS);
        }

        /* Parent: the child owns the connection now. */
        close(conn);
    }
}

客户端:

 #include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h> /* superset of previous */

/*
 * Echo client using length-prefixed framing: every message on the wire is a
 * 4-byte length (network byte order) followed by that many payload bytes.
 *
 * NOTE(review): the numeric literals of the published listing were lost.
 * SERVER_PORT and PAYLOAD_MAX below are reconstructed placeholders; they only
 * have to agree with the values compiled into the server.
 */
enum {
    SERVER_PORT = 8001,   /* placeholder, must match the server */
    PAYLOAD_MAX = 1024    /* capacity of packet.buf */
};

/*
 * Read exactly `count` bytes from `fd` unless EOF or an error intervenes.
 *
 * Returns `count` on success, a smaller non-negative value if the peer closed
 * the connection first (bytes read so far), or -1 on error (errno set by
 * read). Reads interrupted by a signal (EINTR) are restarted.
 */
ssize_t readn(int fd, void *buf, size_t count)
{
    size_t nleft = count;
    char *bufp = buf;

    while (nleft > 0) {
        ssize_t nread = read(fd, bufp, nleft);

        if (nread < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nread == 0) {
            /* EOF: report how much was actually received. */
            return (ssize_t)(count - nleft);
        }
        bufp += nread;
        nleft -= (size_t)nread;
    }
    return (ssize_t)count;
}

/*
 * Write exactly `count` bytes from `buf` to `fd`.
 *
 * Returns `count` on success or -1 on error (errno set by write). Writes
 * interrupted by a signal (EINTR) and writes that report 0 bytes are retried.
 */
ssize_t writen(int fd, const void *buf, size_t count)
{
    size_t nleft = count;
    const char *bufp = buf;

    while (nleft > 0) {
        ssize_t nwritten = write(fd, bufp, nleft);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nwritten == 0) {
            continue;
        }
        bufp += nwritten;
        nleft -= (size_t)nwritten;
    }
    return (ssize_t)count;
}

/* One framed message. `len` travels in network byte order. */
struct packet
{
    uint32_t len;
    char buf[PAYLOAD_MAX];
};

int main(void)
{
    /* A write to a closed peer should fail with EPIPE, not kill the process. */
    signal(SIGPIPE, SIG_IGN);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket error");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SERVER_PORT);
    if (inet_pton(AF_INET, "192.168.31.128", &addr.sin_addr) != 1) {
        fprintf(stderr, "invalid server address\n");
        exit(EXIT_FAILURE);
    }

    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect error");
        exit(EXIT_FAILURE);
    }

    struct packet sendbuf;
    struct packet recvbuf;
    memset(&recvbuf, 0, sizeof(recvbuf));
    memset(&sendbuf, 0, sizeof(sendbuf));

    /* One request/response round trip per line typed on stdin. */
    while (fgets(sendbuf.buf, sizeof(sendbuf.buf), stdin) != NULL) {
        size_t n = strlen(sendbuf.buf);
        sendbuf.len = htonl((uint32_t)n);

        /* Header and payload are contiguous in struct packet: send both at once. */
        if (writen(sockfd, &sendbuf, sizeof(sendbuf.len) + n) == -1) {
            perror("writen error");
            exit(EXIT_FAILURE);
        }

        /* Header first: it tells us how many payload bytes follow. */
        ssize_t ret = readn(sockfd, &recvbuf.len, sizeof(recvbuf.len));
        if (ret == -1) {
            perror("readn error");
            exit(EXIT_FAILURE);
        }
        if ((size_t)ret < sizeof(recvbuf.len)) {
            printf("server close\n");
            break;
        }

        uint32_t rn = ntohl(recvbuf.len);
        if (rn > sizeof(recvbuf.buf)) {
            /* Never trust a length from the network: it would overflow buf. */
            fprintf(stderr, "packet too large: %u bytes\n", (unsigned)rn);
            break;
        }

        ret = readn(sockfd, recvbuf.buf, rn);
        if (ret == -1) {
            perror("readn error");
            exit(EXIT_FAILURE);
        }
        if ((size_t)ret < rn) {
            /* The connection ended mid-payload; it is the server that closed. */
            printf("server close\n");
            break;
        }

        /* The payload is not guaranteed to be NUL-terminated; print by length. */
        fwrite(recvbuf.buf, 1, rn, stdout);

        memset(&recvbuf, 0, sizeof(recvbuf));
        memset(&sendbuf, 0, sizeof(sendbuf));
    }

    close(sockfd);
    return 0;
}

最重要的就是readn和writen函数。接收方先用readn读取4字节的包头,然后根据这四字节的内容确定后面数据的大小,再调用一次readn把这些数据读完。发送方用writen写4+n字节,n是真正有用的数据,4字节是包头,如果写的过程中不出错,则writen一定会将4+n字节写完。在writen内部,如果write返回0,那么可能是真的没有写进去,也可能是对端已经关闭,这时候我们重写一次,如果是对端关闭,则这次write就会返回小于0的数了。write返回小于零的数可能是由于对端关闭,也可能是被信号中断,所以我们要判断一下errno,如果是被中断的(EINTR),则再次尝试写入,如果是对端关闭,则writen就直接出错返回了(返回-1)。

下面我们使用第二种解决方案,在数据包的后面加上'\n',这样的话接收端就要解析数据查找'\n',我们之前用的都是read读数据,如果解析时还用read的话,就要一个字节一个字节的读取并解析,需要多次调用read,效率很低,为了提高效率,这时候可以选择recv函数。原型如下:

ssize_t  recv(int s, void *buf, size_t len, int flags)

与read相比,recv函数只能用于套接字文件描述符。而且多了一个flags参数。flags常用的参数有以下两个:

MSG_OOB:带外数据,紧急指针

MSG_PEEK:数据包的“偷窥”,提前预读,当设置成“偷窥”模式时,可以判断数据包的长度和内容。相当于提前读缓冲区,但是并没有将数据清走。read函数读的时候也会清缓冲区。

用fgets读键盘时,用户按回车产生的'\n'会被fgets保留在缓冲区中(它并不是fgets额外添加的),所以客户端从键盘输入并发送的每条数据默认都以'\n'结尾。

示例程序如下:

服务器端:

 #include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h> /* superset of previous */

/*
 * Echo server using newline-delimited framing: every message ends with '\n'.
 *
 * NOTE(review): the numeric literals of the published listing were lost.
 * SERVER_PORT and LINE_BUF_SIZE below are reconstructed placeholders; the
 * port only has to agree with the value compiled into the client.
 */
enum {
    SERVER_PORT = 8001,    /* placeholder, must match the client */
    LINE_BUF_SIZE = 1024   /* longest line, including '\n' and the NUL */
};

/*
 * Read exactly `count` bytes from `fd` unless EOF or an error intervenes.
 *
 * Returns `count` on success, a smaller non-negative value if the peer closed
 * the connection first (bytes read so far), or -1 on error (errno set by
 * read). Reads interrupted by a signal (EINTR) are restarted.
 */
ssize_t readn(int fd, void *buf, size_t count)
{
    size_t nleft = count;
    char *bufp = buf;

    while (nleft > 0) {
        ssize_t nread = read(fd, bufp, nleft);

        if (nread < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nread == 0) {
            /* EOF: report how much was actually received. */
            return (ssize_t)(count - nleft);
        }
        bufp += nread;
        nleft -= (size_t)nread;
    }
    return (ssize_t)count;
}

/*
 * Write exactly `count` bytes from `buf` to `fd`.
 *
 * Returns `count` on success or -1 on error (errno set by write). Writes
 * interrupted by a signal (EINTR) and writes that report 0 bytes are retried.
 */
ssize_t writen(int fd, const void *buf, size_t count)
{
    size_t nleft = count;
    const char *bufp = buf;

    while (nleft > 0) {
        ssize_t nwritten = write(fd, bufp, nleft);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nwritten == 0) {
            continue;
        }
        bufp += nwritten;
        nleft -= (size_t)nwritten;
    }
    return (ssize_t)count;
}

/*
 * Copy up to `len` pending bytes into `buf` WITHOUT removing them from the
 * socket receive queue (MSG_PEEK). Restarted when interrupted by a signal.
 *
 * Returns what recv returns: bytes peeked, 0 on orderly shutdown, -1 on error.
 */
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
    for (;;) {
        ssize_t ret = recv(sockfd, buf, len, MSG_PEEK);

        if (ret == -1 && errno == EINTR) {
            continue;
        }
        return ret;
    }
}

/*
 * Read one '\n'-terminated line from `sockfd` into `buf`.
 *
 * At most `maxline - 1` bytes are stored and the result is always
 * NUL-terminated. Data is peeked first so that only bytes up to and including
 * the newline are consumed; anything after it stays queued for the next call.
 *
 * Returns the number of bytes stored (newline included; a line that fills the
 * buffer is returned truncated, without a newline), 0 if the peer closed the
 * connection before a complete line arrived, or -1 on error.
 */
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
    char *bufp = buf;
    size_t total = 0;

    if (buf == NULL || maxline < 2) {
        errno = EINVAL;
        return -1;
    }

    size_t nleft = maxline - 1;   /* keep one byte for the terminator */

    while (nleft > 0) {
        ssize_t npeek = recv_peek(sockfd, bufp, nleft);
        if (npeek <= 0) {
            *bufp = '\0';
            return npeek;         /* -1: error, 0: peer closed */
        }

        /* Consume up to the newline if one is visible, else everything peeked. */
        size_t take = (size_t)npeek;
        const char *newline = memchr(bufp, '\n', take);
        if (newline != NULL) {
            take = (size_t)(newline - bufp) + 1;
        }

        /* The bytes were just peeked, so a short read here is a real failure. */
        if (readn(sockfd, bufp, take) != (ssize_t)take) {
            *bufp = '\0';
            return -1;
        }

        bufp += take;
        total += take;
        nleft -= take;

        if (newline != NULL) {
            break;
        }
    }

    *bufp = '\0';
    return (ssize_t)total;
}

/*
 * Serve one connected client: read lines and echo each one back until the
 * peer closes the connection or an error occurs.
 */
static void echo_session(int conn)
{
    char recvbuf[LINE_BUF_SIZE];

    for (;;) {
        memset(recvbuf, 0, sizeof(recvbuf));

        ssize_t ret = readline(conn, recvbuf, sizeof(recvbuf));
        if (ret == 0) {
            printf("client closed \n");
            return;
        }
        if (ret == -1) {
            perror("readline error");
            return;
        }

        fputs(recvbuf, stdout);

        if (writen(conn, recvbuf, (size_t)ret) == -1) {
            perror("writen error");
            return;
        }
    }
}

int main(void)
{
    /* Let the kernel reap finished children so they do not become zombies. */
    signal(SIGCHLD, SIG_IGN);
    /* A write to a closed peer should fail with EPIPE, not kill the process. */
    signal(SIGPIPE, SIG_IGN);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket error");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SERVER_PORT);
    /* To listen on every interface use addr.sin_addr.s_addr = htonl(INADDR_ANY). */
    if (inet_pton(AF_INET, "192.168.31.128", &addr.sin_addr) != 1) {
        fprintf(stderr, "invalid listen address\n");
        exit(EXIT_FAILURE);
    }

    int optval = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        perror("setsockopt error");
        exit(EXIT_FAILURE);
    }

    if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind error");
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, SOMAXCONN) < 0) {
        perror("listen error");
        exit(EXIT_FAILURE);
    }

    for (;;) {
        struct sockaddr_in peeraddr;
        /* accept() treats the length as value-result: it must be initialised. */
        socklen_t peerlen = sizeof(peeraddr);

        int conn = accept(sockfd, (struct sockaddr *)&peeraddr, &peerlen);
        if (conn == -1) {
            if (errno == EINTR) {
                continue;
            }
            perror("accept error");
            exit(EXIT_FAILURE);
        }

        printf("peeraddr = %s\n peerport = %d\n",
               inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
        /* Flush before fork so buffered output is not duplicated in the child. */
        fflush(stdout);

        pid_t pid = fork();
        if (pid == -1) {
            perror("fork error");
            exit(EXIT_FAILURE);
        }

        if (pid == 0) {
            /* Child: serve this client only, then terminate. */
            close(sockfd);
            echo_session(conn);
            close(conn);
            exit(EXIT_SUCCESS);
        }

        /* Parent: the child owns the connection now. */
        close(conn);
    }
}

客户端:

 #include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h> /* superset of previous */

/*
 * Echo client using newline-delimited framing: every message ends with '\n'.
 *
 * NOTE(review): the numeric literals of the published listing were lost.
 * SERVER_PORT and LINE_BUF_SIZE below are reconstructed placeholders; the
 * port only has to agree with the value compiled into the server.
 */
enum {
    SERVER_PORT = 8001,    /* placeholder, must match the server */
    LINE_BUF_SIZE = 1024   /* longest line, including '\n' and the NUL */
};

/*
 * Read exactly `count` bytes from `fd` unless EOF or an error intervenes.
 *
 * Returns `count` on success, a smaller non-negative value if the peer closed
 * the connection first (bytes read so far), or -1 on error (errno set by
 * read). Reads interrupted by a signal (EINTR) are restarted.
 */
ssize_t readn(int fd, void *buf, size_t count)
{
    size_t nleft = count;
    char *bufp = buf;

    while (nleft > 0) {
        ssize_t nread = read(fd, bufp, nleft);

        if (nread < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nread == 0) {
            /* EOF: report how much was actually received. */
            return (ssize_t)(count - nleft);
        }
        bufp += nread;
        nleft -= (size_t)nread;
    }
    return (ssize_t)count;
}

/*
 * Write exactly `count` bytes from `buf` to `fd`.
 *
 * Returns `count` on success or -1 on error (errno set by write). Writes
 * interrupted by a signal (EINTR) and writes that report 0 bytes are retried.
 */
ssize_t writen(int fd, const void *buf, size_t count)
{
    size_t nleft = count;
    const char *bufp = buf;

    while (nleft > 0) {
        ssize_t nwritten = write(fd, bufp, nleft);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (nwritten == 0) {
            continue;
        }
        bufp += nwritten;
        nleft -= (size_t)nwritten;
    }
    return (ssize_t)count;
}

/*
 * Copy up to `len` pending bytes into `buf` WITHOUT removing them from the
 * socket receive queue (MSG_PEEK). Restarted when interrupted by a signal.
 *
 * Returns what recv returns: bytes peeked, 0 on orderly shutdown, -1 on error.
 */
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
    for (;;) {
        ssize_t ret = recv(sockfd, buf, len, MSG_PEEK);

        if (ret == -1 && errno == EINTR) {
            continue;
        }
        return ret;
    }
}

/*
 * Read one '\n'-terminated line from `sockfd` into `buf`.
 *
 * At most `maxline - 1` bytes are stored and the result is always
 * NUL-terminated. Data is peeked first so that only bytes up to and including
 * the newline are consumed; anything after it stays queued for the next call.
 *
 * Returns the number of bytes stored (newline included; a line that fills the
 * buffer is returned truncated, without a newline), 0 if the peer closed the
 * connection before a complete line arrived, or -1 on error.
 */
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
    char *bufp = buf;
    size_t total = 0;

    if (buf == NULL || maxline < 2) {
        errno = EINVAL;
        return -1;
    }

    size_t nleft = maxline - 1;   /* keep one byte for the terminator */

    while (nleft > 0) {
        ssize_t npeek = recv_peek(sockfd, bufp, nleft);
        if (npeek <= 0) {
            *bufp = '\0';
            return npeek;         /* -1: error, 0: peer closed */
        }

        /* Consume up to the newline if one is visible, else everything peeked. */
        size_t take = (size_t)npeek;
        const char *newline = memchr(bufp, '\n', take);
        if (newline != NULL) {
            take = (size_t)(newline - bufp) + 1;
        }

        /* The bytes were just peeked, so a short read here is a real failure. */
        if (readn(sockfd, bufp, take) != (ssize_t)take) {
            *bufp = '\0';
            return -1;
        }

        bufp += take;
        total += take;
        nleft -= take;

        if (newline != NULL) {
            break;
        }
    }

    *bufp = '\0';
    return (ssize_t)total;
}

int main(void)
{
    /* A write to a closed peer should fail with EPIPE, not kill the process. */
    signal(SIGPIPE, SIG_IGN);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket error");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(SERVER_PORT);
    if (inet_pton(AF_INET, "192.168.31.128", &addr.sin_addr) != 1) {
        fprintf(stderr, "invalid server address\n");
        exit(EXIT_FAILURE);
    }

    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect error");
        exit(EXIT_FAILURE);
    }

    char sendbuf[LINE_BUF_SIZE] = {0};
    char recvbuf[LINE_BUF_SIZE] = {0};

    /* One request/response round trip per line typed on stdin; fgets keeps
     * the trailing '\n', which doubles as the message delimiter. */
    while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL) {
        if (writen(sockfd, sendbuf, strlen(sendbuf)) == -1) {
            perror("writen error");
            exit(EXIT_FAILURE);
        }

        ssize_t ret = readline(sockfd, recvbuf, sizeof(recvbuf));
        if (ret == -1) {
            perror("readline error");
            exit(EXIT_FAILURE);
        }
        if (ret == 0) {
            printf("server close\n");
            break;
        }

        fputs(recvbuf, stdout);

        memset(recvbuf, 0, sizeof(recvbuf));
        memset(sendbuf, 0, sizeof(sendbuf));
    }

    close(sockfd);
    return 0;
}

主要的函数就是readline函数,该函数从套接字缓冲区最多读取maxline长度的数据。readline调用了recv_peek,recv_peek返回实际窥探到的数据长度(这些数据仍然留在套接字缓冲区中),然后在readline函数中判断这些数据中是否有'\n',如果有'\n'的话,就用readn函数真正地将截至'\n'的数据读出来,然后直接返回。如果没有'\n',则会走到循环的后半部分(原文所说的第120行附近),判断一下recv_peek读取的长度,然后用readn将这些长度的数据读出来,然后移动缓冲区指针,接着再次调用recv_peek去缓冲区读数据,直到读到'\n'为止,或者读满整个maxline长度就返回。

执行结果如下:

6.2  socket  流协议与粘包