libuv 句柄解读-流(stream)-1
解读
stream handle可以被译为流句柄,它在 libuv 中是一个抽象的数据类型,为 libuv 提供了全双工的通信方式,可以说它只是一个父类,通过它派生出 uv_tcp_t、uv_pipe_t、uv_tty_t 这 3 个子类,在这些handle中,都使用了stream handle 的成员变量及处理方法。
通过uv_stream_t可以定义一个 stream handle的实例。
typedef struct uv_stream_s uv_stream_t;
struct uv_stream_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
};
#define UV_STREAM_FIELDS \
size_t write_queue_size; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
UV_STREAM_PRIVATE_FIELDS
其实 stream handle 是属于handle的子类,因此它的数据结构中包含了 handle 的成员变量,还包含它自身的一个成员变量 UV_STREAM_FIELDS ,它分为公有字段与私有字段,公有字段只有 write_queue_size、 alloc_cb 、 read_cb,私有字段就是 UV_STREAM_PRIVATE_FIELDS ,它是分为Windows平台与linux平台的,此处以 linux 为例。
#define UV_STREAM_PRIVATE_FIELDS \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
uv__io_t io_watcher; \
void* write_queue[2]; \
void* write_completed_queue[2]; \
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
void* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
connect_req: 其实uv_connect_t是一个请求,从前面的文章我们也知道,在libuv存在handle与request,很明显connect_req就是一个请求,它的作用就是请求建立连接,比如类似建立 tcp 连接。shutdown_req:uv_shutdown_t也是一个请求,它的作用与uv_connect_t刚好相反,关闭一个连接。io_watcher: 抽象出来的io观察者。write_queue: 写数据队列。write_completed_queue: 完成的写数据队列。connection_cb: 有新连接时的回调函数。delayed_error: 延时的错误代码。accepted_fd: 接受连接的描述符fd。queued_fds:fd队列,可能有多个fd在排队。UV_STREAM_PRIVATE_PLATFORM_FIELDS: 目前为空。
其实现在不太了解无所谓,就先看下去,我在写文章的时候其实也没有完全理解透彻。
总结一下它的框架示意图,如下:

