Skip to main content

libuv 句柄解读-流(stream)-1

解读

stream handle可以被译为流句柄,它在 libuv 中是一个抽象的数据类型,为 libuv 提供了全双工的通信方式,可以说它只是一个父类,通过它派生出 uv_tcp_t、uv_pipe_t、uv_tty_t 这 3 个子类,在这些handle中,都使用了stream handle 的成员变量及处理方法。

通过uv_stream_t可以定义一个 stream handle的实例。

typedef struct uv_stream_s uv_stream_t;
struct uv_stream_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
};

#define UV_STREAM_FIELDS \
size_t write_queue_size; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
UV_STREAM_PRIVATE_FIELDS

其实 stream handle 是属于handle的子类,因此它的数据结构中包含了 handle 的成员变量,还包含它自身的一个成员变量 UV_STREAM_FIELDS ,它分为公有字段与私有字段,公有字段只有 write_queue_size、 alloc_cb 、 read_cb,私有字段就是 UV_STREAM_PRIVATE_FIELDS ,它是分为Windows平台与linux平台的,此处以 linux 为例。

#define UV_STREAM_PRIVATE_FIELDS                                              \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
uv__io_t io_watcher; \
void* write_queue[2]; \
void* write_completed_queue[2]; \
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
void* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \

  • connect_req: 其实 uv_connect_t 是一个请求,从前面的文章我们也知道,在libuv存在 handlerequest,很明显connect_req就是一个请求,它的作用就是请求建立连接,比如类似建立 tcp 连接。
  • shutdown_req: uv_shutdown_t也是一个请求,它的作用与uv_connect_t刚好相反,关闭一个连接。
  • io_watcher: 抽象出来的io观察者
  • write_queue: 写数据队列。
  • write_completed_queue: 完成的写数据队列。
  • connection_cb: 有新连接时的回调函数。
  • delayed_error: 延时的错误代码。
  • accepted_fd: 接受连接的描述符 fd
  • queued_fds: fd 队列,可能有多个fd在排队。
  • UV_STREAM_PRIVATE_PLATFORM_FIELDS: 目前为空。

其实现在不太了解无所谓,就先看下去,我在写文章的时候其实也没有完全理解透彻。

总结一下它的框架示意图,如下:

stream handle其实并未提供用户的API接口,但提供了内部的API接口,供子类使用,比如在创建一个tcp的时候,就会通过uv__stream_init()函数去初始化一个 stream handle,又比如在读写流操作的时候肯定是通过stream handle去操作的,因此它又需要实现内部的读写操作接口,相关的函数如下:

void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type);

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);

uv__stream_init()

初始化一个stream handle,设置函数原型:

void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type);

参数:

  • loop:传入了事件循环的句柄。
  • stream:指定初始化的stream handle
  • type:指定stream handle的类型,注意看它的类型参数是handle类型的,而handle类型有很多,但是对与这个stream handle来说可选的值基本上只有UV_TCP、UV_TTY、UV_PIPE

源码的实现:

void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;

uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;

if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)

err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}

#if defined(__APPLE__)
stream->select = NULL;
#endif

uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

说说处理的逻辑:

  1. 调用uv__handle_init()函数将stream handle初始化,主要设置loop类型、以及UV_HANDLE_REF标记。

  2. 初始化stream handle中的成员变量。

  3. 初始化write_queuewrite_completed_queue队列,可能有人有疑问了,为啥要写队列还要 写完成 两个队列,因为啊 libuv 是为了实现异步,写操作为了实现异步非阻塞,你不能直接写,你得通过写队列去操作,它会首先将数据丢到队列中,下层 io 观察者触发可写事件时才去写入,当写完了就告诉你。

  4. 最后调用uv__io_init()函数去初始化io观察者,并设置stream的回调处理函数uv__stream_io(),这个处理回调函数后续慢慢讲解吧,先来看看stream handle 的读写操作。

uv__read()

io观察者发现stream handle有可读事件时,uv__read()函数会被调用,其实是被uv__stream_io()函数调用,因为io观察者发现了底层有数据可读。所以该函数是用于从底层读取数据,这也是stream handle的读取操作。

uv__read() 函数是通过 read() 函数从底层文件描述符读取数据,读取的数据写入由 stream->alloc_cb 分配到内存块中,并在完成读取后由 stream->read_cb 回调函数传递到用户。因为数据已经由底层准备好,直接读取即可,效率非常高,是不需要等待的。而当底层没有数据的情况时,read() 系统调用也会阻塞,而是直接返回,因为文件描述符工作在非阻塞模式下,即使底层还没有数据,它也不会阻塞的,而真正阻塞的地方是在 io 循环中。

简单看看函数源码吧:

static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
int count;
int err;
int is_ipc;

stream->flags &= ~UV_HANDLE_READ_PARTIAL;


count = 32;


is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;


while (stream->read_cb
&& (stream->flags & UV_HANDLE_READING)
&& (count-- > 0)) {
assert(stream->alloc_cb != NULL);

buf = uv_buf_init(NULL, 0);


stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);

if (buf.base == NULL || buf.len == 0) {


stream->read_cb(stream, UV_ENOBUFS, &buf);
return;
}


assert(buf.base != NULL);
assert(uv__stream_fd(stream) >= 0);


