IO多路复用
1.IO多路复用的概念
- 单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力。
2.为什么出现IO多路复用
-
服务器需要维护N多个与客户端之间的socketfd;并且在receive之前需要知道数据知否出现---》组件IO多路复用技术出现---》解决检测服务器端N多个fd的状态
- Tcp是有连接的,Udp是无连接---》上述情况出现在Tcp连接情况
-
IO多路复用的三种方案:select/poll/epoll
select(fds+1, rds, wds, timeout)
poll(fds, nfd, timeout)
- epoll
-
epoll_create(size/flags)
--》创建根节点---》epoll实例 -
epoll_ctl(fd, op, events)
---》挂载fd节点 -
epoll_wait(fd, events, maxevents, timeout)
---》检测
-
-
Tcp建立连接---》需要的fds放入IO多路复用中管理---》检测fd缓存状态---》receive
receive(fd,buff,sizeof(buff),0)
3.IO多路复用区别特点
select | poll | epoll | |
---|---|---|---|
性能 | 随IO处理数增加,性能逐渐下降;并且连接数有限制 | 随IO处理数增加,性能逐渐下降 | 随IO处理数增加,性能基本上不会下降 |
连接数 | 最大连接数1024;处理1024以上则需要修改宏FD_SETSIZE,重新编译 | 无限制 | 无限制 |
数据结构 | 不确定,有数组的可能,但是为线性结构 | 不确定,有链表的可能,但是为线性结构 | 红黑树,队列 |
处理方式 | 线性轮询 | 线性轮询 | 回调callback |
内存拷贝 | 所有fds从用户空间和内核空间来回拷贝** | 所有fds从用户空间和内核空间来回拷贝 | epoll_wait()只需要从就绪队列上拷贝由内核空间到用户空间 |
使用复杂度 | 低 | 低 | 中 |
时间复杂度 | O(n) | O(n) | O() |
- select/poll
- 拷贝fds--》从用户空间至内核
- for(;;)进行遍历判断
- select:进行分类rds,wds,eds
- poll:pollfd维护自身状态
- 从内核拷贝至用户空间
- epoll
- epoll_create--->创建一个根节点
- epoll_ctl---》挂载节点
- epoll_wait---》从内核到用户只拷贝满足条件的节点,从就绪队列中拷贝
- 在哪些场景下 select比epoll更合适?
- IO数量不多,且使用多线程多进程的情况使用select比epoll更合适
- epoll使用红黑树,必须需要加锁,消耗的资源更多
- IO数量不多,且使用多线程多进程的情况使用select比epoll更合适
红黑树:
等待补充链接
3.IO多路复用源码
-
select:kern_select---》core_sys_select---》do_select---linux-6.0.2\fs\select.c
-
static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct __kernel_old_timeval __user *tvp){ struct timespec64 end_time, *to = NULL; struct __kernel_old_timeval tv; int ret; if (tvp) { //copy_from_user从用户空间拷贝fds if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC)) return -EINVAL; } //调用select--》core_sys_select ret = core_sys_select(n, inp, outp, exp, to); return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret); }
-
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec64 *end_time) { ...... //加读锁,读取文件数量 rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); ...... //select底层处理---》do_select ret = do_select(n, &fds, end_time); if (ret < 0) goto out; if (!ret) { ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } ...... out_nofds: return ret; }
-
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time) { ..... //读锁,读取文件数量 rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); ..... //通过循环遍历,主要流程流程 for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; __poll_t mask; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { i += BITS_PER_LONG; continue; } for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits)) continue; mask = EPOLLNVAL; f = fdget(i); if (f.file) { wait_key_set(wait, in, out, bit, busy_flag); mask = vfs_poll(f.file, wait); fdput(f); } if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } if (retval) { can_busy_loop = false; busy_flag = 0; } else if (busy_flag & mask) can_busy_loop = true; } if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; if (retval || timed_out || signal_pending(current)) break; if (table.error) { retval = table.error; break; } if (can_busy_loop && !need_resched()) { if (!busy_start) { busy_start = busy_loop_current_time(); continue; } if (!busy_loop_timeout(busy_start)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } poll_freewait(&table); return retval; }
-
-
poll:do_sys_poll---》do_poll---linux-6.0.2\fs\select.c
-
static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,struct timespec64 *end_time) { ...... len = min_t(unsigned int, nfds, N_STACK_PPS); for (;;) { ..... //copy_from_user从用户空间拷贝fds if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len)) goto out_fds; ..... } ..... out_fds: //没有拷贝成功 ..... return err; ..... }
-
static int do_poll(struct poll_list *list, struct poll_wqueues *wait,struct timespec64 *end_time) { ...... //poll底层处理流程 for (;;) { struct poll_list *walk; bool can_busy_loop = false; //这个写法,似乎是链表 for (walk = list; walk != NULL; walk = walk->next) { struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } pt->_qproc = NULL; if (!count) { count = wait->error; if (signal_pending(current)) count = -ERESTARTNOHAND; } if (count || timed_out) break; if (can_busy_loop && !need_resched()) { if (!busy_start) { busy_start = busy_loop_current_time(); continue; } if (!busy_loop_timeout(busy_start)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; }
-
-
epoll:do_epoll_wait---》ep_poll--linux-6.0.2\fs\eventpoll.c
-
static int do_epoll_wait(int epfd, struct epoll_event __user *events,int maxevents, struct timespec64 *to) { ..... //调用ep_poll error = ep_poll(ep, events, maxevents, to); error_fput: fdput(f); return error; }
-
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, struct timespec64 *timeout) { int res, eavail, timed_out = 0; u64 slack = 0; wait_queue_entry_t wait; ktime_t expires, *to = NULL; lockdep_assert_irqs_enabled(); if (timeout && (timeout->tv_sec | timeout->tv_nsec)) { slack = select_estimate_accuracy(timeout); to = &expires; *to = timespec64_to_ktime(*timeout); } else if (timeout) { timed_out = 1; } eavail = ep_events_available(ep); while (1) { if (eavail) { res = ep_send_events(ep, events, maxevents); if (res) return res; } if (timed_out) return 0; eavail = ep_busy_loop(ep, timed_out); if (eavail) continue; if (signal_pending(current)) return -EINTR; init_wait(&wait); wait.func = ep_autoremove_wake_function; write_lock_irq(&ep->lock); __set_current_state(TASK_INTERRUPTIBLE); eavail = ep_events_available(ep); if (!eavail) __add_wait_queue_exclusive(&ep->wq, &wait); write_unlock_irq(&ep->lock); if (!eavail) timed_out = !schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS); __set_current_state(TASK_RUNNING); eavail = 1; if (!list_empty_careful(&wait.entry)) { write_lock_irq(&ep->lock); if (timed_out) eavail = list_empty(&wait.entry); __remove_wait_queue(&ep->wq, &wait); write_unlock_irq(&ep->lock); } } }
-
4.参考资料
一文搞懂select、poll和epoll区别 - 知乎 (zhihu.com)
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:IO多路复用 - Python技术站