一次sendmsg的改造过程

时间:2023-03-09 03:37:14
一次sendmsg的改造过程

比较蛋疼的一个改造过程,简单记录一下。

场景:用户态使用sendmsg发包,tcp报文,由于内核实现过程中存在一次kernel_read,也就是存在将pagecache中的内容拷贝一次的问题。

为了减少这次拷贝,简单地将这个对pagecache的拷贝过程使用分散聚集io方式来进行map,map的数据来自于pagecahce中的文件。

这样就不存在拷贝了。

看过这部分代码的人肯定觉得既然不想拷贝,为啥不用sendfile来实现,关键是因为需要每个包有部分数据来自于用户态,比如rtp头,rtsp头等。

而媒体数据在内核态组装,如果是udp好办,我们做了一个solaris版本的sendfile,可以带一个用户态数据区过来组装报文,然后直接发送。

但是tcp的话,存在重发控制,简单通过qdisc流控又达不到df值的要求,所以就遗留了这么一个改动。

在改动过程,一开始的方案是参照tcp_sendpage来改造,但由于用户传入的

struct msghdr {
void * msg_name; /* Socket name */
int msg_namelen; /* Length of name */
struct iovec * msg_iov; /* Data blocks */
__kernel_size_t msg_iovlen; /* Number of blocks */
void * msg_control; /* Per protocol magic (eg BSD file descriptor passing) */
__kernel_size_t msg_controllen; /* Length of cmsg list */
unsigned msg_flags;
};

msg_iov 的个数比较多,也就是msg_iovlen比较多,(这个msg_iov里面不是直接要发送的业务数据,而是针对file的偏移和长度段),这样的话,调用tcp_sendpage就很频繁,

由于3.10版本的内核的tcp_sendpage如下:

int tcp_sendpage(struct sock *sk, struct page *page, int offset,
size_t size, int flags)
{
ssize_t res; if (!(sk->sk_route_caps & NETIF_F_SG) ||//网卡是否支持分散聚集io
!(sk->sk_route_caps & NETIF_F_ALL_CSUM))//网卡具备硬件执行校验和的标志
return sock_no_sendpage(sk->sk_socket, page, offset, size,
flags); lock_sock(sk);
res = do_tcp_sendpages(sk, &page, offset, size, flags);
release_sock(sk);
return res;
}

也就是说,每次操作一个很短的page都是在lock的情况下,这样锁操作多了,也间接影响了收包。

所以进一步参照 do_tcp_sendpages 的实现,自己申请了skb,挂载已经读取的poge段,放在分散聚集io的

skb_shinfo(skb)->frags 

数组中,而挂载的过程参照splice的实现。实现了免拷贝,以及减少了锁的申请,所以性能提升了20%左右。

另外一个有趣的发现是,由于接受方的0窗口,导致我们这边服务器发包的时候,没有窗口可以发送,间接影响了我们调用接口的时候的

static inline int sk_stream_memory_free(struct sock *sk)
{
return sk->sk_wmem_queued < sk->sk_sndbuf;
}

出现了一些tryagain。

出现tryagain的时候,我做了一个改动,就是增加sk ->sk_sndbuf的大小,当然这个值是小于系统配置的缓冲区大小,相当于代替用户调用了一次setsockopt,来扩展sendbuf。

另外测试发现,哪怕我sk_sndbuf不动,但是我释放锁再申请锁,都能大概率减少tryagain,一开始猜测认为是因为释放锁导致了我们的数据被ack了,所以我们的sk_wmem_queued减少了,

后来看到我们释放锁里面,其实做了很多工作:

void release_sock(struct sock *sk)
{
/*
* The sk_lock has mutex_unlock() semantics:
*/
mutex_release(&sk->sk_lock.dep_map, , _RET_IP_); spin_lock_bh(&sk->sk_lock.slock);
if (sk->sk_backlog.tail)
__release_sock(sk);--------------重点看这个 if (proto_has_rhel_ext(sk->sk_prot, RHEL_PROTO_HAS_RELEASE_CB) &&
sk->sk_prot->release_cb)
sk->sk_prot->release_cb(sk); sk->sk_lock.owned = ;
if (waitqueue_active(&sk->sk_lock.wq))
wake_up(&sk->sk_lock.wq);
spin_unlock_bh(&sk->sk_lock.slock);
}

可以看到,释放锁的时候,如果我们的sk_backlog中有报文的话,会调用__release_sock:

