libuv 事件循环
解读
事件循环是 libuv 功能的核心部分。它的主要职责是对 I/O 进行轮询然后基于不同的事件源执行它们的回调函数。
事件循环是 libuv 功能的核心部分。它的主要职责是对 I/O 进行轮询然后基于不同的事件源执行它们的回调函数。
在上一篇文章我们也讲解了它的整个运作框架,虽然代码还未讲解,但是还是要看看怎么去写代码的。
在 libuv 中,事件循环的声明是这样子的:
typedef struct uv_loop_s uv_loop_t;
它是一个句柄handle类型,它管理了同一事件循环的所有资源,并且在整个事件循环的生命周期内都是可用的。
其实到后面你就会发现,实际上它是事件循环所有资源的统一入口,所有在事件循环上运行的各类 Handle/Request 实例都被注册到 uv_loop_t 内部定义的结构中,反正知道它是可以管理事件循环的所有资源就行了。
这里再补充一个知识点的说明吧:
- IO 观察者(io_watcher):在 libuv 内部,对所有 I/O 操作进行了统一的抽象,在底层的操作系统 I/O 操作基础上,结合事件循环机制,实现了 IO 观察者,对应结构体 uv__io_s,通过它可以知道 I/O 相关的信息,比如可读、可写等,handle 通过内嵌 IO 观察者的方式获得 IO 的监测能力。
其实大家可以直接理解为,我要监测一个 TCP 连接,那么 TCP handle 就算是一个 IO 观察者,其实它是一个句柄的同时又是一个 IO 观察者。
可以看它的成员变量有非常多的东西(此处是对 linux 平台的讲解,Windows 的不在讨论范围内,不过都差不多):
struct uv_loop_s {
void* data;
unsigned int active_handles;
void* handle_queue[2];
union {
void* unused[2];
unsigned int count;
} active_reqs;
unsigned int stop_flag;
UV_LOOP_PRIVATE_FIELDS
};
#define UV_LOOP_PRIVATE_FIELDS \
unsigned long flags; \
int backend_fd; \
void* pending_queue[2]; \
void* watcher_queue[2]; \
uv__io_t** watchers; \
unsigned int nwatchers; \
unsigned int nfds; \
void* wq[2]; \
uv_mutex_t wq_mutex; \
uv_async_t wq_async; \
uv_rwlock_t cloexec_lock; \
uv_handle_t* closing_handles; \
void* process_handles[2]; \
void* prepare_handles[2]; \
void* check_handles[2]; \
void* idle_handles[2]; \
void* async_handles[2]; \
void (*async_unused)(void); \
uv__io_t async_io_watcher; \
int async_wfd; \
struct { \
void* min; \
unsigned int nelts; \
} timer_heap; \
uint64_t timer_counter; \
uint64_t time; \
int signal_pipefd[2]; \
uv__io_t signal_io_watcher; \
uv_signal_t child_watcher; \
int emfile_fd; \
UV_PLATFORM_LOOP_FIELDS \
这个 UV_LOOP_PRIVATE_FIELDS 宏跟平台相关,在这里不做过多介绍,就简单说几点:
watcher_queue是uv__io_t的观察者队列,其中保存的是uv__io_t的结构体void* wq[2];表述的是 work queue,是工作队列;timer_heap是timer的二叉堆,它还使用了二叉树来提高遍历的效率。
示例解读
写个 demo 来讲解整个循环事件的过程吧:
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
int main()
{
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
uv_loop_init(loop);
uv_run(loop, UV_RUN_DEFAULT);
printf("quit...\n");
uv_loop_close(loop);
free(loop);
return 0;
}
这个 demo 是非常简单的,一般来说一个句柄都会经历 初始化、运行、停止、关闭 等过程。
这个函数就是将uv_loop_t初始化,给这个 loop 对象初始化一些默认的成员变量,比如初始化定时器、工作队列、观察者队列等。
int uv_loop_init(uv_loop_t* loop) {
void* saved_data;
int err;
saved_data = loop->data;
memset(loop, 0, sizeof(*loop));
loop->data = saved_data;
heap_init((struct heap*) &loop->timer_heap);
QUEUE_INIT(&loop->wq);
QUEUE_INIT(&loop->idle_handles);
QUEUE_INIT(&loop->async_handles);
QUEUE_INIT(&loop->check_handles);
QUEUE_INIT(&loop->prepare_handles);
QUEUE_INIT(&loop->handle_queue);
loop->active_handles = 0;
loop->active_reqs.count = 0;
loop->nfds = 0;
loop->watchers = NULL;
loop->nwatchers = 0;
QUEUE_INIT(&loop->pending_queue);
QUEUE_INIT(&loop->watcher_queue);
loop->closing_handles = NULL;
uv__update_time(loop);
loop->async_io_watcher.fd = -1;
loop->async_wfd = -1;
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
loop->emfile_fd = -1;
loop->timer_counter = 0;
loop->stop_flag = 0;
err = uv__platform_loop_init(loop);
if (err)
return err;
uv__signal_global_once_init();
err = uv_signal_init(loop, &loop->child_watcher);
if (err)
goto fail_signal_init;
uv__handle_unref(&loop->child_watcher);
loop->child_watcher.flags |= UV_HANDLE_INTERNAL;
QUEUE_INIT(&loop->process_handles);
err = uv_rwlock_init(&loop->cloexec_lock);
if (err)
goto fail_rwlock_init;
err = uv_mutex_init(&loop->wq_mutex);
if (err)
goto fail_mutex_init;
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
if (err)
goto fail_async_init;
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV_HANDLE_INTERNAL;
return 0;
fail_async_init:
uv_mutex_destroy(&loop->wq_mutex);
fail_mutex_init:
uv_rwlock_destroy(&loop->cloexec_lock);
fail_rwlock_init:
uv__signal_loop_cleanup(loop);
fail_signal_init:
uv__platform_loop_delete(loop);
return err;
}
其实在 libuv 有一个全局的、静态的uv_loop_t实例default_loop_struct,与他对应的指针default_loop_ptr,这个东西在后续的使用是经常会被用到,它在uv_default_loop()函数第一次被调用的时候就会通过uv_loop_init()函数进行初始化操作,这就保证了无论使用什么样的handle,它都有一个统一的事件循环入口。
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;
uv_loop_t* uv_default_loop(void) {
if (default_loop_ptr != NULL)
return default_loop_ptr;
if (uv_loop_init(&default_loop_struct))
return NULL;
default_loop_ptr = &default_loop_struct;
return default_loop_ptr;
}
首先讲解一下他的 API 吧,传入uv_loop_t事假循环的句柄,还有一个运行的模式,它的模式有 3 种,分别为默认模式、单次模式、不等待模式。
-
默认模式 UV_RUN_DEFAULT:运行事件循环,直到不再有活动的和引用的句柄或请求为止。
-
单次模式 UV_RUN_ONCE:轮询一次 I/O,如果没有待处理的回调,则进入阻塞状态,完成处理后返回零,不继续运行事件循环。
-
不等待模式 UV_RUN_NOWAIT:对 I/O 进行一次轮询,但如果没有待处理的回调,则不会阻塞。
注意,这个函数不是线程安全的。
typedef enum {
UV_RUN_DEFAULT = 0,
UV_RUN_ONCE,
UV_RUN_NOWAIT
} uv_run_mode;
从代码看uv_run()函数做了什么事情,可以参考上一篇文章中的图,并结合源码来学习:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
释放所有内部循环资源。仅当循环完成执行并且所有打开的句柄和请求已关闭时才调用此函数,否则它将返回UV_EBUSY。此函数返回后,用户可以释放为循环分配的内存。
注意,这个函数也不是线程安全的。
int uv_loop_close(uv_loop_t* loop) {
QUEUE* q;
uv_handle_t* h;
#ifndef NDEBUG
void* saved_data;
#endif
if (uv__has_active_reqs(loop))
return UV_EBUSY;
QUEUE_FOREACH(q, &loop->handle_queue) {
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
if (!(h->flags & UV_HANDLE_INTERNAL))
return UV_EBUSY;
}
uv__loop_close(loop);
#ifndef NDEBUG
saved_data = loop->data;
memset(loop, -1, sizeof(*loop));
loop->data = saved_data;
#endif
if (loop == default_loop_ptr)
default_loop_ptr = NULL;
return 0;
}