IO多路复用

1.IO多路复用的概念

  • 单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力。

2.为什么出现IO多路复用

IO多路复用

  • 服务器需要维护N多个与客户端之间的socketfd;并且在receive之前需要知道数据知否出现---》组件IO多路复用技术出现---》解决检测服务器端N多个fd的状态

    • Tcp是有连接的,Udp是无连接---》上述情况出现在Tcp连接情况
  • IO多路复用的三种方案:select/poll/epoll

    • select(fds+1, rds, wds, timeout)
    • poll(fds, nfd, timeout)
    • epoll
      • epoll_create(size/flags)--》创建根节点---》epoll实例
      • epoll_ctl(fd, op, events)---》挂载fd节点
      • epoll_wait(fd, events, maxevents, timeout)---》检测
  • Tcp建立连接---》需要的fds放入IO多路复用中管理---》检测fd缓存状态---》receive

    • receive(fd,buff,sizeof(buff),0)

3.IO多路复用区别特点

select poll epoll
性能 随IO处理数增加,性能逐渐下降;并且连接数有限制 随IO处理数增加,性能逐渐下降 随IO处理数增加,性能基本上不会下降
连接数 最大连接数1024;处理1024以上则需要修改宏FD_SETSIZE,重新编译 无限制 无限制
数据结构 不确定,有数组的可能,但是为线性结构 不确定,有链表的可能,但是为线性结构 红黑树,队列
处理方式 线性轮询 线性轮询 回调callback
内存拷贝 所有fds从用户空间和内核空间来回拷贝** 所有fds从用户空间和内核空间来回拷贝 epoll_wait()只需要从就绪队列上拷贝由内核空间到用户空间
使用复杂度
时间复杂度 O(n) O(n) O()
  • select/poll
    • 拷贝fds--》从用户空间至内核
    • for(;;)进行遍历判断
      • select:进行分类rds,wds,eds
      • poll:pollfd维护自身状态
    • 从内核拷贝至用户空间
  • epoll
    • epoll_create--->创建一个根节点
    • epoll_ctl---》挂载节点
    • epoll_wait---》从内核到用户只拷贝满足条件的节点,从就绪队列中拷贝
  • 在哪些场景下 select比epoll更合适?
    • IO数量不多,且使用多线程多进程的情况使用select比epoll更合适
      • epoll使用红黑树,必须需要加锁,消耗的资源更多

红黑树:

​ 等待补充链接

