3. EventBus
本文收录自: Vert.x 源码阅读 (3) - EventBus
这是 Vert.x 项目源码阅读笔记的第三篇,主要记录一下 EventBus 相关的代码。EventBus 是一个轻量级的分布式消息系统,让 Vert.x 的服务中各个组件之间可以以一种低耦合的方式交互。代码在这个目录下,主要包含了 EventBus,MessageConsumer,MessageProducer,以及 DeliveryContext。
消息总线 EventBus
EventBus 提供了两类接口:
- 通讯相关的接口,例如
send,publish等等。 - 创建更加高级的抽象的接口,例如
consumer,producer。
EventBus 只提供的 best-effort 的投递保证,它提供了三种通讯模式:
| 模式名称 | 特征 | 方法 |
|---|---|---|
| Pub-Sub | 异步单向,一对多 | publish |
| P2P | 不期待响应,一对一 | send |
| Request-Response (以下简称 RR) | 期待响应,一对一 | request |
其中 P2P 和 RR 模式非常类似,区别只在于 RR 会指定一个 replyHandler(实现上会在投递的消息中指定一个 replyAddress,之后详细介绍)。
Message 类
Message 是实际发送的消息,它实际上是对网络请求的封装。它包含了消息的发送地址,接收地址等等,这里不详细介绍了。
MessageConsumer
MessageConsumer (逻辑上很自然地)扩展了 ReadStream<Message<T>>接口,是对消息处理做的抽象。它是线程安全的,但是如果能只在单一线程(也就是之后会讲到的事件循环线程)上被使用的话,性能会更好一点。这是因为它的并发控制使用了 synchronize 关键字,而 JVM 在没有竞争的情况下会退化到使用偏向锁,减少同步开销。
实现
MessageConsumer 本身是一个接口,它的实现是 MessageConsumerImpl。我们这里先看一下它的核心方法之一,doReceive(Message<T> message)。它是新消息进入时的入口方法,核心代码如下:
protected void doReceive(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (demand == 0L) {
if (pending.size() < MaxBuffer) {
pending.add(message);
} else {
discardHandler.handle(message);
}
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
theHandler = handler;
}
}
deliver(theHandler, message);
}
从这个方法我们看出来 MessageConsumer 的运作原理。
这里还有一个比较有意思的地方是 deliver 方法:
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
String creditsAddress = message.headers().get(CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
dispatch(theHandler, message, context.duplicate());
checkNextTick();
}
这个方法的前四行代码发送了 1 这条消息给 creditsAddress,实际上是给 MessageProducer 监听的一个” 队列” 发送 credit,MessageProducer 收到这个 credit 就会给自己” 回一滴血”,表示自己能够多一个可以发消息的额度。
MessageProducer
MessageProducer (在逻辑上很自然地)扩展了 WriteStream<T> 接口,是对消息发送做的抽象。
实现
MessageProducerImpl 是 MessageProducer 的实现。它有两个核心方法,分别是 write 和 doReceiveCredit。
@Override
public void write(T data, Handler<AsyncResult<Void>> handler) {
Promise<Void> promise = createPromise();
promise.future().setHandler(handler);
write(data, promise);
}
private void write(T data, Promise<Void> handler) {
MessageImpl msg = createMessage();
OutboundDeliveryContext<T> sendCtx = createContext();
if (send) {
synchronized (this) {
if (credits > 0) {
credits--;
} else {
pending.add(sendCtx);
return;
}
}
}
bus.sendOrPubInternal(msg, options, null, handler);
}
private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
}
checkDrained();
}
最开始我比较困惑的一点是,通过网络来恢复额度,总感觉不是很稳,要是网络挂 了,就没法回血来允许接着发消息。但是仔细想想,要是网络挂了,发消息也没有意义了。
DeliveryContext
DeliveryContext 封装了一个要发送的消息,同时提供了一些控制方法。它最核心的方法就是 next()。一个 DeliveryContext 会包含多个 interceptor,而 next 会遍历这些 interceptor,一个个调用,直到全部调用完之后,再实际发送。这个类似于责任链模式,在很多框架中都使用了,例如 Spring Boot,Netty。