Linux网络协议源码分析(六):网卡收包流程

前沿

网卡收包后如何触发用户态收包

往片回顾

上篇重点分析了epoll相关知识;其实就是3个函数,以及函数在内核态对应的实现

  • epoll_create(int size): 创建一个epoll文件;返回一个文件描述符,内核对应创建一个 struct eventpoll

  • epoll_ctl(int epfd, int op, int fd, struct epoll_event): 将一个fd添加/删除到一个eventpoll中;关键动作由 epoll_insert 完成

    1. 对传入的fd,在内核会创建一个对应的 struct epitem, 并将item插入到 eventpoll.rbr 红黑树当中

    2. 在设备驱动tcp_poll中调用poll_table.ep_ptable_queue_proc回调函数;生成一个对应的eppoll_entry,并将 eppoll_entry.wait塞到对应 sock.sk_wq 队列中

  • epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout):陷入内核监听,等待eventpoll中有事件发生,类似 do_poll

    1. 在 ep_scan_ready_list 函数中遍历 eventpoll.rdllist 中每个元素,并使用回调函数 ep_send_events_proc 对每个元素中的事件进行处理;

本篇主要内容

学习从网卡收并将数据传送到用户态流程

NAPI机制及非NAPI

  • 非NAPI机制在报文数量多时有两个缺陷

    1. 频繁网卡硬件中断会导致CPU压力较大

    2. softnet队列大小有限,硬件中断上报的报文数量如果软件没有及时处理,超过了队列大小限制,则后续的硬件中断都是无效的

  • NAPI会先禁止接收中断,并告诉网络子系统将以轮询方式快速收包

  • 非NAPI: IRQ–>netif_rx–>enqueue_to_backlog–>skb加入 softnet_data.input_pkt_queue, back_log加入 softirq_action.poll_list–>软中断–>net_rx_action–>process_backlog–>__netif_receive_skb

  • NAPI: IRQ–>netif_rx–>enqueue_to_backlog–>napi_struct加入 softnet_data.poll_list–>软中断–>net_rx_action–>驱动poll方法–>napi_gro_receive–>netif_receive_skb–>__netif_receive_skb

唤醒流程

  1. net_dev_init()中注册软中断 NET_RX_SOFTIRQ->net_rx_action(), NET_TX_SOFTIRQ->net_tx_action()

  2. 完成硬中断:网卡驱动收报->netif_rx()/napi_schedule()->netif_rx_internal()->enqueue_to_backlog(): 将数据包放入cpu backlog queue(softnet_data.input_pkt_queue)

    • 其中会检测设备是否在正常工作netif_running(skb->dev),以及本CPU的 input_pkt_queue 队列是否已满

    • 对NAPI设备的内核接口为 napi_schedule()

  3. 软中断后net_rx_action从CPU报文队列中将数据包转移到网络栈中

代码分析

非NAPI: netif_rx()->netif_rx_internal()->enqueue_to_backlog()完成硬件中断后收包动作

netif_rx_internal

static int netif_rx_internal(struct sk_buff *skb)
{
	int ret;
	/* 检查时间戳 */
	net_timestamp_check(netdev_tstamp_prequeue, skb);

	trace_netif_rx(skb);
#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu;
		/*禁用抢占*/
		preempt_disable();
		rcu_read_lock();
		/* 根据五元组确定处理该报文的cpu编号,具体参见附录rps/rfs介绍 */
		cpu = get_rps_cpu(skb->dev, skb, &rflow);
		if (cpu < 0)
			cpu = smp_processor_id();
		/* 将数据放入CPU的 input_pkt_queue 全局队列 */
		ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

		rcu_read_unlock();
		preempt_enable();
	} else
#endif
	{
		unsigned int qtail;
		ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
		put_cpu();
	}
	return ret;
}

enqueue_to_backlog

