epoll(2) 源码分析
epoll(2) 源码分析
文本内核代码取自 5.0.18 版本,和上一篇文章中的版本不同是因为另一个电脑出了问题,但是总体差异不大。
引子留下的问题
关键数据结构
提供的系统调用
就绪事件相关逻辑
epoll 间的相互影响及处理
问题的解答
引子留下的问题
在上一篇文章中留下了几个问题,本文将针对这几个问题进行分析:
epoll(2) 得到就绪事件的复杂度为何是 O(1)
epoll(2) 和普通的文件相比的区别在哪里,比如和 eventfd(2) 比较
epoll(2) 相对 poll(2)/select(2) 多提供了 EPOLLET 的触发模式,现象在上面可以看到区别,实现是如何做到的。
epoll(2) 相互关注时,有就绪事件到来会产生相互唤醒的问题,为何会出现这样的问题
对于问题 4,内核是如何解决这种相互唤醒的问题。
解答在文末.
关键的数据结构
在第一次阅读代码时,优先掌握该功能的核心数据结构有利于对于全局的把控。
struct eventpoll
struct eventpoll 对应一个 epoll 实例的结构,包含所有的文件事件,作为 epoll 的接口使用。
/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
spinlock_t lock; // 保护整个数据结构
struct mutex mtx; // 保护正在操作的文件
wait_queue_head_t wq; // sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait; // epoll 作为被监视文件时 file->poll() 使用的等待队列,使用较少
// poll_wait 队列作用和 eventfd 文件中的 wqh 队列相同
struct list_head rdllist; // 就绪的文件链表,连接 epitem 上的 rdllink
struct epitem *ovflist; // 也是用来串联就绪的事件,作为 rdlist 的备胎使用
struct rb_root_cached rbr; // 所有关注的文件事件的红黑树,在内核空间维护
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws; // 不分析该功能,只知道为唤醒源就行
struct user_struct *user; // epoll 创建操作所属的用户
struct file *file; // epoll 关联的文件结构
/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
};
struct epitem
struct epitem 每个文件描述符添加到 eventpoll 接口将产生一个 epitem项 被链接到 eventpoll 中的红黑树上。
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
* Avoid increasing the size of this struct, there can be many thousands
* of these on a server and we do not want this to take another cache line.
*/
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu;
};
struct list_head rdllink; // 用于连接到 eventpoll->rdllist 的链表,和 rdllist 一起使用
struct epitem *next; // 连接到 eventpoll->ovflist 的指针,和 ovflist 一起使用
struct epoll_filefd ffd; // 文件 file 结构 + fd,作为红黑树的节点
int nwait; // 附加在 poll 操作上活跃的等待队列的数量
/* List containing poll wait queues */
struct list_head pwqlist; // 注释是包含轮询等待队列的链表,但是实际上个人更倾向为这个链表只是为了连接 eppoll_entry 结构。
// 和上面那个 nwait 一样,这两个变量的添加操作只会发生一次,就是调用 ep_insert() 的时候,但是 epitem 在一个 epoll 实例中只会调用一次。
struct eventpoll *ep; // 当前 epitem 的所有者
struct list_head fllink; // 连接文件结构的链表
/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws; // 唤醒源,不考虑
/* The structure that describe the interested events and the source fd */
struct epoll_event event; // 用户传入的 event 结构
};
struct eppoll_entry
struct eppoll_entry 为文件的等待队列项回调和epoll相关联的结构. 类似为poll(2) 中的 poll_table_entry
/* Wait structure used by the poll hooks */
struct eppoll_entry {
struct list_head llink; // 连接至 epitem 中的 pwqlist 链表中
struct epitem *base; // epitem 所属者
wait_queue_entry_t wait; // 等待队列项
wait_queue_head_t *whead; // 等待队列头,关注文件的等待队列,如 eventfd->pwh
};
epoll(2) 相关的系统调用
epoll_create(2)
epoll_ctl(2)
epoll_wait(2)
整个 fs/eventpoll.c 的代码量较多(2000+), 所以这里节选部分主要的代码进行分析, 一些对于参数的合法性的校验就不放出来了.
epoll 的实现做了两种区分: 关注的文件是否为 epoll 类型, 我们先对非epoll文件进行分析, 这个部分代码比较直观易懂, 对epoll文件的处理考虑了多种情况, 留作之后分析.
epoll_create(2)
创建一个新的文件描述符, 对应一个 epoll 实例.
为 eventpoll 结构分配内存并且初始化
获取一个新的文件并且与 eventpoll 结构相关联.
/*
* Open an eventpoll file descriptor.
*/
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
error = ep_alloc(&ep); // 分配内存并初始化, 代码较直观, 不做分析
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
ep->file = file;
fd_install(fd, file);
}
epoll_ctl(2)
epoll_ctl 为epoll的控制函数, 根据函数的 @op 入参分发需要进行的操作.
函数的功能主体比较清晰, 也分为两部分:
对监视文件为epoll经行循环检测
根据操作类型分发具体执行的函数
/*
* The following function implements the controller interface for
* the eventpoll file that enables the insertion/removal/change of
* file descriptors inside the interest set.
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
// 加锁部分为对监视的文件是epoll时候进行的循环检测, 这部分后面分析, 这里只看非 epoll 文件的处理
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) {
if (!list_empty(&f.file->f_ep_links) ||
is_file_epoll(tf.file)) {
full_check = 1;
mutex_unlock(&ep->mtx);
mutex_lock(&epmutex);
if (is_file_epoll(tf.file)) {
error = -ELOOP;
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tf.file->f_tfile_llink,
&tfile_check_list);
mutex_lock_nested(&ep->mtx, 0);
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
}
}
}
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tf.file, fd); // 从红黑树中寻找操作的文件
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) { // 不存在就插入到eventpoll中
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, &epds);
}
} else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx);
return error;
}
ep_insert()
分配一个 epitem 的内存并初始化, 再将该 epitem 添加到 eventpoll 中的红黑树上.
初始化过程也包含了几个部分:
对 epitem 结构进行初始化, 设置各成员变量的值.
调用目标文件的file->f_op->poll() 函数设置等待队列项回调函数, 这个是实现 epoll_wait(2) 复杂度为 O(1) 最重要的一步, 关注的文件产生就绪事件就会调用该回调函数 ep_ptable_queue_proc
返回就绪事件掩码, 将当前 epitem 添加到 eventpoll->rdllist 中, 唤醒 epoll_wait(2) 线程
/*
* Must be called with "mtx" held.
*/
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
struct epitem *epi;
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
revents = ep_item_poll(epi, &epq.pt, 1);
/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); // 将当前 epitem 添加到监视文件的 f_ep_links 链表上.
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
ep_rbtree_insert(ep, epi); // 将当前 epitem 添加到eventpoll的红黑树中
/* If the file is already "ready" we drop it inside the ready list */
if (revents && !ep_is_linked(epi)) { // 产生就绪事件并且当前 epitem 未添加进 eventpoll 中(这个有点儿明显)
list_add_tail(&epi->rdllink, &ep->rdllist); // 添加至 ep->rdllist, 留意这两个链表是一起出现的
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq)) // wq 队列是 epoll_wait(2) 使用的, 这里唤醒调用 epoll_wait(2) 进入阻塞状态的线程.
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait)) // 这里不直接唤醒是加锁的原因, poll_wait 队列属于被监视文件使用, 不应该在epoll实例中唤醒
pwake++;
}
spin_unlock_irq(&ep->wq.lock);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
}
ep_remove()
作用和 ep_insert() 相反, 释放内存, 删除与其它资源相关联的连接, 在互斥量 eventpoll->mtx 加锁下进行.
/*
* Removes a "struct epitem" from the eventpoll RB tree and deallocates
* all the associated resources. Must be called with "mtx" held.
*/
static int ep_remove(struct eventpoll *ep, struct epitem *epi)
{
struct file *file = epi->ffd.file;
lockdep_assert_irqs_enabled();
/*
* Removes poll wait queue hooks.
*/
ep_unregister_pollwait(ep, epi); // 删除 epitem->pwdlist 关联的等待项链表
/* Remove the current item from the list of epoll hooks */
spin_lock(&file->f_lock);
list_del_rcu(&epi->fllink); // 从监视文件中的 file->f_ep_links 链表中删除当前 epitem
spin_unlock(&file->f_lock);
rb_erase_cached(&epi->rbn, &ep->rbr); // 从 eventpoll 中的红黑树中删除当前 epitem 节点
spin_lock_irq(&ep->wq.lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink); // 从 eventpoll 中的就绪队列 rdllist 中删除当前 epitem 节点
spin_unlock_irq(&ep->wq.lock);
/*
* At this point it is safe to free the eventpoll item. Use the union
* field epi->rcu, since we are trying to minimize the size of
* 'struct epitem'. The 'rbn' field is no longer in use. Protected by
* ep->mtx. The rcu read side, reverse_path_check_proc(), does not make
* use of the rbn field.
*/
call_rcu(&epi->rcu, epi_rcu_free); // 释放当前 epitem 的内存
atomic_long_dec(&ep->user->epoll_watches); // eventpoll 所属用户监视的 epitem数量减一
return 0;
}
ep_modify()
调整关注文件的事件.
/*
* Modify the interest event mask by dropping an event if the new mask
* has a match in the current file status. Must be called with "mtx" held.
*/
static int ep_modify(struct eventpoll *ep, struct epitem *epi,
const struct epoll_event *event)
{
int pwake = 0;
poll_table pt;
lockdep_assert_irqs_enabled();
// 设置 file->f_op->poll 的回调函数为NULL, 因为在insert中已经设置了文件等待队列项的回调函数
init_poll_funcptr(&pt, NULL);
/*
* Get current event bits. We can safely use the file* here because
* its usage count has been increased by the caller of this function.
* If the item is "hot" and it is not registered inside the ready
* list, push it inside.
*/
if (ep_item_poll(epi, &pt, 1)) { // 调用f_op->poll() 获取文件的就绪事件
spin_lock_irq(&ep->wq.lock);
if (!ep_is_linked(epi)) { // 未添加至 eventpoll 接口的就绪队列中
list_add_tail(&epi->rdllink, &ep->rdllist); // 添加
ep_pm_stay_awake(epi); // 电源管理的函数, 不看
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); // 唤醒调用 epoll_wait(2) 的线程
if (waitqueue_active(&ep->poll_wait)) // 分析同 ep_insert()
pwake++;
}
spin_unlock_irq(&ep->wq.lock);
}
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
}
epoll_wait(2)
等待就绪的事件。
ep_events_available 为检查是否存在就绪事件,其实就是检查 rdllist 和 ovflist 是否有被修改过,复杂度为 O(1).
static inline int ep_events_available(struct eventpoll *ep)
{
return !list_empty_careful(&ep->rdllist) ||
R