stream handle其实并未提供用户的API接口,但提供了内部的API接口,供子类使用,比如在创建一个tcp的时候,就会通过uv__stream_init()函数去初始化一个 stream handle,又比如在读写流操作的时候肯定是通过stream handle去操作的,因此它又需要实现内部的读写操 作接口,相关的函数如下:
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type);
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
uv__stream_init()
初始化一个stream handle,设置函数原型:
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type);
参数:
- loop:传入了事件循环的句柄。
- stream:指定初始化的
stream handle。 - type:指定
stream handle的类型,注意看它的类型参数是handle类型的,而handle类型有很多,但是对与这个stream handle来说可选的值基本上只有UV_TCP、UV_TTY、UV_PIPE。
源码的实现:
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}
#if defined(__APPLE__)
stream->select = NULL;
#endif
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
说说处理的逻辑:
-
调用
uv__handle_init()函数将stream handle初始化,主要设置loop、类型、以及UV_HANDLE_REF标记。 -
初始化
stream handle中的成员变量。 -
初始化
write_queue与write_completed_queue队列,可能有人有疑问了,为啥要写队列还要 写完成 两个队列,因为啊 libuv 是为了实现异步,写操作为了实现异步非阻塞,你不能直接写,你得通过写队列去操作,它会首先将数据丢到队列中,下层 io 观察者触发可写事件时才去写入,当写完了就告诉你。 -
最后调用
uv__io_init()函 数去初始化io观察者,并设置stream的回调处理函数uv__stream_io(),这个处理回调函数后续慢慢讲解吧,先来看看stream handle的读写操作。
uv__read()
当io观察者发现stream handle有可读事件时,uv__read()函数会被调用,其实是被uv__stream_io()函数调用,因为io观察者发现了底层有数据可读。所以该函数是用于从底层读取数据,这也是stream handle的读取操作。
uv__read() 函数是通过 read() 函数从底层文件描述符读取数据,读取的数据写入由 stream->alloc_cb 分配到内存块中,并在完成读取后由 stream->read_cb 回调函数传递到用户。因为数据已经由底层准备好,直接读取即可,效率非常高,是不需要等待的。而当底层没有数据的情况时,read() 系统调用也会阻塞,而是直接返回,因为文件描述符工作在非阻塞模式下,即使底层还没有数据,它也不会阻塞的,而真正阻塞的地方是在 io 循环中。
简单看看函数源码吧:
static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
int count;
int err;
int is_ipc;
stream->flags &= ~UV_HANDLE_READ_PARTIAL;
count = 32;
is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
while (stream->read_cb
&& (stream->flags & UV_HANDLE_READING)
&& (count-- > 0)) {
assert(stream->alloc_cb != NULL);
buf = uv_buf_init(NULL, 0);
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
if (buf.base == NULL || buf.len == 0) {
stream->read_cb(stream, UV_ENOBUFS, &buf);
return;
}
assert(buf.base != NULL);
assert(uv__stream_fd(stream) >= 0);
if (!is_ipc) {
do {
nread = read(uv__stream_fd(stream), buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_controllen = sizeof(cmsg_space);
msg.msg_control = cmsg_space;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
if (nread < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (stream->flags & UV_HANDLE_READING) {
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__stream_osx_interrupt_select(stream);
}
stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
} else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
uv__stream_eof(stream, &buf);
return;
#endif
} else {
stream->read_cb(stream, UV__ERR(errno), &buf);
if (stream->flags & UV_HANDLE_READING) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
}
return;
} else if (nread == 0) {
uv__stream_eof(stream, &buf);
return;
} else {
ssize_t buflen = buf.len;
if (is_ipc) {
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
return;
}
}
#if defined(__MVS__)
if (is_ipc && msg.msg_controllen > 0) {
uv_buf_t blankbuf;
int nread;
struct iovec *old;
blankbuf.base = 0;
blankbuf.len = 0;
old = msg.msg_iov;
msg.msg_iov = (struct iovec*) &blankbuf;
nread = 0;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
msg.msg_iov = old;
return;
}
} while (nread == 0 && msg.msg_controllen > 0);
msg.msg_iov = old;
}
#endif
stream->read_cb(stream, nread, &buf);
if (nread < buflen) {
stream->flags |= UV_HANDLE_READ_PARTIAL;
return;
}
}
}
}
uv__write()
同理地,当 io 观察者发现要写入数据的时候,它也会去将数据写入到底层,函数 uv__write() 会被调用,那什么时候才是可写呢,回顾 stream handle 的成员变量,它有两个队列,当 stream->write_queue 队列存在数据时,表示可以写入,如果队列为空则表示没有数据可以写。
libuv 的异步处理都是差不多的,都是通过io观察者去发现是否有可读可写,写数据的过程大致如下:用户将数据丢到写队列中就直接返回了,io观察者发现队列有数据,stream handle 的处理 uv__stream_io()函数被调用,开始写入操作,这个写入的操作是依赖系统的函数接口的,比如write()等,等写完了就通知用户即可。
源码的实现:
static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;
start:
assert(uv__stream_fd(stream) >= 0);
if (QUEUE_EMPTY(&stream->write_queue))
return;
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->nbufs - req->write_index;
iovmax = uv__getiovmax();
if (iovcnt > iovmax)
iovcnt = iovmax;
if (req->send_handle) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
union {
char data[64];
struct cmsghdr alias;
} scratch;
if (uv__is_closing(req->send_handle)) {
err = UV_EBADF;
goto error;
}
fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
memset(&scratch, 0, sizeof(scratch));
assert(fd_to_send >= 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
msg.msg_control = &scratch.alias;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
if (n >= 0)
req->send_handle = NULL;
} else {
do
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}
if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
err = UV__ERR(errno);
goto error;
}
if (n >= 0 && uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return;
}
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
goto start;
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
return;
error:
req->error = err;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
uv__stream_io()
uv__stream_io() 函数是 stream handle 的事件处理函数,它在uv__io_init()函数就被注册了,在调用 uv__stream_io() 函数时,传递了事件循环对象、io 观察者对象、事件类型等信息。
我们来看看stream handle是如何处理可读写事件的:
- 通过
container_of()函数获取stream handle的实例,其实是计算出来的。 - 如果
stream->connect_req存在,说明 该stream handle需要进行连接,于是调用uv__stream_connect()函数请求建立连接。 - 满足可读取数据的条件,调用
uv__read()函数进行数据读取 - 如果满足流结束条件 调用
uv__stream_eof()进行相关处理。 - 如果满足可写条件,调用
uv__write()函数去写入数据,当然,数据会被放在stream->write_queue队列中。 - 在写完数据后,调用
uv__write_callbacks()函数去清除队列的数据,并通知应用层已经写完了。
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
stream = container_of(w, uv_stream_t, io_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_HANDLE_CLOSING));
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
assert(uv__stream_fd(stream) >= 0);
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
if (uv__stream_fd(stream) == -1)
return;
if ((events & POLLHUP) &&
(stream->flags & UV_HANDLE_READING) &&
(stream->flags & UV_HANDLE_READ_PARTIAL) &&
!(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
if (uv__stream_fd(stream) == -1)
return;
if (events & (POLLOUT | POLLERR | POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}