3.IO多路复用源码

  • select:kern_select---》core_sys_select---》do_select---linux-6.0.2\fs\select.c

    • static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct __kernel_old_timeval __user *tvp){
      	struct timespec64 end_time, *to = NULL;
      	struct __kernel_old_timeval tv;
      	int ret;
      	if (tvp) {
              //copy_from_user从用户空间拷贝fds
      		if (copy_from_user(&tv, tvp, sizeof(tv)))
      			return -EFAULT;
      		to = &end_time;
      		if (poll_select_set_timeout(to,
      				tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
      				(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
      			return -EINVAL;
      	}
          //调用select--》core_sys_select
      	ret = core_sys_select(n, inp, outp, exp, to);
      	return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
      }
      
    • int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
      			   fd_set __user *exp, struct timespec64 *end_time)
      {
      	......
          //加读锁,读取文件数量
          rcu_read_lock();
      	fdt = files_fdtable(current->files);
      	max_fds = fdt->max_fds;
      	rcu_read_unlock();
          ......
      	//select底层处理---》do_select
      	ret = do_select(n, &fds, end_time);
      
      	if (ret < 0)
      		goto out;
      	if (!ret) {
      		ret = -ERESTARTNOHAND;
      		if (signal_pending(current))
      			goto out;
      		ret = 0;
      	}
      	......
      out_nofds:
      	return ret;
      }
      
    • static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
      {
          .....
          //读锁,读取文件数量
      	rcu_read_lock();
      	retval = max_select_fd(n, fds);
      	rcu_read_unlock();
      	.....
          //通过循环遍历,主要流程流程
      	for (;;) {
      		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
      		bool can_busy_loop = false;
      
      		inp = fds->in; outp = fds->out; exp = fds->ex;
      		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
      
      		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
      			unsigned long in, out, ex, all_bits, bit = 1, j;
      			unsigned long res_in = 0, res_out = 0, res_ex = 0;
      			__poll_t mask;
      
      			in = *inp++; out = *outp++; ex = *exp++;
      			all_bits = in | out | ex;
      			if (all_bits == 0) {
      				i += BITS_PER_LONG;
      				continue;
      			}
      
      			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
      				struct fd f;
      				if (i >= n)
      					break;
      				if (!(bit & all_bits))
      					continue;
      				mask = EPOLLNVAL;
      				f = fdget(i);
      				if (f.file) {
      					wait_key_set(wait, in, out, bit,
      						     busy_flag);
      					mask = vfs_poll(f.file, wait);
      
      					fdput(f);
      				}
      				if ((mask & POLLIN_SET) && (in & bit)) {
      					res_in |= bit;
      					retval++;
      					wait->_qproc = NULL;
      				}
      				if ((mask & POLLOUT_SET) && (out & bit)) {
      					res_out |= bit;
      					retval++;
      					wait->_qproc = NULL;
      				}
      				if ((mask & POLLEX_SET) && (ex & bit)) {
      					res_ex |= bit;
      					retval++;
      					wait->_qproc = NULL;
      				}
      				if (retval) {
      					can_busy_loop = false;
      					busy_flag = 0;
      				} else if (busy_flag & mask)
      					can_busy_loop = true;
      			}
      			if (res_in)
      				*rinp = res_in;
      			if (res_out)
      				*routp = res_out;
      			if (res_ex)
      				*rexp = res_ex;
      			cond_resched();
      		}
      		wait->_qproc = NULL;
      		if (retval || timed_out || signal_pending(current))
      			break;
      		if (table.error) {
      			retval = table.error;
      			break;
      		}
      		if (can_busy_loop && !need_resched()) {
      			if (!busy_start) {
      				busy_start = busy_loop_current_time();
      				continue;
      			}
      			if (!busy_loop_timeout(busy_start))
      				continue;
      		}
      		busy_flag = 0;
      		if (end_time && !to) {
      			expire = timespec64_to_ktime(*end_time);
      			to = &expire;
      		}
      
      		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
      					   to, slack))
      			timed_out = 1;
      	}
      
      	poll_freewait(&table);
      
      	return retval;
      }
      
  • poll:do_sys_poll---》do_poll---linux-6.0.2\fs\select.c

    • static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,struct timespec64 *end_time)
      {
          ......
      	len = min_t(unsigned int, nfds, N_STACK_PPS);
      	for (;;) {
      	.....
              //copy_from_user从用户空间拷贝fds
      		if (copy_from_user(walk->entries, ufds + nfds-todo,
      					sizeof(struct pollfd) * walk->len))
      			goto out_fds;
      	.....
      	}
      	.....
      out_fds:
      	//没有拷贝成功
          .....
      	return err;
      	.....
      }
      
    • static int do_poll(struct poll_list *list, struct poll_wqueues *wait,struct timespec64 *end_time)
      {
      	......
          //poll底层处理流程
      	for (;;) {
      		struct poll_list *walk;
      		bool can_busy_loop = false;
      		//这个写法,似乎是链表
      		for (walk = list; walk != NULL; walk = walk->next) {
      			struct pollfd * pfd, * pfd_end;
      
      			pfd = walk->entries;
      			pfd_end = pfd + walk->len;
      			for (; pfd != pfd_end; pfd++) {
      				if (do_pollfd(pfd, pt, &can_busy_loop,
      					      busy_flag)) {
      					count++;
      					pt->_qproc = NULL;
      					/* found something, stop busy polling */
      					busy_flag = 0;
      					can_busy_loop = false;
      				}
      			}
      		}
      		pt->_qproc = NULL;
      		if (!count) {
      			count = wait->error;
      			if (signal_pending(current))
      				count = -ERESTARTNOHAND;
      		}
      		if (count || timed_out)
      			break;
      		if (can_busy_loop && !need_resched()) {
      			if (!busy_start) {
      				busy_start = busy_loop_current_time();
      				continue;
      			}
      			if (!busy_loop_timeout(busy_start))
      				continue;
      		}
      		busy_flag = 0;
      
      		if (end_time && !to) {
      			expire = timespec64_to_ktime(*end_time);
      			to = &expire;
      		}
      
      		if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
      			timed_out = 1;
      	}
      	return count;
      }
      
  • epoll:do_epoll_wait---》ep_poll--linux-6.0.2\fs\eventpoll.c

    • static int do_epoll_wait(int epfd, struct epoll_event __user *events,int maxevents, struct timespec64 *to)
      {
          .....
          //调用ep_poll
      	error = ep_poll(ep, events, maxevents, to);
      
      error_fput:
      	fdput(f);
      	return error;
      }
      
    • static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, struct timespec64 *timeout)
      {
      	int res, eavail, timed_out = 0;
      	u64 slack = 0;
      	wait_queue_entry_t wait;
      	ktime_t expires, *to = NULL;
      
      	lockdep_assert_irqs_enabled();
      
      	if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
      		slack = select_estimate_accuracy(timeout);
      		to = &expires;
      		*to = timespec64_to_ktime(*timeout);
      	} else if (timeout) {
      		timed_out = 1;
      	}
      	eavail = ep_events_available(ep);
      
      	while (1) {
      		if (eavail) {
      			res = ep_send_events(ep, events, maxevents);
      			if (res)
      				return res;
      		}
      
      		if (timed_out)
      			return 0;
      
      		eavail = ep_busy_loop(ep, timed_out);
      		if (eavail)
      			continue;
      
      		if (signal_pending(current))
      			return -EINTR;
      		init_wait(&wait);
      		wait.func = ep_autoremove_wake_function;
      
      		write_lock_irq(&ep->lock);
      		__set_current_state(TASK_INTERRUPTIBLE);
      		eavail = ep_events_available(ep);
      		if (!eavail)
      			__add_wait_queue_exclusive(&ep->wq, &wait);
      
      		write_unlock_irq(&ep->lock);
      
      		if (!eavail)
      			timed_out = !schedule_hrtimeout_range(to, slack,
      							      HRTIMER_MODE_ABS);
      		__set_current_state(TASK_RUNNING);
      		eavail = 1;
      
      		if (!list_empty_careful(&wait.entry)) {
      			write_lock_irq(&ep->lock);
      			if (timed_out)
      				eavail = list_empty(&wait.entry);
      			__remove_wait_queue(&ep->wq, &wait);
      			write_unlock_irq(&ep->lock);
      		}
      	}
      }
      

4.参考资料

一文搞懂select、poll和epoll区别 - 知乎 (zhihu.com)

深入浅出理解select、poll、epoll的实现 - 知乎 (zhihu.com)

Linux内核源代码查看及分析方法 - 知乎 (zhihu.com)