/*
 * 1. 如果cpu的全局收包队列还没有满,则将skb挂到 input_pkt_queue 队列上
 * 2. 如果本cpu的 softnet_data 已经被调度,则只需要完成挂接动作
 * 3. 否则需要通过 ____napi_schedule(sd, &sd->backlog)添加调度, 即将设备挂到cpu的softnet_data.poll_list上
 */
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
			      unsigned int *qtail)
{
	struct softnet_data *sd;
	unsigned long flags;
	unsigned int qlen;
	/* 获取cpu相关的 softnet_data 变量 */
	sd = &per_cpu(softnet_data, cpu);
	/* 关中断 */
	local_irq_save(flags);

	rps_lock(sd);
	/* 检查网卡是否是正在工作状态 */
	if (!netif_running(skb->dev))
		goto drop;
	qlen = skb_queue_len(&sd->input_pkt_queue);
	/* 如果本CPU的收包队列 input_pkt_queue 还未填满;则可以处理本次中断,否则只能drop掉 */
	if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
		if (qlen) {
enqueue:
	/* 如果 input_pkt_queue 不为空,说明虚拟设备已经得到调度,此时仅仅把数据加入 input_pkt_queue 即可 */
			__skb_queue_tail(&sd->input_pkt_queue, skb);
			input_queue_tail_incr_save(sd, qtail);
			rps_unlock(sd);
			local_irq_restore(flags);
			return NET_RX_SUCCESS;
		}

		/* Schedule NAPI for backlog device
		 * We can use non atomic operation since we own the queue lock
		 * 否则需要调度backlog(虚拟设备),然后再入队
		 * 如果 napi_struct.backlog.state 已经标记了 NAPI_STATE_SCHED, 则说明设备已经在调度
		 */
		if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
			if (!rps_ipi_queued(sd))
			/* 添加调度:
			 * 1. 将设备对应的 napi_schedule.poll_list 结构插入到 softnet_data.poll_list 链表尾部
			 * 2. 唤醒中断
			 * 3. NAPI、非NAPI最后都会将自己的 napi_schedule.poll_list 挂到 softnet_data.poll_list 上,方便在软中断统一处理
			 */
				____napi_schedule(sd, &sd->backlog);
		}
		goto enqueue;
	}

drop:
	sd->dropped++;
	rps_unlock(sd);

	local_irq_restore(flags);

	atomic_long_inc(&skb->dev->rx_dropped);
	kfree_skb(skb);
	return NET_RX_DROP;
}

软中断后部分

硬中断将数据包挂到CPU全局队列后,会通过 __napi_schedule 触发软中断(NET_RX_SOFTIRQ),而对应的中断回调函数为 net_rx_action->napi_poll->驱动注册的poll->__netif_receive_skb 送到网络协议栈处理

net_rx_action

/*
 * 遍历中断对应CPU的 softnet_data.poll_list 上的设备结构体,将设备上的数据包发到网络协议栈处理
 * 1. 设置软中断一次最大处理数据量、时间
 * 2. napi_poll: 逐个处理设备结构体,其中会使用驱动初始化时注册的回调收包 poll 函数,将数据包送到网络协议栈
 * 3. 如果最后 poll_list 上还有设备没处理,则退出前再次触发软中断
 */
static void net_rx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
	/* 设置软中断一次允许的最大执行时间为2个jiffies */
	unsigned long time_limit = jiffies + 2;
	/* 设置软中断接收函数一次最多处理的报文个数为300 */
	int budget = netdev_budget;
	LIST_HEAD(list);
	LIST_HEAD(repoll);
	/* 关闭本地cpu的中断,下面判断list是否为空时防止硬中断抢占 */
	local_irq_disable();
	/* 将要轮询的设备链表转移到临时链表上 */
	list_splice_init(&sd->poll_list, &list);
	local_irq_enable();
	/* 循环处理poll_list链表上的等待处理的napi */
	for (;;) {
		struct napi_struct *n;
		/* 如果遍历完链表,则停止 */
		if (list_empty(&list)) {
			if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
				return;
			break;
		}
		/* 获取链表中首个设备 */
		n = list_first_entry(&list, struct napi_struct, poll_list);
		/* 调用驱动初始化时通过 netif_napi_add 注册的回调收包 poll 函数;非NAPI为固定 process_backlog()
		 * 处理完一个设备上的报文则要记录处理数量
		 */
		budget -= napi_poll(n, &repoll);

		/* 如果超出预设时间或者达到处理报文最大个数则停止处理 */
		if (unlikely(budget <= 0 ||
			     time_after_eq(jiffies, time_limit))) {
			sd->time_squeeze++;
			break;
		}
	}

	local_irq_disable();

	list_splice_tail_init(&sd->poll_list, &list);
	list_splice_tail(&repoll, &list);
	list_splice(&list, &sd->poll_list);
	/* 如果softnet_data.poll_list上还有未处理设备,则继续触发软中断 */
	if (!list_empty(&sd->poll_list))
		__raise_softirq_irqoff(NET_RX_SOFTIRQ);

	net_rps_action_and_irq_enable(sd);
}

process_backlog

非NAPI设备的设备轮询函数

/* 
 * __netif_receive_skb-->__netif_receive_skb_core: 逐个处理 napi_struct.input_pkt_queue 上的数据报文
 */
