Socket层的核心是两个函数:sosend()
和soreceive()
.这两个函数负责处理所有Socket层和协议层之间的I/O操作。
select()
系统调用的作用是监控文件描述符的状态。一般用于Socket I/O操作,也可以用于其它文件I/O操作。
Socket缓存
我们知道,每一个Socket都有一个发送缓存和一个接收缓存。缓存的类型为sockbuf
。
struct sockbuf{
struct mbuf * sb_mb;//mbuf链,用于存储用户数据
u_long sb_mbcnt;//mbuf链的长度
u_long sb_mbmax;//mbuf链的最大长度。
u_long sb_cc;//缓存中的实际字节数
u_long sb_hiwat;
long sb_lowat;
struct selinfo sb_sel;
short sb_flags;
short sb_timeo;//read/write超时时间
}
-
sb_mb
为指向存储数据的mbuf链的指针,指向的是mbuf链的第一个mbuf -
sb_cc
的值等于存储在mbuf链中的数据字节数,即mbuf链中存储的有效数据的字节数 -
sb_hiwat
和sb_lowat
用来调整Socket的流控算法(不是TCP的流量控制) -
sb_mbcnt
为分配给缓存中的所有mbuf的存储器数量,即mbuf链的长度 -
sb_mbmax
为分配给该Socket mbuf缓存的存储器数量的上限,即mbuf链的最大长度。
默认的上限在socket系统调用中发送PRU_ATTACH
请求时由协议设置。只要内核要求的每个socket缓存的大小不超过262 144个字节的限制(sb_max,这是一个全局变量),进程就可以修改缓存的上限和下限。
Internet协议的默认的Socket缓存限制
因为每一个收到的UDP数据报的源地址和其携带的数据一起排队,所以UDP协议的sb_hiwat的默认值设置为能容纳40个1K字节长的数据报和相应的sockaddr_in结构(每个16个字节)。
- sb_sel是用来实现
select()
系统调用的selinfo结构 - sb_timeo用来限制一个进程在读写调用中被阻塞的时间。
下表列出了sb_flags的所有可能的值
sb-flags | 说明 |
---|---|
SB_LOCK | 一个进程已经锁定了socket缓存 |
SB_WANT | 一个进程正在等待给socket缓存加锁 |
SB_WAIT | 一个进程正在等待接收数据或发送数据所需的缓存 |
SB_SEL | 一个或多个进程正在选择这个缓存 |
SB_ASYNC | 为这个缓存产生异步I/O信号 |
SB_NOINTR | 信号不取消加锁请求 |
SB_NOTIFY | (SB_WAIT|SB_SEL|SB_ASYNC ) 一个进程正在等待缓存的变化,如果缓存发送任何变化,用wakeup通知该进程 |
write、writev、sendto和sendmsg系统调用
我们将write()
、writev
、sendto()
和sendmsg()
四个系统调用统称为"写系统调用",它们的作用是往网络连接上发送数据。相对于最一般的调用sendmsg()
而言,前三个系统调用是比较简单的接口。
所有的写系统调用都要直接或间接地调用sosend
。sosend
的功能是将进程来的数据复制到内核,并将数据传递给与socket相关的协议。
函数 | 描述符类型 | 缓存数量 | 是否指明目的地址 | 标志? | 控制信息? |
---|---|---|---|---|---|
write | 任何类型 | 1 | |||
writev | 任何类型 | [1..UIO_MAXIOV] |
|||
send | socket | 1 | . |
||
sendto | socket | 1 | . |
. |
|
sendmsg | socket | [1..UIO_MAXIOV] |
. |
. |
. |
write()
和writev()
系统调用适用于任何描述符,而其它的系统调用只适用于socket描述符。
writev()
和sendmsg()
系统调用可以接收从多个(应用层)缓存中来的数据。从多个缓存中写数据称为"收集(gathering)",同它相对应的读操作称为"分散(scattering)"。执行收集操作时,内核按序接收类型为iovec
的数据中指定的缓存中的数据。数组最多有UIO_MAXIOV
个单元。
struct iovec
{
char * iov_base;//基线地址,指向长度为iov_len个字节的缓存的开始
size_t iov_len;//长度
};
如果没有这种接口(writev),一个进程将不得不将多个缓存复制到一个大的缓存中,或调用多个写系统调用来发送多个缓存中的数据。对于数据报协议而言,调用一次write()
就是发送一个数据报,数据报的发送不能用多个写动作来实现。
数据报协议要求每一个写调用必须指定一个目的地址。因为write()
、writev()
和send()
调用接口不支持对目的地址的指定,因此这些调用只能在调用connect()
将目的地址同一个无连接的socket联系起来后才能被调用。调用sendto()
或sendmsg()
时必须提供目的地址,或在调用它们之前调用connect()
来指定目的地址。
sendmsg()系统调用
只有通过sendmsg()
系统调用才能访问到与socket API的输出有关的所有功能。sendmsg()
和sendit()
函数准备sosend()
系统调用所需的数据结构,然后由sosend()
系统调用将报文发送给相应的协议。
对于SOCK_DGRAM
协议而言,报文就是数据报。对于SOCK_STREAM
协议而言,报文是一串字节流。
sendmsg()
有三个参数:socket描述符、指向msghdr结构的指针、几个控制标志。函数copyin()
将msghdr
结构从用户空间复制到内核空间。
struct msghdr
{
caddr_t msg_name;//可选的目的地址
u_int msg_namelen;//msg_name的长度
struct iovec * msg_iov;//分散/收集数组
u_int msg_iovlen;//msg_iov数组长度
caddr_t msg_control;//控制信息
u_int msg_controllen;//控制信息长度
int msg_flags;//接收标志
};
控制信息(msg_control字段)的类型为cmsghdr
结构:
struct cmsghdr
{
u_int cmsg_len;
int cmsg_level;
int cmsg_type;
};
struct sendmsg_args
{
int s;
caddr_t msg;
int flags;
};
sendmsg(struct proc * p,struct sendmsg_args * uap,int * retval)
{
struct msghdr msg;
struct iovec aiov[UIO_SMALLOV],*iov;
int error;
/**
*一个有8个元素(UIO_SMALLIOV)的iovec数组从栈中自动分配。
*如果分配的数组不够大,sendmsg将调用MALLOC分配更大的数组。如果进程指定的数组单元大于1024(UIO_MAXIOV),则返回EMSGSIZE。
*copyin将iovec数组从用户空间复制到栈中的数组或一个更大的动态分配的数组中。
*这种技术避免了调用malloc带来的高代价,因为大多数情况下,数组的单元数小于等于8
*/
//将msg数据从用户空间复制到内核空间
if(error = copyin(uap->msg,(caddr_t)&msg,sizeof(msg))){
return (error);
}
if((u_int)msg.msg_iovlen >= UIO_SMALLIOV){
if((u_int)msg.msg_iovlen >= UIO_MAXIOV){
return (EMSGSIZE);
}
MALLOC(iov,struct iovec *,sizeof(struct iovec)*(u_int)msg.msg_iovlen,M_IOV,M_WAITOK);
}else{
iov = aiov;
}
if(msg.msg_iovlen && (error = copyin((caddr_t)msg.msg_iov,(caddr_t)iov,(unsigned)(msg.msg_iovlen * sizeof(struct iovec))))){
goto done;
}
msg.msg_iov = iov;
//如果sendit返回,则表明数据已经发送给相应的协议或出现差错
error = sendit(p,uap->s,&msg,uap->flags,retval);
done:
if(iov != aiov){
FREE(iov,M_IOV);
}
return (error);
}
sendit系统调用
sendit(struct proc * p,int s,struct msghdr * mp,int flags,int * retsize)
{
struct file * fp;
struct uio auio;
struct iovec * iov;
int i;
struct mbuf * to, * control;
int len,error;
if(error = getsock(p->p_fd,s,&fp)){
return error;
}
/**
*初始化uio结构,并将应用层的输出缓存中的数据收集到内核缓存中
*/
auio.uio_iov = mp->msg_iov;
auio.uio_iovcnt = mp->msg_iovlen;
auio.uio_segflg = UIO_USERSPACE;
auio.uio_rw = UIO_WRITE;
auio.uio_procp = p;
auio.uio_offset = 0;
auio.uio_resid = 0;
iov = mp->msg_iov;
/**
*将传送的数据的长度通过一个for循环来计算
*/
for(i = 0;i < mp->msg_iovlen;i++,iov++){
/**
*保证缓存的长度非负
*/
if(iov->iov_len < 0){
return (EINVAL);
}
/**
*保证uio_resid不溢出,因为uio_resid是一个**有符号的**整数,且iov_len要求非负
*/
if((auio.uio_resid += iov->iov_len) < 0){
return (EINVAL);
}
}
/**
*如果进程提供了地址和控制信息,则sockargs将地址和控制信息复制到内核缓存中
*/
if(mp->msg_name){//如果进程提供了地址
//将地址复制到内核中
if(error = sockargs(&to,mp->msg_name,mp->msg_namelen,MT_SONAME)){
return (error);
}
}else{
top = 0;
}
if(mp->msg_control){//如果进程提供了控制信息
if(mp->msg_controllen < sizeof(struct cmsghdr)){
error = EINVAL;
goto bad;;
}
//将控制信息复制到内核中
if(error = sockargs(&control,mp->msg_control,mp->msg_controllen,MT_CONTROL)){
goto bad;
}
}else{
control = 0;
}
/**
*发送数据和清除缓存
*/
len = auio.uio_resid;//为了防止sosend不接受所有数据而无法计算传送的字节数,将uio_resid的值保存在len中
//将socket、目的地址、uio结构(包含了要发送的数据)、控制信息和标志全部传给函数sosend
if(error = sosend((struct socket *)fp->f_data,to,&auio,(struct mbuf*)0,control,fkags)){
if(auio.uio_resid != len && (error == ERESTART || error == EINTR || error == EWOULDBLOCK)){
error = 0;
}
if(error == EPIPE){
psignal(p,SIGPIPE);
}
}
if(0 == error){
/**
*如果没有差错出现(或差错被丢弃),则计算传送的字节数,并将其保存在*retsize中。
*如果sendit返回0,syscall返回*retsize给进程而不是返回差错代码
*/
*retsize = len - auio.uio_resid;
}
bad:
//释放包含目的地址的缓存。
if(to){
m_freem(to);
}
return (error);
}
sosend系统调用
sosend()
是socket层中最复杂的函数之一。前面提到的所有五个写系统调用最终都要调用sosend()
。sosend()
的功能就是:根据socket指明的协议支持的语义和缓存的限制,将数据和控制信息传递给socket指明的协议的pr_usrreq函数.sosend从不将数据放在发送缓存(输出缓存)中,存储和移走数据应由协议来完成。
sosend()
对发送缓存的sb_hiwat
和sb_lowat
值的解释,取决于对应的协议是否实现可靠或不可靠的数据传送功能。
可靠的协议缓存
对于提供可靠的数据传输协议,例如TCP,发送缓存保存了还没有发送的数据和已经发送但还没有被确认的数据。sb_cc
等于发送缓存的数据的字节数,且0 <= sb_cc <= sb_hiwat
。
如果有带外数据发送,则sb_cc有可能暂时超过sb_hiwat
so_send应该确保在通过pr_usrreq()
函数将数据传递给协议层之前有足够的发送缓存。
协议层会将数据放到发送缓存中。sosend通过下面两种方式之一将数据传送给协议层:
TCP应用程序对外出的TCP报文段的大小没有控制。例如,在TCP Socket上发送一个长度为4096字节的报文,假定发送缓存中有足够的缓存,则Socket层将该报文分成两部分,每一部分长度为2048个字节,分别存放在一个带外部簇的mbuf中。然后,在协议处理时,TCP将根据连接上的MSS将数据分段,通常情况下,MSS为2048个字节。
当一个报文因为太大而没有足够的缓存时,协议允许报文被分成多段。但sosend仍然不将数据传送给协议层知道发送缓存中的空闲空间大小大于sb_lowat。对于TCP而言,sb_lowat的默认值是2048,从而阻止Socket层在发送缓存快满时用小块数据干扰TCP.
不可靠的协议缓存
对于提供不可靠的数据传输的协议而言,例如UDP,发送缓存不需要保存任何数据,也不等待任何确认。
每一个报文一旦被排队等待发送到相应的网络设备,Socket层立即将它传送到协议。在这种情况下,sb_cc总是等于0,sb_hiwat指定每一次写的最大长度,间接指明数据报的最大长度。
UDP协议的sb_hiwat
的默认值为9216(9 x 1024)。如果进程没有通过SO_SNDBUF
Socket选项改变sb_hiwat
的值,则发送长度大于9216个字节的数据报将导致差错。不仅如此,其它的协议限制也可能不允许一个进程发送大的数据报。
对于NFS写而言,9216已足够大,NFS写的数据加上协议首部的长度一般默认为8192个字节。
实现
sosend(struct socket * so,struct mbuf * addr,struct uio * uio,struct mbuf * top,struct mbuf * control,int flags)
{
/**
*初始化
*/
struct proc * p = curproc;
struct mbuf **mp;
struct mbuf * m;
long space,len,resid;
int clen = 0,error,s,dontroute,mlen;
/**
* 如果sosendallatonce等于true(任何设置了PR_ATOMIC的协议)或数据已经通过top中的mbuf链传送给sosend,则将设置atomic。这个标志[控制数据是作为一个mbuf链还是作为独立的mbuf传送给协议].
*/
int automic = sosendallatonce(so) || top;
/**
*resid等于iovec缓存中的数据字节数或top中的mbuf链中的数据字节数
*/
if(uio){
resid = uio->uio_resid;
}else{
resid = top->m_pkthdr.len;
}
if(resid < 0){
return (EINVAL);
}
/**
*如果仅仅要求对这个报文不通过路由表进行路由选择,则设置dontroute
*/
dontroute = (flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0 && (so->so_proto->pr_flags & PR_ATOMIC);
p->p_stats->p_ru.ru_msgsnd++;
if(control){
/**
* clen等于在可选的控制缓存中的字节数
*/
clen = control->m_len;
}
#define snderr(errno) {error = errno;splx(s);goto release;}
/**
*sosend的主循环从restart开始,在循环的开始调用sblock()给发送缓存加锁。通过加锁确保多个进程按序互斥访问socket缓存
*/
restart:
if(error = sblock(&so->so_snd,SBLOCKWAIT(flags)))
{
goto out;
}
/**
*主循环直到[将所有数据都传送给协议层](即resid = 0)时才会退出
*/
do{
/**
* 等待发送缓存有空闲空间
*/
s = splnet();
/**
* 如果Socket输出被禁止,即TCP连接的写通道已经关闭,则返回EPIPE
*/
if(so->so_state & SS_CANTSENDMORE){
snderr(EPIPE);
}
/**
* 如果Socket正处于差错状态(例如,前一个数据报可能已经产生了一个ICMP不可达的差错),则返回so->so_error
*/
if(so->so_error){
snderr(so->so_error);
}
/**
* 如果协议请求连接且连接还没有建立或连接请求还没有启动,则返回EMOTCONN.
* sosend允许只有控制信息但没有数据的写操作
*/
if((so->so_state & SS_ISCONNECTED) == 0){
if(so->so_proto->pr_flags & PR_CONNREQUIRED){
if((so->so_state & SS_ISCONFIRMING) == 0 && !(resid == 0 && clen != 0)){
snderr(ENOTCONN);
}
}else if(addr == 0){
snderr(EDESTADDRREQ);
}
}
/**
* 获取发送缓存中剩余的空闲空间字节数
* 这是一个基于缓存高水位标记的管理上的限制,但也是sb_mbmax对它的限制,其目的是[防止太多的小豹纹消耗太多的mbuf缓存]。sosend通过放宽缓存限制到1024个字节来给予带外数据更高的优先级
*/
space = sbspace(&so->so_snd);
if(flags & MSG_OOB){
space += 1024;
}
/**
* 如果atomic被置位,并且报文大于高水位标记(high-water mark),则返回EMSGSIZE;
* 报文因为太大而不被协议接受,即使缓存是空的。如果控制信息的长度大于高水位标记,同样返回EMSGSIZE
*/
if(atomic && resid > so->so_snd.sb_hiwat || clen > so->so_snd.sb_hiwat){
snderr(EMSGSIZE);
};
/**
* 如果发送缓存中的空间不够,数据来源于进程(而不是来源于内核中的top),并且下列条件之一成立,则sosend必须等待更多的空间
* 报文必须一次传送给协议(atomic为真);或
* 报文可以分段传送(即atomic为假),但闲置空间大小低于低水位标记;或
* 报文可以分段传送(即atomic为假),闲置空间大小大于低水位标记,但闲置空间存放不下控制信息
* 当数据通过top传送给sosend(即uio为NULL)时,数据已经在mbuf缓存中。因此sosend忽略缓存高、低水位标记限制,因为不需要附加的缓存来保存数据。
*/
if(space < resid + clen && uio && (atomic || space < so->so_snd.sb_lowat || space < clen)){
/**
* 如果sosend必须等待缓存且socket是非阻塞的,则返回EWOULDBLOCK
*/
if(so->so_state & SS_NBIO){
snderr(EWOULDBLOCK);
}
sbunlock(&so->so_snd);
/**
* sosend调用sbwait等待,直到发送缓存的状态发生变化.
* 当sbwait返回后,sosend重新进行协议处理,并且跳转到restart获取缓存锁,检查差错和缓存空间。如果条件满足,则继续执行。
* 默认情况下,sbwait阻塞直到可以发送数据。通过`SO_SNDTIMEO` Socket选项改变缓存中的sb_timeo,进程可以设置等待时间的上限
*/
error = sbwait(&so->so_snd);
splx(s);
if(error){
goto out;
}
goto restart;
}
splx(s);
mp = ⊤
space -= clen;//在sosend从进程复制任何数据之前,可用缓存的数量需减去控制信息的大小
do{
/**
*一旦有了足够的空间并且sosend也获得了发送缓存上的锁,则准备传送给协议层的数据。
*/
if(NULL == uio){
/**
*如果uio为空,则说明mbuf链中并没有数据或者调用出错
*/
resid = 0;
if(flags & MSG_EOR){
top->m_flags |= M_EOR;
}
}else{
do{
/**
*如果uio不为空,则sosend必须从进程间复制数据。当PR_ATOMIC被设置时(例如,UDP),循环继续,直到所有数据都被复制到一个mbuf链中。
*当sosend从进程得到所有数据后,通过循环中的break跳出循环。跳出循环后,sosend将整个数据链一次传送给相应协议
*/
if(0 == top){
MGETHDR(m,M_WAIT,MT_DATA);
mlen = MHLEN;
m->m_pkthdr.len = 0;
m->m_pkthdr.rcvif = (struct ifnet*)0;
}else{
MGET(m,M_WAIT,MT_DATA);
mlen = MLEN;
}
if(resid >= MINCLSIZE && space >= MCLBYTES){
MCLGET(m,M_WAIT);
if((m->m_flags & M_EXT) == 0){
goto nopages;
}
mlen = MCLBYTES;
if(atomic && top == 0){
len = min(MCLBYTES - max_hdr,resid);
m->m_data += max_hdr;
}else{
len = min(MCLBYTES,resid);
}
space -= MCLBYTES;
}else{
nopages:
len = min(min(mlen,resid),space);
space -= len;
/**
* For datagram protocols,level room for protocol headers in first mbuf
*/
if(atomic && top == 0 && len < mlen){
MH_ALIGN(m,len);
}
}
/**
* [使用uiomove从进程复制len个字节的数据到mbuf中]。传送完后更新mbuf的长度
*/
error = uiomove(mtod(m,caddr_t),(int)len,uio);
resid = uio->uio_resid;
m->m_len = len;
*mp = m;
top->m_pkthdr.len += len;
if(error){
goto release;
}
mp = &m->next;
if(resid <= 0){
if(flags & MSG_EOR){
top->m_flags != M_EOR;
}
break;
}
}while(space > 0 && automic);
/**
* pass mbuf chain to protocol
*/
if(dontroute){
so->so_options != SO_DONTROUTE;
}
s= splnet();
/**
* 向协议层发送PRU_SEND请求或PRU_SENDOOB,协议层收到请求后会发送数据
*/
error = (*so->so_proto->pr_usrreq)(so,(flags & MSG_OOB) ? PRU_SENDOOB : PRU_SEND,top,addr,control);
splx(s);
if(dontroute){
so->so_options &= ~SO_DONTROUTE;
}
clen = 0;
control = 0;
top = 0;
mp = ⊤
if(error){
goto release;
}
}
}while(resid && space > 0);
}while(resid);
release:
/**
* 当所有数据都传送给协议后,给Socket缓存解锁,释放多余的mbuf缓存
*/
sbunlock(&so->so_snd);
out:
if(top){
m_freem(top);
}
if(control){
m_freem(control);
}
return (error);
}
read、readv、recvfrom和recvmsg系统调用
我们将read()
、readv()
、recvfrom()
和recvmsg()
系统调用称为"读系统调用",从网络连接上接收数据。同recvmsg()
相比,前三个系统调用比较简单。recvmsg()
因为比较通用而复杂得多。
函数 | 描述符类型 | 缓存数量 | 返回发送者的地址吗? | 标志? | 返回控制信息? |
---|---|---|---|---|---|
read | 任何类型 | 1 | |||
readv | 任何类型 | [1..UIO_MAXIOV] |
|||
recv | socket | 1 | . |
||
recvfrom | socket | 1 | . |
. |
|
recvmsg | socket | [1..UIO_MAXIOV] |
. |
. |
. |
只有read()
和readv()
系统调用适用于各类描述符,其它的系统调用只适用于socket描述符。
同写调用一样,通过iovec
结构数组来指定多个缓存。
- 对数据报协议,
recvfrom()
和recvmsg()
返回每一个收到的数据报的源地址 - 对于面向连接的协议,
getpeername()
返回连接对方的地址
recvmsg()系统调用
recvmsg()
函数是最通用的读系统调用。如果一个进程使用任何一个其它的读系统调用,且地址、控制信息和接收标志的值还未定,则系统可能在没有任何通知的情况下丢弃它们。
struct recvmsg_args
{
int s;//socket描述符
struct msghdr * msg;
int flags;//控制标志
};
recvmsg(struct proc * p,struct recvmsg_args * uap,int * retval)
{
struct msghdr msg;
struct iovec aiov[UIO_SMALLIOV], *uiov, *iov;
int error;
/**
* 同sendmsg一样,recvmsg将msghdr结构复制到内核。
* 如果自动分配的数组aiov太小,则分配一个更大的iovec数组,并且将数组单元从进程复制到由iov只想的内核数组。
* 并将第三个参数复制到msghdr结构中
*/
if(error = copyin((caddr_t)uap->msg,(caddr_t)&msg,sizeof(msg))){
return (error);
}
if((u_int)msg.msg_iovlen >= UIO_SMALLIOV){
if((u_int)msg.msg_iovlen >= UIO_MAXIOV){
return (EMSGSIZE);
}
MALLOC(iov,struct iovec * ,sizeof(struct iovec) * (u_int)msg.msg_iovlen,M_IOV,M_WAITOK);
}else{
iov = aiov;
}
msg.msg_flags = uap->flags;
uiov = msg.msg_iov;
msg.msg_iov = iov;
if(error = copyin((caddr_t)uiov,(caddr_t)iov,(unsigned)(msg.msg_iovlen * sizeof(struct iovec)))){
goto done;
}
/**
* recvit收完数据后,将更新过的缓存长度和标志的msghdr结构再复制到进程。
*/
if((error = recvit(p,uap->s,&msg,(caddr_t)0,retval)) == 0){
msg.msg_iov = uiov;
error = copyout((caddr_t)&msg,(caddr_t)uap->msg,sizeof(msg));
}
done:
/**
* 如果分配了一个更大的iovec结构,则释放它
*/
if(iov != aiov)
FREE(iov,M_IOV);
return (error);
}
recvit函数
recvit()
函数被recv()
、recvfrom()
和recvmsg()
调用。
基于recv xxx
调用提供的msghdr
结构,recvit()
函数为soreceive()
的处理准备了一个uio
结构。
recvit(struct proc * p,int s,struct msghdr * mp,caddr_t namelen,int * retsize)
{
struct file * fp;
struct uio auio;
struct iovec * iov;
int i;
int len,error;
struct mbuf * from = 0,*control = 0;
if(error = getsock(p->p_fd,s,&fp)){
return (error);
}
auio.uio_iov = mp->msg_iov;
auio.uio_iovcnt = mp->msg_iovlen;
auio.uio_segflg = UIO_USERSPACE;
auio.uio_rw = UIO_READ;
auio.uio_procp = p;
auio.uio_offset = 0;
auio.uio_resid = 0;
iov = mp->msg_iov;
for(i = 0;i < mp->msg_iovlen;i++;iov++){
if(iov->iov_len < 0){
return (EINVAL);
}
if((auio.uio_resid += iov->iov_len) < 0){
return (EINVAL);
}
}
len = auio.uio_resid;
/**
* soreceive实现从socket缓存中接收数据的最复杂的功能。
* 传送的字节数保存在*retisze中,并返回给进程。
* 如果有些数据已经复制到进程后信号出现或阻塞出现(len不等于uio_resid),则忽略差错,并返回已经传送的字节
*/
if(error = soreceive((struct socket*)fp->f_data,&from,&auio,(struct mbuf**)0,mp->msg_control ? &control : (struct mbuf**)0,&mp->msg_flags)){
if(auio.uio_resid != len && (error == ERESTART || error == EINTR || error == EWOULDBLOCK)){
error = 0;
}
}
if(error){
goto out;
}
*retsize = len - auio.uio_resid;
/**
* 如果进程传入了一个存放地址或控制信息或者两者都有的缓存,则recvit将结果写入该缓存,并且根据soreceive返回的结果调整它们的长度。
* 如果缓存太小,则地址信息可能被截掉。如果进程在发送读请求之前保存缓存的长度,将该长度同内核返回的namelenp变量(或sockaddr结构的长度域)相比较就可能发现这个错误。通过设置msg_flags中的MSG_CTRUNC标志来报告这种差错。
*/
if(mp->msg_name){
len = mp->msg_namelen;
if(len <= 0 || from == 0){
len = 0;
}else{
if(len > from->m_len){
len = from->m_len;
}
if(error = copyout(mtod(from,caddr_t),(caddr_t)mp->msg_name,(unsigned)len)){
goto out;
}
mp->msg_namelen = len;
if(namelenp && (error = copyout((caddr_t)&len,namelenp,sizeof(int)))){
goto out;
}
}
if(mp->msg_control){
len = mp->msg_controllen;
if(len <= 0 || control == 0){
len = 0;
}else{
if(len >= control->m_len){
len = control->m_len;
}else{
mp->msg_flags |= MSG_CTRUNC;
}
error = copyout((caddr_t)mtod(control,caddr_t),(caddr_t)mp->msg_control,(unsigned)len);
}
mp->msg_controllen = len;
}
out:
/**
* 从out开始,释放存储源地址和控制信息的mbuf缓存
*/
if(from){
m_freem(from);
}
if(control){
m_freem(control);
}
return (error);
}
soreceive函数
soreceive()
函数将数据从socket的接收缓存传送到进程指定的缓存。某些协议还提供发送者的地址,地址可以同可能的附加控制信息一起返回。
在讨论它的代码之前,先来讨论接收操作,带外数据和socket接收缓存的组织的含义。
recv xxx
系统调用,传递给内核的标志值:
flags | 描述 |
---|---|
MSG_DONTWAIT |
在调用期间不等待资源 |
MSG_OOB |
接收带外数据而不是正常的数据 |
MSG_PEEK |
接收数据的副本而不取走数据 |
MSG_WAITALL |
在返回之前等待数据写缓存 |
recvmsg
是唯一返回标志字段给进程的读系统调用。在其它的系统调用中,控制返回给进程之前,这些信息被内核丢弃。
在msghdr中recvmsg能设置的标志
msg_flags | 描述 |
---|---|
MSG_CTRUNC |
控制信息的长度大于提供的缓存长度 |
MSG_EOR |
收到的数据标志一个逻辑记录的结束 |
MSG_OOB |
缓存中包含带外数据 |
MSG_TRUNC |
收到的报文的长度大于提供的缓存长度 |
其它的接收操作选项:
- 进程能够通过设置标志
MSG_PEEK
来查看是否有数据到达,而数据仍然留在接收队列中,被下一个不设置MSG_PEEK的读调用读出。 -
标志
MSG_WAITALL
指示读调用只有在读到指定数量的数据后才返回。即使soreceive()
中有一些数据可以返回给进程,但它仍然要等待收到剩余的数据后才返回。但是如果出现如下情况,即使没有读完指定长度的数据也会返回:- 连接的读通道被关闭
- socket的接收缓存小于所读数据的大小
- 在进程等待生于的数据时差错出现
- 带外数据到达
- 在读缓存被写满之前,一个逻辑记录的结尾出现
接收缓存的组织:报文边界
对于支持报文边界的协议,例如UDP,每一个报文存放在一个mbuf链中。
接收缓存中的多个报文通过m_nextpk
指针链接成一个mbuf
队列。协议处理层添加数据到接收队列,Socket层从接受队列中移走数据。接收缓存的高水位标记(so_hiwat)限制了存储在缓存中的数据量。
如果PR_ATOMIC
没有被置位,协议层尽可能多地在缓存中存放数据,丢弃输入数据中的不合要求的部分。对于TCP,这就意味着到达的任何数据如果在接收窗口之外都将被丢弃。
如果PR_ATOMIC
被置位,缓存必须能容纳整个报文,否则协议层将丢弃整个报文。对于UDP而言,如果接收缓存已满,则进入的数据报都将被丢弃,缓存满的原因可能是进程读数据报的速度不够快。
包含三个数据报的UDP接收缓存
对于PR_ATOMIC协议,当收到数据时,sb_lowat
被忽略。当没有设置PR_ATOMIC
时,sb_lowat的值等于读系统调用返回的最小字节数。
接收缓存的组织:没有报文边界
当协议不需要维护报文边界(即SOCK_STREAM
协议,如TCP)时,通过sbappend()
将进入的数据驾到缓存中的最后一个mbuf链的尾部。如果进入的数据长度大于缓存的长度,则数据将被截掉,sb_lowat
为一个读系统调用返回的字节树设置了一个下限。
实现
/**
* @param so: 指向socket
* @param paddr: 只想存放接收地址信息的mbuf缓存
* @param mp0: 如果mp0指向一个mbuf链,则soreceive将接收缓存中的数据传送到*mp0指向的mbuf缓存链.在这种情况下uio结构中只有用来记数的uio_resid字段是有意义的。如果mp0为空,则soreceive[将数据传送到uio结构指定的缓存]。
* @param control : 指向包含控制信息的mbuf缓存。
* @param flagsp : 存放标志
*
*/
soreceive(struct socket * so,struct mbuf ** paddr,struct uio * uio,struct mbuf **mp0,struct mbuf **controlp,int * flagsp)
{
struct mbuf *m,**mp;
int flags,len,error,s,offset;
//将pr指向socket协议的交换结构
struct protosw * pr = so->so_proto;
struct mbuf * nextrecord;
int moff,type;
/**
* 将uio_resid(接收请求的大小)保存在orig_resid
* 如果将控制或地址信息从内核复制到进程,则将orig_resid清0.soreceive函数的最后处理要利用这一事实。
*/
int orig_resid = uio->uio_resid;
mp = mp0;
if(paddr){
*paddr = 0;
}
if(controlp){
*controlp = 0;
}
if(flags){
flags = *flagsp & ~MSG_EOR;
}else{
flags = 0;
}
/**
* MSG_OOB processing and implicit connection confirmation
* 处理带外数据
*/
if(flags & MSG_OOB){
/**
* 因为OOB数据不存放在接收缓存中,所以soreceive为其分配一块标准的mbuf
*/
m = m_get(M_WAIT,MT_DATA);
/**
*给协议发送PRU_RCVOOB请求,并通过循环将协议返回的数据复制到uio指定的缓存中。复制完成后,soreceive返回0或差错代码
*/
error = (*pr->pr_usrreq)(so,PRU_RCVOOB,m,(struct mbuf*)(flags & MSG_PEEK),(struct mbuf*)0);//给协议发送PRU_RCVOOB请求,将带外数据复制到m中
if(error){
goto bad;
}
do{
//将m中的数据(即协议返回的带外数据)复制到uio指定的缓存中
error = uiomove(mtod(m,caddr_t),(int)min(uio->uio_resid,m->m_len),uio);
}while(uio->uio_resid && error == 0 && m);
bad:
if(m){
m_freem(m);
}
return (error);
}
/**
* 如果mp不为空,则表明将数据存放在其指向的mbuf链中。这里我们首先将mbuf链中的第一个mbuf清空
*/
if(mp){
*mp = (struct mbuf*)0;
}
/**
* 如果socket处于SO_ISCONFIRMING状态,PRU_RCVD请求告知协议进程想要接收数据
*/
if(so->so_state & SS_ISCONFORMING && uio->uio_resid){
(*pr->pr_usrreq)(so,PRU_RCVD,(struct mbuf*)0,(struct mbuf*)0,(struct mbuf*)0);
}
restart:
/**
* 在访问接收缓存之前,调用sblock给缓存加锁。如果flags中没有设置MSG_DONTWAIT标志,则soreceive必须等待加锁成功
*/
if(error = sblock(&so->so_rcv,SBLOCKWAIT(flags))){
return (error);
}
/**
* 挂起协议处理,使得在检查缓存过程中soreceive不被中断
*/
s = splnet();
/**
* m是接收缓存中的第一个mbuf链上的第一个mbuf
*/
m = so->so_rcv.sb_mb;
/**
* if necessary ,wait for data to arrive
* 读调用的请求能满足吗?如果不能则等待更多的数据
*/
if(m == 0 || ((flags & MSG_DONTWAIT) == 0 && so->so_rcv.sb_cc < uio->uio_resid) && (so->so_rcv.sb_cc < so->so_rcv.sb_lowat || ((flags & MSG_WAITALL) && uio->uio_resid <= so->so_rcv.sb_hiwat)) && m->m_nextpkt == 0 && (pr->pr_flags & PR_ATOMIC) == 0){
if(so->so_error){
if(m){
goto dontblock;
}
error = so->so_error;
if((flags & MSG_PEEK) == 0){
so->so_error = 0;
}
goto release;
}
if(so->so_state & SS_CANTRCVMORE){
if(m){
goto dontblock;
}else{
doto release;
}
}
for(;m;m->next){
if(m->m_type == MT_OOBDATA || (m->m_flags & M_EOR)){
m = so->so_rcv.sb_mb;
goto dontblock;
}
}
if((so->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0 && (so->so_proto->pr_flags & PR_CONNREQUIRED)){
error = ENOTCONN;
goto release;
}
if(uio->uio_resid == 0){
goto release;
}
if((so->so_state & SS_NBIO) || (flags & MSG_DONTWAIT)){
error = EWOULDBLOCK;
goto release;
}
sbunlock(&so->so_rcv);
/**
* 等待更多的数据
* 同sosend中一样,进程能够利用SO_RCVTIMEO Socket选项为sbwait设置一个接收定时器。如果在数据到达之前定时器超时,则sbwait返回EWOULDBLOCK
*/
error = sbwait(&so->so_rcv);
splx(s);
if(error){
return (error);
}
goto restart;
}
dontblock:
if(uio->uio_procp){
uio->uio_procp->p_stats->p_ru.ru_msgrcv++;
}
/**
* nextrecord 指向接收缓存的下一条记录。
* 在soreceive的后面,当第一个链被丢弃后,该指针被用来将剩余的mbuf放入socket缓存
*/
nextrecord = m->m_nextpkt;
/**
* 处理地址和控制信息
*/
/**
* 如果协议提供地址信息,例如UDP,则将从mbuf链中删除含地址的mbuf,并通过*paddr返回。如果,paddr为空,则地址信息被丢弃
*/
if(pr->pr_flags & PR_ADDR){
orig_resid = 0;
if(flags & MSG_PEEK){//接收数据的副本而不取走
if(paddr){
*paddr = m_copy(m,0,m->m_len);
}
m = m->m->m_next;
}else{
sbfree(&so->so_rcv,m);
if(paddr){
*paddr = m;
so->so_rcv.sb_mb = m->m_next;
m->m_next = 0;
m = so->so_rcv.sb_mb;
}else{
MFREE(m,so->so_rcv.sb_mb);
m = so->so_rcv.sb_mb;
}
}
}
while(m && m->m_type == MT_CONTROL && error == 0){
if(flags & MSG_PEEK){
if(controlp){
*controlp = m_copy(m,0,m->m_len);
}
m = m->m_next;
}else{
sbfree(&so->so_rcv,m);
if(controlp){
if(pr->pr_domain->dom_externalize && mtod(m,struct cmsghdr *)->cmsg_type == SCM_RIGHTS){
error = (*pr->pr_domain->dom_externalize)(m);
}
*controlp = m;
so->so_rcv.sb_mb = m->m_next;
m->m_next = 0;
m = m->so_rcv.sb_mb;
}
}
if(controlp){
orig_resid = 0;
controlp = &(*controlp)->m_next;
}
}
/**
* 处理完所有的控制mbuf后,m指向链中的下一个mbuf.
* 如果在地址或控制信息的后面,链中没有其它的mbuf,则m为空
*
*/
if(m){
if((flags & MSG_PEEK) == 0){
m->m_nextpkt = nextrecord;
}
type = m->m_type;
if(MT_OOBDATA == type){
flags |= MSG_OOB;
}
}
/**
* 传送数据
* 当MSG_PEEK被置位时,将为传送的下一个字节的偏移位置
* 当MSG_PEEK被置位时,OOB标记的偏移位置
* uio_resid表示还未传送的字节数
* len表示从本mbuf中将要传送的字节数;如果uio_resid比较小或靠OOB标记比较近,则len可能小于m_len
*/
moff = 0;
offset = 0;
while(m && uio->uio_resid > 0 && error == 0){
if(m->m_type == MT_OOBDATA){
if(type != MT_OOBDATA){
break;
}
}else if(type == MT_OOBDATA){
break;
}
so->so_state &= ~SS_RCVATMARK;
len = uio->uio_resid;
if(so->so_oobmark && len > so->so_oobmark - offset){
len = so->so_oobmark - offset;
}
if(len > m->m_len - moff){
len = m->m_len - moff;
}
/**
* 如果将数据传送到uio缓存,则调用uiomove。如果数据是作为一个mbuf链返回的,则更新uio_resid的值,使其等于传送的字节数
*/
if(mp == 0){
splx(s);
//将数据从接收缓存传送到uio缓存中
error = uiomove(mtod(m,caddr_t)+moff,(int)len,uio);
s = splnet();
}else{
uio->uio_resid -= len;
}
/**
* 调整指针和偏移准备传送下一个mbuf
*/
if(len == m->m_len - moff){
if(m->m_flags & M_EOR){
flags != MSG_EOR;
}
if(flags & MSG_PEEK){
m = m->m_next;
moff = 0;
}else{
nextrecord = m->m_nextpkt;
sbfree(&so->so_rcv,m);
if(mp){
*mp = m;
mp = &m->m_next;
so->so_rcv.sb_mb = m = m->m_next;
*mp = (struct mbuf*)0;
}else{
MFREE(m,so->so_rcv.sb_mb);
m = so->so_rcv.sb_mb;
}
if(m){
m->m_nextpkt = nextrecord;
}
}else{
if(mp){
//将数据从接收缓存拷贝到mp指向的mbuf链
*mp = m_copy(m,0.len,M_WAIT);
}
m->m_data += len;
m_m_len -= len;
so->so_rcv.sb_cc -= len;
}
}
if(so->so_oobmark){
if((flags & MSG_PEEK) == 0){
so->so_oobmark -= len;
if(so->so_oobmark == 0){
so->so_state |= SS_RCVATMARK;
break;
}
}else{
offset += len;
if(offset == so->so_oobmark){
break;
}
}
}
if(flags & MSG_EOR){
BREAK;
}
/**
* 当设置了MSH_WAITALL标志,并且读请求还没有完成,则循环将等待更多的数据到达
* If the MSG_WAITALL flag is set (for non-atomic socket)
* we must not quit util "uio->uio_resid == 0"or an error termination.
* If a signal/timeout occurs ,return with a short count but without error
* Keep sockbuf locked against other readers
*/
while(flags & MSG_WAITALL && m == 0 && uio->uio_resid > - && !sosendallatonce(so) && !nextrecord){
if(so->so_error || so->so_state & SS_CANTRCVMORE){
break;
}
//等待
error = sbwait(&so->so_rcv);
if(error){
sbunlock(&so->so_rcv);
splx(s);
return (0);
}
if(m = so->so_rcv.sb_mb){
nextrecord = m->m_nextpkt;
}
}
}
/**
*cleanup
*/
if(m && pr->pr_flags & PR_ATOMIC){
flags != MSG_TRUNC;
if((flags & MSG_PEEK) == 0){
(void)sbdroprecord(&so->so_rcv);
}
}
if((flags & MSG_PEEK) == 0){
if(m == 0){
so->so_rcv.sb_mb = nextrecord;
}
if(pr->pr_flags & PR_WANTRCVD && so->so_pcb){
(*pr->pr_usrreq)(so,PRU_RCVD,(struct mbuf*)0,(struct mbuf*)flags ,(struct mbuf*)0,(struct mbuf *)0);
}
}
if(orig_resid == uio->uio_resid && orig_resid && (flags & MSG_EOR) == 0 && (so->so_state & SS_CANTRCVMORE) == 0){
sbunlock(&so->so_rcv);
splx(s);
goto restart;
}
if(flagsp){
*flagsp |- flags;
}
release:
sbunlock(&so->so_rcv);
splx(s);
return (error);
}
select系统调用
下表列出了能够监控的socket状态。
struct select_args{
u_int nd;
fd_set * in,*ou,*ex;
struct timeval * tv;
};
select(struct proc * p,struct select_args * uap,int * retval)
{
/**
* 在栈中分配两个数组:ibits和obits,每个数组有三个单元,每个单元为一个描述符集合。
*/
fd_set = ibits[3],obits[3];
struct timeval atv;
int s,ncoll,error = 0,timo;
/**
* ni等于用来存放nd个比特(1个描述符占1个比特)的比特掩码所需的字节数
*/
u_int ni;
bzero((caddr_t)ibits,sizeof(ibits));
bzero((caddr_t)obits,sizeof(obits));
/**
* select系统调用的第一个参数nd,必须不大于进程的描述符的最大数量
*/
if(uap->nd > FD_SETSIZE){
return (EINVAL);
}
/**
* 如果nd大于当前分配给进程的描述符个数,则将其减少到当前分配给进程的描述的个数
*/
if(uap->nd > p->p_fd->fd_nfiles){
uap->nd = p->p_fd->fd_nfiles;
}
/**
* ni等于用来存放nd个比特(1个描述符占1个比特)的比特掩码所需的字节数
*/
ni = howmany(uap->nd,NFDBITS) * sizeof(fd_mask);
/**
* getbits宏copyin从进程那里将文件描述符集合传递到ibits中的三个文件描述符集合.
* 如果文件描述符集合指针为空,则不需要复制
*/
#define getbits(name,x) \
if(uap->name && \
(error = copyin((caddr_t)uap->name,(caddr_t)&ibits[x],ni))) \
goto done;
}
getbits(in,0);
getbits(ou,1);
getbits(ex,2);
#undef getbits
/**
* 设置超时值
* 如果tv为空,则将timeo设置成0,select将无限期等待
*/
if(uap->tv){
/**
* 如果tv非空,则将超时值复制到内核
*/
error = copyin((caddr)t)uap->tv,(caddr_t)&atv,sizeof(atv));
if(error)[
goto done;
}
/**
* 调用itimerfix将超时值按硬件时钟的分辨率取整
*/
if(itimerfix(&atv)){
error = EINVAL;
goto done;
}
s = splclock();
/**
* 调用timevaladd将当前时间加到超时值中
*/
timevaladd(&atv,(struct timeval*)&time);
/**
* 调用hzto计算[从启动到超时之间]的时钟滴答数,并保存在timo中
*/
timo = hzto(&atv);
/**
* 如果计算出来的时钟滴答数为0,则将timo设置为1,从而防止select无限期等待,[实现利用全0的timeval结构来实现非阻塞操作]
* Avoid inadvertently sleeping forever
*/
if(0 == timo){
timo = 1;
}
splx(s);
}else{
timo = 0;//如果tv为空,则将timeo设置成0,select将无限期等待
}
retry:
/**
* 扫描进程指示的文件描述符,当一个或多个文件描述符处于就绪状态或者定时器超时或信号出现时返回
*/
ncoll = nselcoll;
p->p_flag |= P_SELECT;
error = selscan(p,ibits,obits,uap->nd,retval);
if(error || *retval){
goto done;
}
s = splhigh();
/* this should be timercmp(&time,&atv,>=) */
if(uap->tv && (time.tv_sec > atv.tv_sec || time.tv_sec == atv.tv_sec && time.tv_tv_usec >= atv.tv_usec)){
splx(s);
goto done;
}
if((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll){
splx(s);
goto retry;
}
p->p_flag &= ~P_SELECT;
error = tsleep((caddr_t)&selwait,PSOCK|PCATCH,"select",timo);
splx(s);
if(0 == error){
goto retry;
}
done:
p->p_flag &= ~P_SELECT;
/* select is not restarted after signals ... */
if(ERESTART == error){
error = EINTR;
}
if(EWOULDBLOCK == error){
error = 0;
}
#define putbits(name,x) \
if(uap->name && \
(error2 = copyout((caddr_t)&bits[x],(caddr_t)uap->name,nil))){
error = error2;
}
if(0 == error){
int error2;
putbits(in,0);
putbits(ou,1);
putbits(ex,2);
}
#undef putbits
return (error);
selscan函数
select()
系统调用的核心是selscan()
函数.
对于任意一个文件描述符集合中设置的每一个比特,selscan
找出同它相关联的文件描述符,并且将控制分散给与描述符相关联的so_select()
函数。对于socket而言,就是soo_select()
函数
selscan(struct proc * p,fd_set * ibits,fd_set * obits,int nfd,int * retval)
{
struct filedesc * fdp = p->p_fd;
int j,fd;
fd_mask bits;
int n = 0;
static int flag[3] = {FREAD,FWRITE,0};
/**
* 第一个for循环依次查看三个文件描述符集合:读、写、例外
*/
for(int msk = 0;msk < 3;msk++){
/**
* 第二个for循环在每个文件描述符集合(读、写、例外)内部循环,这个循环在集合中每个32bit(NFDBITS)循环依次
*/
for(int i = 0;i < nfd;i+= NFDBITS){
bits = ibits[msk].fds_bits[i / NFDBITS];
while((j = ffs(bits)) && (fd = i + --j) < nfd){
bits &= ~(1 << j);
struct file *fp = fdp->fd_ofiles[fd];
if(NULL == fp){
return (EBADF);
}
/**
* 当发现某个文件描述符的状态为准备就绪时,设置输出文件描述符集合中相对应的比特位。并将n(状态就绪的描述符个数)加1
*/
if((*fp->f_ops->fo_select)(fp,flag[msk],p)){
FD_SET(fd,&obits[msk]);
n++;
}
}
}
}
&retval = n;
return (0);
}
soo_select函数
/**
* 出现如下情况是,可以认为socket可读:
* - 接收缓存中的数据高于低水位
* - 接收缓存中不会再有数据进入了(对方发送了FIN)
* - 处于监听状态的socket中有新的连接
* - socket出现差错
*/
#define soreadble(so) \
(so->so_rcv.sb_cc >= so->so_rcv.sb_lowat || so->so_state & SS_CANTRCVMORE || so->so_qlen || so->so_error)
/**
* 当出现如下情况是,可以认为socket可写:
* - 发送缓存中的可用空间大于低水位
*/
#define sowriteable(so) \
(sbspace(&so->so_snd) >= so_snd.sb_lowat && (so->so_state & SS_ISCONNECTED) || ((so->so_proto->pr_flags & PR_CONNREQURED) == 0 || so->so_state & SS_CANTSENDMORE || so->error))
soo_select(struct file * fp,int which,struct proc * p)
{
struct socket * so = (struct socket *)fp->f_data;
int s = splnet();
switch(which){
case FREAD:{
if(soreadable(so)){
splx(s);
return (1);
}
selrecord(p,&so->so_rcv.sb_sel);
so->so_rcv.sb_flags |= SB_SEL;
break;
}
case FWRITE:{
if(sowriteable(so)){
splx(s);
return (1);
}
selrecord(p,&so->so_snd.sb_sel);
so->so_snd.sb)flag |= SB_SEL;
break;
}
case 0:{
if(so->so_oobmark || (so->so_state & SS_RCVATMARK)){
splx(s);
return (1);
}
selrecord(p,&so->so_rcv.sb_sel);
so->so_rcv.sb_flags |= SB_SEL;
break;
}
}
splx(s);
return (0);
}