本文共 9394 字,大约阅读时间需要 31 分钟。
在Netty框架中,Channel是其中之一的核心概念,是Netty网络通信的主体,由它负责同对端进行网络通信、注册和数据操作等功能。本文我们来详细地分析 Netty 中的 Channel以及跟Channel相关的其他概念,包括ChannelPipeline、ChannelHandlerContext、ChannelHandler等。
Channel的抽象类AbstractChannel有一如下所示的构造方法:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
AbstractChannel内部有一个pipeline属性,Netty在对Channel进行初始化的时候将该属性初始化为DefaultChannelPipeline的实例,上述代码说明了每个Channel都有一个ChannelPipeline与之相关联。
跟踪代码,进入newChannelPipeline的方法体内:
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
上述代码,敏感的读者便能觉察到这是一个新建链表节点,并建立双向循环列表的过程。
这里出现了两个新的类型——TailContext和HeadContext,先来看一下两者的类图:
TailContext和HeadContext既实现了ChannelHandler接口,又实现了 ChannelHandlerContext接口, 因此可以说head和tail即是一个ChannelHandler, 又是一个 ChannelHandlerContext。这么设计有什么用呢?让我们先来看一下ChannelHandlerContext概念。
HeadContext的构造方法调用了父类AbstractChannelHandlerContext的构造器, 并传入参数inbound = false,outbound = true。
TailContext的构造器与HeadContext相反, 它调用了父类 AbstractChannelHandlerContext 的构造器, 并传入参数 inbound = true,outbound = false。
那么inbound和outbound这两个属性究竟代表什么呢?其实inbound和outbound分别用于标识 Context 所对应的handler的类型, 在Netty中事件可以分为Inbound和Outbound事件,在ChannelPipeline的类注释中,有如下图示:
* * I/O Request * via {@link Channel} or * {@link ChannelHandlerContext} * | * +---------------------------------------------------+---------------+ * | ChannelPipeline | | * | \|/ | * | +---------------------+ +-----------+----------+ | * | | Inbound Handler N | | Outbound Handler 1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler N-1 | | Outbound Handler 2 | | * | +----------+----------+ +-----------+----------+ | * | /|\ . | * | . . | * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| * | [ method call] [method call] | * | . . | * | . \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 2 | | Outbound Handler M-1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 1 | | Outbound Handler M | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * +---------------+-----------------------------------+---------------+ * | \|/ * +---------------+-----------------------------------+---------------+ * | | | | * | [ Socket.read() ] [ Socket.write() ] | * | | * | Netty Internal I/O Threads (Transport Implementation) | * +-------------------------------------------------------------------+ *
从上图可以看出, inbound事件和outbound事件的流向是不一样的,inbound事件的流行是从下至上, 而outbound刚好相反,是从上到下。同时注释中列出了引发inbound事件和outbound事件传播的方法:
Inbound event propagation methods: ChannelHandlerContext.fireChannelRegistered() ChannelHandlerContext.fireChannelActive() ChannelHandlerContext.fireChannelRead(Object) ChannelHandlerContext.fireChannelReadComplete() ChannelHandlerContext.fireExceptionCaught(Throwable) ChannelHandlerContext.fireUserEventTriggered(Object) ChannelHandlerContext.fireChannelWritabilityChanged() ChannelHandlerContext.fireChannelInactive() ChannelHandlerContext.fireChannelUnregistered()Outbound event propagation methods: ChannelHandlerContext.bind(SocketAddress, ChannelPromise) ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext.write(Object, ChannelPromise) ChannelHandlerContext.flush() ChannelHandlerContext.read() ChannelHandlerContext.disconnect(ChannelPromise) ChannelHandlerContext.close(ChannelPromise) ChannelHandlerContext.deregister(ChannelPromise)
仔细分析,我们可以发现Inbound事件都是通知事件,而Outbound事件都是请求事件。
了解ChannelHandlerContext的两个重要属性的含义之后,我们回到其类型本身。我们之前的客户端程序有如下代码:
Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception{ ch.pipeline().addLast(new TimeClientHandler()); } });
通常来说, 我们在初始化 Bootstrap, 会添加我们自定义的 ChannelHandler,上面代码在调用handler方法时, 传入了ChannelInitializer对象, 它提供了一个 initChannel 方法供我们初始化ChannelHandler,让我们来看一下这个初始化过程。
在Bootstrap的init方法中将ChannelInitializer添加到ChannelPipeline中:
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); //handler方法返回的就是之前设置的ChannelInitializer对象 p.addLast(config.handler()); …… }
进入ChannelPipeline的addLast方法,我们来看一下添加过程:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkDuplicateName(name); // 检查此 handler 是否有重复的名字 newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; } private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
此时,ChannelPipeline的结构如下图所示:
当Channel成功注册到一个eventLoop的Selector中, 并且将当前Channel作为attachment时,便产生了一个Inbound事件触发了ChannelHandlerContext.fireChannelRegistered()方法。进入该方法:
public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
findContextInbound方法用于从当前context开始查找下一个inbound属性为true的context,而Inbound事件始于head,同时ChannelInitializer的inbound属性为true,因此返回之前设置的ChannelInitializer对象。再进入invokeChannelRegistered方法:
private void invokeChannelRegistered() { if (invokeHandler()) { try { //此处handler方法返回的是Bootstrap中设置ChannelInitializer对象 ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } } public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { //基于责任链模式,事件继续向后传递 ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { //此处调用的initChannel方法为ChannelInitializer对象的initChannel方法 initChannel((C) ctx.channel()); } catch (Throwable cause) { } finally { //功成身退,将ChannelInitializer对象从pipeline中移除 remove(ctx); } return true; } return false; }
经过上述步骤之后,channelPipeline最终的结构如下图所示:
好了, 到了这里, 我们的自定义ChannelHandler的添加过程 也分析的查不多了。