static int process_backlog(struct napi_struct *napi, int quota)
{
	int work = 0;
	/* 取得本地CPU上的 softnet_data  数据 */
	struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

	/* Check if we have pending ipi, its better to send them now,
	 * not waiting net_rx_action() end.
	 */
	if (sd_has_rps_ipi_waiting(sd)) {
		local_irq_disable();
		net_rps_action_and_irq_enable(sd);
	}

	napi->weight = weight_p;
	local_irq_disable();
	while (1) {
		struct sk_buff *skb;
		/* 从 softnet_data.process_queue 队列中取一个skb */
		while ((skb = __skb_dequeue(&sd->process_queue))) {
			rcu_read_lock();
			local_irq_enable();
			__netif_receive_skb(skb);
			rcu_read_unlock();
			local_irq_disable();
			input_queue_head_incr(sd);
			if (++work >= quota) {
				local_irq_enable();
				return work;
			}
		}

		rps_lock(sd);
		if (skb_queue_empty(&sd->input_pkt_queue)) {
			/*
			 * Inline a custom version of __napi_complete().
			 * only current cpu owns and manipulates this napi,
			 * and NAPI_STATE_SCHED is the only possible flag set
			 * on backlog.
			 * We can use a plain write instead of clear_bit(),
			 * and we dont need an smp_mb() memory barrier.
			 * 如果 input_pkt_queue 为空,则说明所有数据包已经处理完了
			 * 去除 NAPI_STATE_SCHED 标志
			 */
			napi->state = 0;
			rps_unlock(sd);

			break;
		}
		/* 将 input_pkt_queue 中元素转移到 process_queue 上,并初始化 input_pkt_queue */
		skb_queue_splice_tail_init(&sd->input_pkt_queue,
					   &sd->process_queue);
		rps_unlock(sd);
	}
	local_irq_enable();

	return work;
}

__netif_receive_skb_core

/* 协议栈入口函数:
 * 1. 对skb中报文头元数据的调整
 * 2. 如果是VLAN报文,循环把vlan头剥掉,如果是qinq场景,两个vlan都会剥离,即在调用deliver_skb向上层传递报文数据时不含vlan标签
 * 3. ptype_all处理,例如抓包程序、raw socket等;ptype_base处理,交给协议栈处理,例如ip、arp、rarp等
 * 4. 交给rx_handler处理,例如OVS、linux bridge等
 */
static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
	/* packet_type作为网络层的输入接口,每一个网络层协议都会有一个对应的结构体
	 * 作为链路层、网络层之间的桥梁
	 */
	struct packet_type *ptype, *pt_prev;
	rx_handler_func_t *rx_handler;
	struct net_device *orig_dev;
	bool deliver_exact = false;
	int ret = NET_RX_DROP;
	__be16 type;

	net_timestamp_check(!netdev_tstamp_prequeue, skb);

	trace_netif_receive_skb(skb);
	/* 记录原始收包网络设备 */
	orig_dev = skb->dev;
	/* 此时data指针指向IP层头;设置 skb.network_header */
	skb_reset_network_header(skb);
	if (!skb_transport_header_was_set(skb))
		skb_reset_transport_header(skb);
	skb_reset_mac_len(skb);/* 设置 mac_len 为以太网报文头部长度 */

	pt_prev = NULL;

another_round:
	/* 设置skb.skb_iif, 记录数据包收包网络设备的索引号 */
	skb->skb_iif = skb->dev->ifindex;
	/* 增加本CPU处理过的数据包个数 */
	__this_cpu_inc(softnet_data.processed);
	/* 如果报文带vlan, 在 eth_type_trans 中设置的 skb.protocal 为 eth_p_8021q, eth_p8021ad */
	if (skb->protocol == cpu_to_be16(ETH_P_8021Q) ||
	    skb->protocol == cpu_to_be16(ETH_P_8021AD)) {
		/* 剥离vlan标签 */
		skb = skb_vlan_untag(skb);
		if (unlikely(!skb))
			goto out;
	}

#ifdef CONFIG_NET_CLS_ACT
	if (skb->tc_verd & TC_NCLS) {
		skb->tc_verd = CLR_TC_NCLS(skb->tc_verd);
		goto ncls;
	}