static void __release_sock(struct sock *sk)
{
struct sk_buff *skb = sk->sk_backlog.head; do {
sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
bh_unlock_sock(sk); do {
struct sk_buff *next = skb->next; skb->next = NULL;
sk_backlog_rcv(sk, skb);-----------处理skb,针对tcp就是 tcp_v4_do_rcv /*
* We are in process context here with softirqs
* disabled, use cond_resched_softirq() to preempt.
* This is safe to do because we've taken the backlog
* queue private:
*/
cond_resched_softirq(); skb = next;
} while (skb != NULL); bh_lock_sock(sk);
} while ((skb = sk->sk_backlog.head) != NULL); /*
* Doing the zeroing here guarantee we can not loop forever
* while a wild producer attempts to flood us.
*/
sk_extended(sk)->sk_backlog.len = ;
}

可以看到,会有一个处理skb报文的过程,而这个过程,针对tcp来说,如下调用链:

tcp_v4_do_rcv--> 
tcp_rcv_established-->tcp_ack->tcp_clean_rtx_queue-->sk_wmem_free_skb

在tcp_ack中,一般做三件事:更新重传队列,更新发送窗口,从sack的信息或者重复ack来决定是否进入拥塞模式。

tcp_clean_rtx_queue中将会调用 sk_wmem_free_skb ,一般能将一些待确认的skb给释放掉,这样我们前面的对 sk_wmem_queued 和 sk->sk_sndbuf 的比较就可能为真,而不需要重试了。

ps:release_sock这个函数曾经还引起了一个比较有意思的讨论:
http://bbs.chinaunix.net/thread-4114007-1-6.html
顺带,看了一下tcp_ack在处理 tcp_clean_rtx_queue 的时候比较有意思的地方,
针对我们skb带了分散聚集io的情况,我们一个skb大概承载了40k左右的数据,而回复ack的时候,显然不会一个skb所有的报文发送出去,再收到ack,所以这个ack肯定会在skb的数据之间,
这种情况下:
static int tcp_clean_rtx_queue(struct sock *sk, int prior_fackets,
u32 prior_snd_una, long sack_rtt_us)
{
。。。。。 while ((skb = tcp_write_queue_head(sk)) && skb != tcp_send_head(sk)) {
struct tcp_skb_cb *scb = TCP_SKB_CB(skb);
u8 sacked = scb->sacked;
u32 acked_pcount; /* Determine how many packets and what bytes were acked, tso and else */
if (after(scb->end_seq, tp->snd_una)) {
if (tcp_skb_pcount(skb) == ||
!after(tp->snd_una, scb->seq))
break; acked_pcount = tcp_tso_acked(sk, skb);--------------部分数据被ack,会进入这个流程
if (!acked_pcount)
break; fully_acked = false;

我们来看ack的处理,首先,如果skb是当前的待发送的skb,则直接跳过,因为我既然这个skb没有发送,则不应该收到我这个skb对应的seq范围之内的任何ack,所以可以直接跳过。

其次,当前待发送的skb之前的skb的部分数据被ack的时候,假设这个ack跨越了两个skb,那么之前那个skb肯定要释放,对于剩余的ack数据,则会对该skb进行trim操作:

/* If we get here, the whole TSO packet has not been acked. */
static u32 tcp_tso_acked(struct sock *sk, struct sk_buff *skb)
{
struct tcp_sock *tp = tcp_sk(sk);
u32 packets_acked; BUG_ON(!after(TCP_SKB_CB(skb)->end_seq, tp->snd_una)); packets_acked = tcp_skb_pcount(skb);
if (tcp_trim_head(sk, skb, tp->snd_una - TCP_SKB_CB(skb)->seq))
return ;
packets_acked -= tcp_skb_pcount(skb); if (packets_acked) {
BUG_ON(tcp_skb_pcount(skb) == );
BUG_ON(!before(TCP_SKB_CB(skb)->seq, TCP_SKB_CB(skb)->end_seq));
} return packets_acked;
}

而tcp_trim_head其实是蛮重的,大家想想,当skb只有线性区的时候,很容易就将data指针移位,但是当我使用分散聚集io的时候,则需要根据ack值去找到在 shinfo->frags 的偏移,

然后将ack之前的数据给释放掉:

static void __pskb_trim_head(struct sk_buff *skb, int len)
{
struct skb_shared_info *shinfo;
int i, k, eat; eat = min_t(int, len, skb_headlen(skb));
if (eat) {
__skb_pull(skb, eat);
len -= eat;
if (!len)
return;
}
eat = len;
k = ;
shinfo = skb_shinfo(skb);
for (i = ; i < shinfo->nr_frags; i++) {
int size = skb_frag_size(&shinfo->frags[i]); if (size <= eat) {
skb_frag_unref(skb, i);
eat -= size;
} else {
shinfo->frags[k] = shinfo->frags[i];
if (eat) {
shinfo->frags[k].page_offset += eat;
skb_frag_size_sub(&shinfo->frags[k], eat);
eat = ;
}
k++;
}
}
shinfo->nr_frags = k; skb_reset_tail_pointer(skb);
skb->data_len -= len;
skb->len = skb->data_len;
}