Skip to main content

libuv 句柄解读-流(stream)-2

解读

我们接着上一篇文章接着讲解流操作的 API。这些 api 接口我们会很经常使用到,比如在读取 tcp、udp、文件数据的时候,就会用到,同理写数据的时候也会用到。

关闭流的写端口,它会等待未完成的写操作,在关闭后通过 uv_shutdown_cb 指定的回调函数告知应用层。

注意了,它并不是关闭 stream handle,只是关闭了写入端。

参数:

  • req: 指定关闭的请求。
  • stream: 指定 stream handle
  • cb: 在关闭后告知应用层的回调函数。
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {


assert(stream->type == UV_TCP ||
stream->type == UV_TTY ||
stream->type == UV_NAMED_PIPE);

if (!(stream->flags & UV_HANDLE_WRITABLE) ||
stream->flags & UV_HANDLE_SHUT ||
stream->flags & UV_HANDLE_SHUTTING ||
uv__is_closing(stream)) {
return UV_ENOTCONN;
}

assert(uv__stream_fd(stream) >= 0);


uv__req_init(stream->loop, req, UV_SHUTDOWN);
req->handle = stream;
req->cb = cb;
stream->shutdown_req = req;


stream->flags |= UV_HANDLE_SHUTTING;


uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);

return 0;
}

开始侦听新来的连接,如果你学习过 TCP 协议,那么对 listen 应该很熟悉,它就是用于监听连接的请求的。

函数实现:

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;


switch (stream->type) {
case UV_TCP:


err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;

case UV_NAMED_PIPE:


err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;

default:
err = UV_EINVAL;
}

if (err == 0)
uv__handle_start(stream);

return err;
}

参数:

  • stream:指定 stream handle。
  • backlog:指定 libuv 监听的最大的连接数。
  • cb:连接的回调函数,当接受到新来的连接时,调用 uv_connection_cb 回调函数。

注意,这个函数只有 tcp 或者 pipe 类型的 stream handle 才可使用。

调用用来配合 uv_listen() 接受新来的连接。一般来说会在 uv_connection_cb 的回调函数中去调用这个 uv_accept() 函数以接受连接,这与 tcp 协议的处理是非常像的。

注意:在调用这个函数前,客户端句柄必须被初始化。

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;

assert(server->loop == client->loop);

if (server->accepted_fd == -1)
return UV_EAGAIN;

switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:


err = uv__stream_open(client,
server->accepted_fd,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (err) {

uv__close(server->accepted_fd);
goto done;
}
break;

case UV_UDP:

err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
goto done;
}
break;

default:
return UV_EINVAL;
}

client->flags |= UV_HANDLE_BOUND;

done:

if (server->queued_fds != NULL) {
uv__stream_queued_fds_t* queued_fds;

queued_fds = server->queued_fds;


server->accepted_fd = queued_fds->fds[0];


assert(queued_fds->offset > 0);
if (--queued_fds->offset == 0) {
uv__free(queued_fds);
server->queued_fds = NULL;
} else {

memmove(queued_fds->fds,
queued_fds->fds + 1,
queued_fds->offset * sizeof(*queued_fds->fds));
}
} else {
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, POLLIN);
}
return err;
}

uv_read_start()

当连接成功后,可以调用uv_read_start()函数去监听流的读取端,当有数据可读的时候,将会调用uv_read_cb指定的回调函数,递交到用户去处理这些数据。

  • 函数原型
int uv_read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb)

参数:

  • stream:指定 stream handle。
  • alloc_cb:读取数据时调用该函数分配内存空间。
  • read_cb:读取成功后触发异步回调。

函数源码:

int uv_read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);

if (stream->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;

if (!(stream->flags & UV_HANDLE_READABLE))
return UV_ENOTCONN;


stream->flags |= UV_HANDLE_READING;

assert(uv__stream_fd(stream) >= 0);
assert(alloc_cb);


stream->read_cb = read_cb;
stream->alloc_cb = alloc_cb;


uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__handle_start(stream);
uv__stream_osx_interrupt_select(stream);

return 0;
}

uv_read_stop()

uv_read_start()函数刚好相反,uv_read_stop()函数是停止从流读取数据。 uv_read_cb 回调函数将不再被调用。

参数:

  • stream:指定 stream handle。
int uv_read_stop(uv_stream_t* stream) {


if (!(stream->flags & UV_HANDLE_READING))
return 0;


stream->flags &= ~UV_HANDLE_READING;


uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);


if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);

uv__stream_osx_interrupt_select(stream);


stream->read_cb = NULL;
stream->alloc_cb = NULL;
return 0;
}

uv_write()

stream handle 写入数据,实际上是调用uv_write2()这个函数。

参数:

  • req: 请求。
  • handle: 指定的stream handle
  • bufs: 要写入的buf数据。
  • nbufs: 要写入数据的大小。
  • cb: 当写操作完成后,调用的回调函数。
int uv_write(uv_write_t* req,
uv_stream_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_write_cb cb) {
return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}

uv_write2()

扩展的写函数,可用于在管道上发送数据。

int uv_write2(uv_write_t* req,
uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
int empty_queue;


assert(nbufs > 0);
assert((stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY) &&
"uv_write (unix) does not yet support other types of streams");

if (uv__stream_fd(stream) < 0)
return UV_EBADF;

if (!(stream->flags & UV_HANDLE_WRITABLE))
return UV_EPIPE;

if (send_handle) {
if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
return UV_EINVAL;

if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)

return UV_ENOSYS;
#endif
}


empty_queue = (stream->write_queue_size == 0);


uv__req_init(stream->loop, req, UV_WRITE);


req->cb = cb;
req->handle = stream;
req->error = 0;
req->send_handle = send_handle;
QUEUE_INIT(&req->queue);

req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

if (req->bufs == NULL)
return UV_ENOMEM;

memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
req->nbufs = nbufs;
req->write_index = 0;
stream->write_queue_size += uv__count_bufs(bufs, nbufs);


QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);


if (stream->connect_req) {

}
else if (empty_queue) {
uv__write(stream);
}
else {

assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}

return 0;
}

参考