前沿
网卡收包后如何触发用户态收包
往片回顾
上篇重点分析了epoll相关知识;其实就是3个函数,以及函数在内核态对应的实现
-
epoll_create(int size): 创建一个epoll文件;返回一个文件描述符,内核对应创建一个 struct eventpoll
-
epoll_ctl(int epfd, int op, int fd, struct epoll_event): 将一个fd添加/删除到一个eventpoll中;关键动作由 epoll_insert 完成
-
对传入的fd,在内核会创建一个对应的 struct epitem, 并将item插入到 eventpoll.rbr 红黑树当中
-
在设备驱动tcp_poll中调用poll_table.ep_ptable_queue_proc回调函数;生成一个对应的eppoll_entry,并将 eppoll_entry.wait塞到对应 sock.sk_wq 队列中
-
-
epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout):陷入内核监听,等待eventpoll中有事件发生,类似 do_poll
- 在 ep_scan_ready_list 函数中遍历 eventpoll.rdllist 中每个元素,并使用回调函数 ep_send_events_proc 对每个元素中的事件进行处理;
本篇主要内容
学习从网卡收并将数据传送到用户态流程
NAPI机制及非NAPI
-
非NAPI机制在报文数量多时有两个缺陷
-
频繁网卡硬件中断会导致CPU压力较大
-
softnet队列大小有限,硬件中断上报的报文数量如果软件没有及时处理,超过了队列大小限制,则后续的硬件中断都是无效的
-
-
NAPI会先禁止接收中断,并告诉网络子系统将以轮询方式快速收包
-
非NAPI: IRQ–>netif_rx–>enqueue_to_backlog–>skb加入 softnet_data.input_pkt_queue, back_log加入 softirq_action.poll_list–>软中断–>net_rx_action–>process_backlog–>__netif_receive_skb
-
NAPI: IRQ–>netif_rx–>enqueue_to_backlog–>napi_struct加入 softnet_data.poll_list–>软中断–>net_rx_action–>驱动poll方法–>napi_gro_receive–>netif_receive_skb–>__netif_receive_skb
唤醒流程
-
net_dev_init()中注册软中断
NET_RX_SOFTIRQ->net_rx_action(), NET_TX_SOFTIRQ->net_tx_action()
-
完成硬中断:网卡驱动收报->netif_rx()/napi_schedule()->netif_rx_internal()->enqueue_to_backlog(): 将数据包放入cpu backlog queue(softnet_data.input_pkt_queue)
-
其中会检测设备是否在正常工作netif_running(skb->dev),以及本CPU的 input_pkt_queue 队列是否已满
-
对NAPI设备的内核接口为 napi_schedule()
-
-
软中断后
net_rx_action
从CPU报文队列中将数据包转移到网络栈中
代码分析
非NAPI: netif_rx()->netif_rx_internal()->enqueue_to_backlog()完成硬件中断后收包动作
netif_rx_internal
static int netif_rx_internal(struct sk_buff *skb)
{
int ret;
/* 检查时间戳 */
net_timestamp_check(netdev_tstamp_prequeue, skb);
trace_netif_rx(skb);
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
int cpu;
/*禁用抢占*/
preempt_disable();
rcu_read_lock();
/* 根据五元组确定处理该报文的cpu编号,具体参见附录rps/rfs介绍 */
cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu < 0)
cpu = smp_processor_id();
/* 将数据放入CPU的 input_pkt_queue 全局队列 */
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
preempt_enable();
} else
#endif
{
unsigned int qtail;
ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
put_cpu();
}
return ret;
}
enqueue_to_backlog
/*
* 1. 如果cpu的全局收包队列还没有满,则将skb挂到 input_pkt_queue 队列上
* 2. 如果本cpu的 softnet_data 已经被调度,则只需要完成挂接动作
* 3. 否则需要通过 ____napi_schedule(sd, &sd->backlog)添加调度, 即将设备挂到cpu的softnet_data.poll_list上
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;
/* 获取cpu相关的 softnet_data 变量 */
sd = &per_cpu(softnet_data, cpu);
/* 关中断 */
local_irq_save(flags);
rps_lock(sd);
/* 检查网卡是否是正在工作状态 */
if (!netif_running(skb->dev))
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue);
/* 如果本CPU的收包队列 input_pkt_queue 还未填满;则可以处理本次中断,否则只能drop掉 */
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
if (qlen) {
enqueue:
/* 如果 input_pkt_queue 不为空,说明虚拟设备已经得到调度,此时仅仅把数据加入 input_pkt_queue 即可 */
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags);
return NET_RX_SUCCESS;
}
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
* 否则需要调度backlog(虚拟设备),然后再入队
* 如果 napi_struct.backlog.state 已经标记了 NAPI_STATE_SCHED, 则说明设备已经在调度
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
/* 添加调度:
* 1. 将设备对应的 napi_schedule.poll_list 结构插入到 softnet_data.poll_list 链表尾部
* 2. 唤醒中断
* 3. NAPI、非NAPI最后都会将自己的 napi_schedule.poll_list 挂到 softnet_data.poll_list 上,方便在软中断统一处理
*/
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}
drop:
sd->dropped++;
rps_unlock(sd);
local_irq_restore(flags);
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}
软中断后部分
硬中断将数据包挂到CPU全局队列后,会通过 __napi_schedule 触发软中断(NET_RX_SOFTIRQ),而对应的中断回调函数为 net_rx_action->napi_poll->驱动注册的poll->__netif_receive_skb 送到网络协议栈处理
net_rx_action
/*
* 遍历中断对应CPU的 softnet_data.poll_list 上的设备结构体,将设备上的数据包发到网络协议栈处理
* 1. 设置软中断一次最大处理数据量、时间
* 2. napi_poll: 逐个处理设备结构体,其中会使用驱动初始化时注册的回调收包 poll 函数,将数据包送到网络协议栈
* 3. 如果最后 poll_list 上还有设备没处理,则退出前再次触发软中断
*/
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
/* 设置软中断一次允许的最大执行时间为2个jiffies */
unsigned long time_limit = jiffies + 2;
/* 设置软中断接收函数一次最多处理的报文个数为300 */
int budget = netdev_budget;
LIST_HEAD(list);
LIST_HEAD(repoll);
/* 关闭本地cpu的中断,下面判断list是否为空时防止硬中断抢占 */
local_irq_disable();
/* 将要轮询的设备链表转移到临时链表上 */
list_splice_init(&sd->poll_list, &list);
local_irq_enable();
/* 循环处理poll_list链表上的等待处理的napi */
for (;;) {
struct napi_struct *n;
/* 如果遍历完链表,则停止 */
if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}
/* 获取链表中首个设备 */
n = list_first_entry(&list, struct napi_struct, poll_list);
/* 调用驱动初始化时通过 netif_napi_add 注册的回调收包 poll 函数;非NAPI为固定 process_backlog()
* 处理完一个设备上的报文则要记录处理数量
*/
budget -= napi_poll(n, &repoll);
/* 如果超出预设时间或者达到处理报文最大个数则停止处理 */
if (unlikely(budget <= 0 ||
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}
local_irq_disable();
list_splice_tail_init(&sd->poll_list, &list);
list_splice_tail(&repoll, &list);
list_splice(&list, &sd->poll_list);
/* 如果softnet_data.poll_list上还有未处理设备,则继续触发软中断 */
if (!list_empty(&sd->poll_list))
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
net_rps_action_and_irq_enable(sd);
}
process_backlog
非NAPI设备的设备轮询函数
/*
* __netif_receive_skb-->__netif_receive_skb_core: 逐个处理 napi_struct.input_pkt_queue 上的数据报文
*/
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
/* 取得本地CPU上的 softnet_data 数据 */
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}
napi->weight = weight_p;
local_irq_disable();
while (1) {
struct sk_buff *skb;
/* 从 softnet_data.process_queue 队列中取一个skb */
while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb);
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}
rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
* 如果 input_pkt_queue 为空,则说明所有数据包已经处理完了
* 去除 NAPI_STATE_SCHED 标志
*/
napi->state = 0;
rps_unlock(sd);
break;
}
/* 将 input_pkt_queue 中元素转移到 process_queue 上,并初始化 input_pkt_queue */
skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
rps_unlock(sd);
}
local_irq_enable();
return work;
}
__netif_receive_skb_core
/* 协议栈入口函数:
* 1. 对skb中报文头元数据的调整
* 2. 如果是VLAN报文,循环把vlan头剥掉,如果是qinq场景,两个vlan都会剥离,即在调用deliver_skb向上层传递报文数据时不含vlan标签
* 3. ptype_all处理,例如抓包程序、raw socket等;ptype_base处理,交给协议栈处理,例如ip、arp、rarp等
* 4. 交给rx_handler处理,例如OVS、linux bridge等
*/
static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
/* packet_type作为网络层的输入接口,每一个网络层协议都会有一个对应的结构体
* 作为链路层、网络层之间的桥梁
*/
struct packet_type *ptype, *pt_prev;
rx_handler_func_t *rx_handler;
struct net_device *orig_dev;
bool deliver_exact = false;
int ret = NET_RX_DROP;
__be16 type;
net_timestamp_check(!netdev_tstamp_prequeue, skb);
trace_netif_receive_skb(skb);
/* 记录原始收包网络设备 */
orig_dev = skb->dev;
/* 此时data指针指向IP层头;设置 skb.network_header */
skb_reset_network_header(skb);
if (!skb_transport_header_was_set(skb))
skb_reset_transport_header(skb);
skb_reset_mac_len(skb);/* 设置 mac_len 为以太网报文头部长度 */
pt_prev = NULL;
another_round:
/* 设置skb.skb_iif, 记录数据包收包网络设备的索引号 */
skb->skb_iif = skb->dev->ifindex;
/* 增加本CPU处理过的数据包个数 */
__this_cpu_inc(softnet_data.processed);
/* 如果报文带vlan, 在 eth_type_trans 中设置的 skb.protocal 为 eth_p_8021q, eth_p8021ad */
if (skb->protocol == cpu_to_be16(ETH_P_8021Q) ||
skb->protocol == cpu_to_be16(ETH_P_8021AD)) {
/* 剥离vlan标签 */
skb = skb_vlan_untag(skb);
if (unlikely(!skb))
goto out;
}
#ifdef CONFIG_NET_CLS_ACT
if (skb->tc_verd & TC_NCLS) {
skb->tc_verd = CLR_TC_NCLS(skb->tc_verd);
goto ncls;
}
#endif
if (pfmemalloc)
goto skip_taps;
/* 在net_dev_init中会初始化两种链表;dev_add_pack()将一个协议类型结构链入某一个链表
* ptype_all:当协议类型是ETH_P_ALL时,它将被链入 ptype_all 链表;该链表主要用于 sniffer 程序,接收所有NIC收到的包
* ptype_base:hash链表,用于各种协议;key为协议类型;例如ETH_P_IP则调用ip_rcv, ETH_P_IPV6则调用ipv6_rcv
*/
list_for_each_entry_rcu(ptype, &ptype_all, list) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
/* 如果设备上注册了 pytpye_all, 做相应处理 */
list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
skip_taps:
#ifdef CONFIG_NET_CLS_ACT
if (static_key_false(&ingress_needed)) {
skb = handle_ing(skb, &pt_prev, &ret, orig_dev);
if (!skb)
goto out;
}
skb->tc_verd = 0;
ncls:
#endif
if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
goto drop;
/* 判断是否为vlan报文,并且vlan_tci 的VLAN_TAG_PRESENT位为1(skb_vlan_untag中进行过设置) */
if (skb_vlan_tag_present(skb)) {
if (pt_prev) {
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
if (vlan_do_receive(&skb))
goto another_round;
else if (unlikely(!skb))
goto out;
}
/* 如果rx_handler!=NULL,则为网桥设备;在设备初始化时在 netdev_rx_handler_register 注册
* 比如网桥设备的 br_handle_frame;bonding接口注册 bond_handle_frame
*/
rx_handler = rcu_dereference(skb->dev->rx_handler);
if (rx_handler) {
if (pt_prev) {
/* 如果pt_prev不为空,表示进行过ETH_P_ALL协议类型处理,执行刚刚链表的最后一个协议处理函数 */
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
/* 根据网桥处理的返回值进行下一步处理 */
switch (rx_handler(&skb)) {
/* 桥已经处理过该数据包,该数据包会以其它的方式传送 */
case RX_HANDLER_CONSUMED:
ret = NET_RX_SUCCESS;
goto out;
/* 桥改变的数据包的 skb->dev, 需要another_round 进行再一次处理 */
case RX_HANDLER_ANOTHER:
goto another_round;
/* 数据包只会传送到注册为具体网络设备(ptype->dev == skb->dev)的协议处理例程 */
case RX_HANDLER_EXACT:
deliver_exact = true;
/* 正常传送 */
case RX_HANDLER_PASS:
break;
default:
BUG();
}
}
if (unlikely(skb_vlan_tag_present(skb))) {
if (skb_vlan_tag_get_id(skb))
skb->pkt_type = PACKET_OTHERHOST;
/* Note: we might in the future use prio bits
* and set skb->priority like in vlan_do_receive()
* For the time being, just ignore Priority Code Point
*/
skb->vlan_tci = 0;
}
/* 记录IP层协议类型 */
type = skb->protocol;
/* 若未设置精确传送,向未指定设备的协议处理历程传送一份数据 */
if (likely(!deliver_exact)) {
deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
&ptype_base[ntohs(type) &
PTYPE_HASH_MASK]);
}
/* 交给与原设备的绑定的具体处理协议例程处理 */
deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
&orig_dev->ptype_specific);
/* 如果skb的当前设备与原设备不同(进行过vlan处理或桥处理), 则交给绑定当前设备的具体处理协议函数处理 */
if (unlikely(skb->dev != orig_dev)) {
deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
&skb->dev->ptype_specific);
}
/* 如果 pt_prev 不为空,表明上面链表处理过程中还留下最后一个协议处理函数还未执行
* 此时将这个协议处理函数传出到外层函数 __netif_receive_skb_one_core 调用 pt_prev->func处理
* 外层函数处理时就不需要deliver_skb来增加skb->users,减少了一次skb的释放
*/
if (pt_prev) {
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
goto drop;
else
ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
} else {
drop:
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
/* Jamal, now you will not able to escape explaining
* me how you were going to use this. :-)
*/
ret = NET_RX_DROP;
}
out:
return ret;
}
ip_rcv
/*
* Main IP Receive routine.
*/
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
const struct iphdr *iph;
u32 len;
/* PACKET_OTHERHOST: 表示非去往本机但是在特定模式下被接受的报文 */
if (skb->pkt_type == PACKET_OTHERHOST)
goto drop;
/* 内核数据统计 */
IP_UPD_PO_STATS_BH(dev_net(dev), IPSTATS_MIB_IN, skb->len);
/* 如果报文是共享的,则复制一个出来,此时复制出的 sk_buff 已经和socket脱离了关系
* 如果嗅探器或者其它的用户对数据包需要进行处理;则在调用ip_rcv之前,netif_receive_skb 会增加skb的引用计数,既应用计数>1
*/
skb = skb_share_check(skb, GFP_ATOMIC);
if (!skb) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto out;
}
/* pskb_may_pull 确保 skb->data 指向的内存包含的数据至少为IP头部大小 */
if (!pskb_may_pull(skb, sizeof(struct iphdr)))
goto inhdr_error;
/* pskb_may_pull可能会调整skb中的指针,所以需要重新定义IP头部 */
iph = ip_hdr(skb);
/*
* RFC1122: 3.2.1.2 MUST silently discard any IP frame that fails the checksum.
*
* Is the datagram acceptable?
*
* 1. Length at least the size of an ip header
* 2. Version of 4
* 3. Checksums correctly. [Speed optimisation for later, skip loopback checksums]
* 4. Doesn't have a bogus length
*/
/* 检查ip header */
if (iph->ihl < 5 || iph->version != 4)
goto inhdr_error;
BUILD_BUG_ON(IPSTATS_MIB_ECT1PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_1);
BUILD_BUG_ON(IPSTATS_MIB_ECT0PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_0);
BUILD_BUG_ON(IPSTATS_MIB_CEPKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_CE);
IP_ADD_STATS_BH(dev_net(dev),
IPSTATS_MIB_NOECTPKTS + (iph->tos & INET_ECN_MASK),
max_t(unsigned short, 1, skb_shinfo(skb)->gso_segs));
/* 确保IP完整的头部包括选项在内存中 */
if (!pskb_may_pull(skb, iph->ihl*4))
goto inhdr_error;
iph = ip_hdr(skb);
/* 校验IP头部的校验和 */
if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
goto csum_error;
/* 获取IP数据包总长度 */
len = ntohs(iph->tot_len);
/* 确保skb的数据长度 >= ip数据包总长度 */
if (skb->len < len) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS);
goto drop;
} else if (len < (iph->ihl*4))
goto inhdr_error;
/* Our transport medium may have padded the buffer out. Now we know it
* is IP we can trim to the true length of the frame.
* Note this now means skb->len holds ntohs(iph->tot_len).
*/
if (pskb_trim_rcsum(skb, len)) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto drop;
}
skb->transport_header = skb->network_header + iph->ihl*4;
/* Remove any debris in the socket control block */
memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));
/* Must drop socket now because of tproxy. */
skb_orphan(skb);
/* 忽略与netfilter子系统的交互,调用为ip_rcv_finish(skb) */
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, NULL, skb,
dev, NULL,
ip_rcv_finish);
csum_error:
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_CSUMERRORS);
inhdr_error:
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
drop:
kfree_skb(skb);
out:
return NET_RX_DROP;
}
附录
napi_struct
/*
* Structure for NAPI scheduling similar to tasklet but with weighting
* 每个NAPI设备都会对应一个结构体,通过 napi_struct.poll_list 链接到CPU的 softnet_data.poll_list
* 非NAPI没有对应的 napi_struct 结构;为了使用NAPI的处理流程,所有非NAPI设备共用softnet_data.back_log.poll_list作为一个虚拟设备添加到轮询链表
*/
struct napi_struct {
/* 用于加入处于轮询状态的设备列表 */
struct list_head poll_list;
/* 设备状态 */
unsigned long state;
int weight;/* 每次处理的最大数量,非NAPI默认为64 */
unsigned int gro_count;
/* 设备的轮询方法,非NAPI固定为 process_backlog() */
int (*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
spinlock_t poll_lock;
int poll_owner;
#endif
struct net_device *dev;
struct sk_buff *gro_list;
struct sk_buff *skb;
struct hrtimer timer;
struct list_head dev_list;
struct hlist_node napi_hash_node;
unsigned int napi_id;
};