#endif

	if (pfmemalloc)
		goto skip_taps;
	/* 在net_dev_init中会初始化两种链表;dev_add_pack()将一个协议类型结构链入某一个链表
	 * ptype_all:当协议类型是ETH_P_ALL时,它将被链入 ptype_all 链表;该链表主要用于 sniffer 程序,接收所有NIC收到的包
	 * ptype_base:hash链表,用于各种协议;key为协议类型;例如ETH_P_IP则调用ip_rcv, ETH_P_IPV6则调用ipv6_rcv
	 */
	list_for_each_entry_rcu(ptype, &ptype_all, list) {
		if (pt_prev)
			ret = deliver_skb(skb, pt_prev, orig_dev);
		pt_prev = ptype;
	}
	/* 如果设备上注册了 pytpye_all, 做相应处理 */
	list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
		if (pt_prev)
			ret = deliver_skb(skb, pt_prev, orig_dev);
		pt_prev = ptype;
	}

skip_taps:
#ifdef CONFIG_NET_CLS_ACT
	if (static_key_false(&ingress_needed)) {
		skb = handle_ing(skb, &pt_prev, &ret, orig_dev);
		if (!skb)
			goto out;
	}

	skb->tc_verd = 0;
ncls:
#endif
	if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
		goto drop;
	/* 判断是否为vlan报文,并且vlan_tci 的VLAN_TAG_PRESENT位为1(skb_vlan_untag中进行过设置) */
	if (skb_vlan_tag_present(skb)) {
		if (pt_prev) {
			ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = NULL;
		}
		if (vlan_do_receive(&skb))
			goto another_round;
		else if (unlikely(!skb))
			goto out;
	}
	/* 如果rx_handler!=NULL,则为网桥设备;在设备初始化时在 netdev_rx_handler_register 注册
	 * 比如网桥设备的 br_handle_frame;bonding接口注册 bond_handle_frame 
	 */
	rx_handler = rcu_dereference(skb->dev->rx_handler);
	if (rx_handler) {
		if (pt_prev) {
		/* 如果pt_prev不为空,表示进行过ETH_P_ALL协议类型处理,执行刚刚链表的最后一个协议处理函数 */
			ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = NULL;
		}
		/* 根据网桥处理的返回值进行下一步处理 */
		switch (rx_handler(&skb)) {
		/* 桥已经处理过该数据包,该数据包会以其它的方式传送 */
		case RX_HANDLER_CONSUMED:
			ret = NET_RX_SUCCESS;
			goto out;
		/* 桥改变的数据包的 skb->dev, 需要another_round 进行再一次处理 */
		case RX_HANDLER_ANOTHER:
			goto another_round;
		/* 数据包只会传送到注册为具体网络设备(ptype->dev == skb->dev)的协议处理例程 */
		case RX_HANDLER_EXACT:
			deliver_exact = true;
		/* 正常传送 */
		case RX_HANDLER_PASS:
			break;
		default:
			BUG();
		}
	}

	if (unlikely(skb_vlan_tag_present(skb))) {
		if (skb_vlan_tag_get_id(skb))
			skb->pkt_type = PACKET_OTHERHOST;
		/* Note: we might in the future use prio bits
		 * and set skb->priority like in vlan_do_receive()
		 * For the time being, just ignore Priority Code Point
		 */
		skb->vlan_tci = 0;
	}
	/* 记录IP层协议类型 */
	type = skb->protocol;

	/* 若未设置精确传送,向未指定设备的协议处理历程传送一份数据 */
	if (likely(!deliver_exact)) {
		deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
				       &ptype_base[ntohs(type) &
						   PTYPE_HASH_MASK]);
	}
	/* 交给与原设备的绑定的具体处理协议例程处理 */
	deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
			       &orig_dev->ptype_specific);
	/* 如果skb的当前设备与原设备不同(进行过vlan处理或桥处理), 则交给绑定当前设备的具体处理协议函数处理 */
	if (unlikely(skb->dev != orig_dev)) {
		deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
				       &skb->dev->ptype_specific);
	}
	/* 如果 pt_prev 不为空,表明上面链表处理过程中还留下最后一个协议处理函数还未执行
	 * 此时将这个协议处理函数传出到外层函数 __netif_receive_skb_one_core 调用 pt_prev->func处理
	 * 外层函数处理时就不需要deliver_skb来增加skb->users,减少了一次skb的释放
	 */
	if (pt_prev) {
		if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
			goto drop;
		else
			ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
	} else {
drop:
		atomic_long_inc(&skb->dev->rx_dropped);
		kfree_skb(skb);
		/* Jamal, now you will not able to escape explaining
		 * me how you were going to use this. :-)
		 */
		ret = NET_RX_DROP;
	}

out:
	return ret;
}

ip_rcv