if (!is_ipc) {
do {

nread = read(uv__stream_fd(stream), buf.base, buf.len);
}

while (nread < 0 && errno == EINTR);
} else {

msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;

msg.msg_controllen = sizeof(cmsg_space);
msg.msg_control = cmsg_space;

do {

nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}


if (nread < 0) {

if (errno == EAGAIN || errno == EWOULDBLOCK) {


if (stream->flags & UV_HANDLE_READING) {


uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__stream_osx_interrupt_select(stream);
}
stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
} else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
uv__stream_eof(stream, &buf);
return;
#endif
} else {

stream->read_cb(stream, UV__ERR(errno), &buf);
if (stream->flags & UV_HANDLE_READING) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
}
return;
} else if (nread == 0) {
uv__stream_eof(stream, &buf);
return;
} else {

ssize_t buflen = buf.len;


if (is_ipc) {
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
return;
}
}

#if defined(__MVS__)
if (is_ipc && msg.msg_controllen > 0) {
uv_buf_t blankbuf;
int nread;
struct iovec *old;

blankbuf.base = 0;
blankbuf.len = 0;
old = msg.msg_iov;
msg.msg_iov = (struct iovec*) &blankbuf;
nread = 0;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
msg.msg_iov = old;
return;
}
} while (nread == 0 && msg.msg_controllen > 0);
msg.msg_iov = old;
}
#endif


stream->read_cb(stream, nread, &buf);


if (nread < buflen) {
stream->flags |= UV_HANDLE_READ_PARTIAL;
return;
}
}
}
}

uv__write()

同理地,当 io 观察者发现要写入数据的时候,它也会去将数据写入到底层,函数 uv__write() 会被调用,那什么时候才是可写呢,回顾 stream handle 的成员变量,它有两个队列,当 stream->write_queue 队列存在数据时,表示可以写入,如果队列为空则表示没有数据可以写。

libuv 的异步处理都是差不多的,都是通过io观察者去发现是否有可读可写,写数据的过程大致如下:用户将数据丢到写队列中就直接返回了,io观察者发现队列有数据,stream handle 的处理 uv__stream_io()函数被调用,开始写入操作,这个写入的操作是依赖系统的函数接口的,比如write()等,等写完了就通知用户即可。

源码的实现:

static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;

start:


assert(uv__stream_fd(stream) >= 0);

if (QUEUE_EMPTY(&stream->write_queue))
return;

q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);


assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->nbufs - req->write_index;

iovmax = uv__getiovmax();


if (iovcnt > iovmax)
iovcnt = iovmax;

if (req->send_handle) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
union {
char data[64];
struct cmsghdr alias;
} scratch;

if (uv__is_closing(req->send_handle)) {
err = UV_EBADF;
goto error;
}

fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

memset(&scratch, 0, sizeof(scratch));

assert(fd_to_send >= 0);

msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;

msg.msg_control = &scratch.alias;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));


{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}

do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));


if (n >= 0)
req->send_handle = NULL;
} else {
do

n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}

if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
err = UV__ERR(errno);
goto error;
}

if (n >= 0 && uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return;
}


if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
goto start;


uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);


uv__stream_osx_interrupt_select(stream);

return;

error:
req->error = err;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}

uv__stream_io()

uv__stream_io() 函数是 stream handle 的事件处理函数,它在uv__io_init()函数就被注册了,在调用 uv__stream_io() 函数时,传递了事件循环对象、io 观察者对象、事件类型等信息。

我们来看看stream handle是如何处理可读写事件的:

  1. 通过container_of()函数获取 stream handle 的实例,其实是计算出来的。
  2. 如果 stream->connect_req存在,说明 该 stream handle 需要进行连接,于是调用 uv__stream_connect() 函数请求建立连接。
  3. 满足可读取数据的条件,调用uv__read()函数进行数据读取
  4. 如果满足流结束条件 调用 uv__stream_eof() 进行相关处理。
  5. 如果满足可写条件,调用 uv__write() 函数去写入数据,当然,数据会被放在 stream->write_queue 队列中。
  6. 在写完数据后,调用 uv__write_callbacks() 函数去清除队列的数据,并通知应用层已经写完了。
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;


stream = container_of(w, uv_stream_t, io_watcher);


assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_HANDLE_CLOSING));

if (stream->connect_req) {

uv__stream_connect(stream);
return;
}


assert(uv__stream_fd(stream) >= 0);


if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);


if (uv__stream_fd(stream) == -1)
return;


if ((events & POLLHUP) &&
(stream->flags & UV_HANDLE_READING) &&
(stream->flags & UV_HANDLE_READ_PARTIAL) &&
!(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}

if (uv__stream_fd(stream) == -1)
return;


if (events & (POLLOUT | POLLERR | POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);


if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}

uv__write_callbacks()

清理 stream->write_completed_queue 已完成写请求的队列,清理空间,并调用回调函数。

static void uv__write_callbacks(uv_stream_t* stream) {
uv_write_t* req;
QUEUE* q;
QUEUE pq;

if (QUEUE_EMPTY(&stream->write_completed_queue))
return;


QUEUE_MOVE(&stream->write_completed_queue, &pq);


while (!QUEUE_EMPTY(&pq)) {

q = QUEUE_HEAD(&pq);
req = QUEUE_DATA(q, uv_write_t, queue);
QUEUE_REMOVE(q);
uv__req_unregister(stream->loop, req);


if (req->bufs != NULL) {
stream->write_queue_size -= uv__write_req_size(req);
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
}


if (req->cb)
req->cb(req, req->error);
}
}

在下一章讲解吧。

参考