libuv 句柄解读-异步(async)
解读
async handle 可译为异步句柄,它主要是用于提供异步唤醒的功能,比如在用户线程中唤醒主事件循环线程,并且触发对应的回调函数。
从事件循环线程的处理过程可知,它在 io 循环时会进入阻塞状态,而阻塞的具体时间则通过计算得到,那么在某些情况下,我们想要唤醒事件循环线程,就可以通过 ansyc 去操作,比如当线程池的线程处理完事件后,执行的结果是需要交给事件循环线程的,这时就需要用到唤醒事件循环线程,当然方法也是很简单,调用一下 uv_async_send() 函数通知事件循环线程即可。libuv 线程池中的线程就是利用这个机制和主事件循环线程通讯。
通过uv_async_t可以定义一个async handle的实例。
typedef struct uv_async_s uv_async_t;
uv_async_t是属于handle的子类,并且还包含了一个UV_ASYNC_PRIVATE_FIELDS数据类型,里面包括它的回调函数 async_cb、队列、以及一个 pending 成员变量。
结构比较简单,async_cb保存回调函数指针,queue 作为队列节点插入 loop->async_handles,pending 字段的作用主要是用于保护操作,在唤醒的时候使用,初始化为 0, 在调用唤醒函数的时候会被设置为 1,为什么要这样子做呢,因为 async handle 是异步句柄,这可能不止一个线程会尝试唤醒事件循环,这一个 async handle 不能同时被多个线程操作,因此需要进行原子保护,当它为 1 的时候表示有其他线程在操作这个 async handle。
struct uv_async_s {
UV_HANDLE_FIELDS
UV_ASYNC_PRIVATE_FIELDS
};
#define UV_ASYNC_PRIVATE_FIELDS \
uv_async_cb async_cb; \
void* queue[2]; \
int pending; \
uv_async_cb 回调函数:
typedef void (*uv_async_cb)(uv_async_t* handle);
async handle 相关的 API 非常简单,就一个初始化 uv_async_init(),还有一个异步通知的函数 uv_async_send()
uv_async_init()
函数原型:
UV_EXTERN int uv_async_init(uv_loop_t*,
uv_async_t* async,
uv_async_cb async_cb);
参数:
- uv_loop_t:传入了事件循环的句柄。
- async:指定初始化的 async handle。
- async_cb:指定 async handle 的回调函数。
uv_async_init() 初始化函数不同于其他 handle 的初始化函数,因为它会立即将 async handle 设置为活跃状态,所以 async handle 没有 start 相关的函数。
其实深入看 uv__async_start() 源码你就会发现,它实 际上也是通过 pipe 管道进行唤醒的,因为主线程的 io 循环其实是在观察是否有可读写的 io,libuv 将管道等都抽象为 io 观察者了,在 io 循环中观察管道的读取端,当有数据到来则唤醒,越是深入了解 libuv,你就会发现其实就是各个线程、进程间的通信,一种异步的通信,只不过 libuv 处理的很好,抽象了很多数据结构,并且衍生出了很多子类。
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;
err = uv__async_start(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
handle->async_cb = async_cb;
handle->pending = 0;
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
return 0;
}
static int uv__async_start(uv_loop_t* loop) {
int pipefd[2];
int err;
if (loop->async_io_watcher.fd != -1)
return 0;
#ifdef __linux__
err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (err < 0)
return UV__ERR(errno);
pipefd[0] = err;
pipefd[1] = -1;
#else
err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
if (err < 0)
return err;
#endif
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
uv_async_send()
函数原型:
int uv_async_send(uv_async_t* handle)
参数:
- handle:要唤醒指定的 async handle。
uv_async_send() 函数发送消息唤醒事件循环线程并触发回调函数调用,其实我们不难想象出来,它的唤醒就是将消息写入管道中,让 io 观察者发现管道有数据从而唤醒事件循环线程,并随之处理这个 async handle。
其实真正的唤醒操作是uv__async_send()函数,它往管道中写入消息就行了。
int uv_async_send(uv_async_t* handle) {
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;
if (cmpxchgi(&handle->pending, 0, 1) != 0)
return 0;
uv__async_send(handle->loop);
if (cmpxchgi(&handle->pending, 1, 2) != 1)
abort();
return 0;
}
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
int r;
buf = "";
len = 1;
fd = loop->async_wfd;
#if defined(__linux__)
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
fd = loop->async_io_watcher.fd;
}
#endif
do
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
if (r == len)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
前面所介绍的都是初始化与通知的方式,那么在事件循环中怎么去处理 async handle 呢?
我们注意到uv_async_init()函数可能多次被调用,初始化多个 async handle,但是 loop->async_io_watcher 只有一个,那么问题来了,那么多个 async handle 都共用一个 io 观察者(假设 loop 是一个),那么在 loop->async_io_watcher 上有 I/O 事件时,并不知道是哪个 async handle 发送的,因此我们要知道 async handle 是如何处理这些的。
我们也知道从 uv__io_init() 函数中已经注册了一个 uv__async_io() 函数用于处理 loop->async_io_watcher 的 I/O 事件,那么我们就看 uv__async_io() 函数的处理过程即可:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
assert(w == &loop->async_io_watcher);
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (0 == uv__async_spin(h))
continue;
if (h->async_cb == NULL)
continue;
h->async_cb(h);
}
}
样例
接着来一个例子吧,都是非常简单的,创建 1 个线程,在事件循环中等待 async handle。
#include <stdio.h>
#include <unistd.h>
#include <uv.h>
void wake_entry(void *arg)
{
sleep(5);
printf("wake_entry running, wake async!\n");
uv_async_send((uv_async_t*)arg);
uv_stop(uv_default_loop());
}
void my_async_cb(uv_async_t* handle)
{
printf("my async running!\n");
}
int main()
{
uv_thread_t wake;
uv_async_t async;
uv_async_init(uv_default_loop(), &async, my_async_cb);
uv_thread_create(&wake, wake_entry, &async);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_thread_join(&wake);
return 0;
}