/*
 * 	Main IP Receive routine.
 */
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
	const struct iphdr *iph;
	u32 len;

	/* PACKET_OTHERHOST: 表示非去往本机但是在特定模式下被接受的报文 */
	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;

	/* 内核数据统计 */
	IP_UPD_PO_STATS_BH(dev_net(dev), IPSTATS_MIB_IN, skb->len);
	/* 如果报文是共享的,则复制一个出来,此时复制出的 sk_buff 已经和socket脱离了关系
	 * 如果嗅探器或者其它的用户对数据包需要进行处理;则在调用ip_rcv之前,netif_receive_skb 会增加skb的引用计数,既应用计数>1
	 */
	skb = skb_share_check(skb, GFP_ATOMIC);
	if (!skb) {
		IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
		goto out;
	}
	/* pskb_may_pull 确保 skb->data 指向的内存包含的数据至少为IP头部大小 */
	if (!pskb_may_pull(skb, sizeof(struct iphdr)))
		goto inhdr_error;
	/* pskb_may_pull可能会调整skb中的指针,所以需要重新定义IP头部 */
	iph = ip_hdr(skb);

	/*
	 *	RFC1122: 3.2.1.2 MUST silently discard any IP frame that fails the checksum.
	 *
	 *	Is the datagram acceptable?
	 *
	 *	1.	Length at least the size of an ip header
	 *	2.	Version of 4
	 *	3.	Checksums correctly. [Speed optimisation for later, skip loopback checksums]
	 *	4.	Doesn't have a bogus length
	 */
	/* 检查ip header */
	if (iph->ihl < 5 || iph->version != 4)
		goto inhdr_error;

	BUILD_BUG_ON(IPSTATS_MIB_ECT1PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_1);
	BUILD_BUG_ON(IPSTATS_MIB_ECT0PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_0);
	BUILD_BUG_ON(IPSTATS_MIB_CEPKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_CE);
	IP_ADD_STATS_BH(dev_net(dev),
			IPSTATS_MIB_NOECTPKTS + (iph->tos & INET_ECN_MASK),
			max_t(unsigned short, 1, skb_shinfo(skb)->gso_segs));
	/* 确保IP完整的头部包括选项在内存中 */
	if (!pskb_may_pull(skb, iph->ihl*4))
		goto inhdr_error;

	iph = ip_hdr(skb);
	/* 校验IP头部的校验和 */
	if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
		goto csum_error;
	/* 获取IP数据包总长度 */
	len = ntohs(iph->tot_len);
	/* 确保skb的数据长度 >= ip数据包总长度 */
	if (skb->len < len) {
		IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS);
		goto drop;
	} else if (len < (iph->ihl*4))
		goto inhdr_error;

	/* Our transport medium may have padded the buffer out. Now we know it
	 * is IP we can trim to the true length of the frame.
	 * Note this now means skb->len holds ntohs(iph->tot_len).
	 */
	if (pskb_trim_rcsum(skb, len)) {
		IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
		goto drop;
	}

	skb->transport_header = skb->network_header + iph->ihl*4;

	/* Remove any debris in the socket control block */
	memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));

	/* Must drop socket now because of tproxy. */
	skb_orphan(skb);
	/* 忽略与netfilter子系统的交互,调用为ip_rcv_finish(skb) */
	return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, NULL, skb,
		       dev, NULL,
		       ip_rcv_finish);

csum_error:
	IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_CSUMERRORS);
inhdr_error:
	IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
drop:
	kfree_skb(skb);
out:
	return NET_RX_DROP;
}

附录

napi_struct

/*
 * Structure for NAPI scheduling similar to tasklet but with weighting
 * 每个NAPI设备都会对应一个结构体,通过 napi_struct.poll_list 链接到CPU的 softnet_data.poll_list 
 * 非NAPI没有对应的 napi_struct 结构;为了使用NAPI的处理流程,所有非NAPI设备共用softnet_data.back_log.poll_list作为一个虚拟设备添加到轮询链表
 */
struct napi_struct {
	/* 用于加入处于轮询状态的设备列表 */
	struct list_head	poll_list;
	/* 设备状态 */
	unsigned long		state;
	int			weight;/* 每次处理的最大数量,非NAPI默认为64 */
	unsigned int		gro_count;
	/* 设备的轮询方法,非NAPI固定为 process_backlog() */
	int			(*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
	spinlock_t		poll_lock;
	int			poll_owner;
#endif
	struct net_device	*dev;
	struct sk_buff		*gro_list;
	struct sk_buff		*skb;
	struct hrtimer		timer;
	struct list_head	dev_list;
	struct hlist_node	napi_hash_node;
	unsigned int		napi_id;
};

参考资料

rfs/rps基本原理

linux rps/rfs实现