目录 源码分析ChannelFuture f = b.bind(8888).sync() validate()方法 doBind(localAddress)方法 initAndRegister() init(channel) group().register(channel) doBind0(regFuture, channel, localAddress, promise); 总结 正文 本文接着前两篇文章来讲,主要讲服务端类剩下的部分,我们还是来先看看服务端的代码 复制代码 /** * Created by chenhao on 2019/9/4. */ public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 复制代码 在前面两篇博文中从源码的角度分析了如下几行代码主要做了哪些工作。 复制代码 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); 复制代码 本篇博文将从源码的角度分析ChannelFuture f = b.bind(8888).sync() 的内部实现。这样就完成了Netty服务器端启动过程的源码分析。 回到顶部 源码分析ChannelFuture f = b.bind(8888).sync() AbstractBootstrap.java 复制代码 public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } 复制代码 我们接着看重载的bind 复制代码 public ChannelFuture bind(SocketAddress localAddress) { validate();//相关参数的检查 if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress);//下面将分析 } 复制代码 该函数主要看两点:validate()和doBind(localAddress) validate()方法 复制代码 //函数功能:检查相关参数是否设置了 @SuppressWarnings("unchecked") public B validate() { if (group == null) {//这里的group指的是:b.group(bossGroup, workerGroup)代码中的bossGroup throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; } 复制代码 该方法主要检查了两个参数,一个是group,一个是channelFactory,在这里可以想一想这两个参数是在哪里以及何时被赋值的?答案是在如下代码块中被赋值的,其中是将bossGroup赋值给了group,将BootstrapChannelFactory赋值给了channelFactory. 复制代码 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) 复制代码 doBind(localAddress)方法 doBind方法的源代码如下: 复制代码 private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; } 复制代码 doBind这个函数是我们要分析的重点,这个函数的主要工作有如下几点: 1、通过initAndRegister()方法得到一个ChannelFuture的实例regFuture。 2、通过regFuture.cause()方法判断是否在执行initAndRegister方法时产生来异常。如果产生来异常,则直接返回,如果没有产生异常则进行第3步。 3、通过regFuture.isDone()来判断initAndRegister方法是否执行完毕,如果执行完毕来返回true,然后调用doBind0进行socket绑定。如果没有执行完毕则返回false进行第4步。 4、regFuture会添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。 第3、4点想干的事就是一个:调用doBind0方法进行socket绑定。 下面将分成4部分对每行代码具体做了哪些工作进行详细分析。 initAndRegister() 该方法的具体代码如下: 复制代码 final ChannelFuture initAndRegister() { //结论:这里的channel为一个NioServerSocketChannel对象,具体分析见后面 final Channel channel = channelFactory().newChannel();//1 try { init(channel);//2 } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel);//3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } 复制代码 通过函数名以及内部调用的函数可以猜测该函数干了两件事情: 1、初始化一个Channel,要想初始化,肯定要先得到一个Channel。 复制代码 final Channel channel = channelFactory().newChannel();//1 init(channel);//2 复制代码 2、将Channel进行注册。 ChannelFuture regFuture = group().register(channel);//3 下面我们将分析这几行代码内部干来些什么。 final Channel channel = channelFactory().newChannel(); 在上一篇文章中(Netty源码分析 (二)----- ServerBootstrap)分析中,我们知道b.channel(NioServerSocketChannel.class)的功能为:设置父类属性channelFactory 为: BootstrapChannelFactory类的对象。其中这里BootstrapChannelFactory对象中包括一个clazz属性为:NioServerSocketChannel.class 因此,final Channel channel = channelFactory().newChannel();就是调用的BootstrapChannelFactory类中的newChannel()方法,该方法的具体内容为: 复制代码 private static final class BootstrapChannelFactory implements ChannelFactory { private final Class clazz; BootstrapChannelFactory(Class clazz) { this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } } 复制代码 看到这个类,我们可以得到的结论:final Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例。 NioServerSocketChannel构造器 下面将看下NioServerSocketChannel类的构造函数做了哪些工作。 NioServerSocketChannel类的继承体系结构如下: 其无参构造函数如下: 复制代码 public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } 复制代码 无参构造函数中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()。 函数newSocket的功能为:利用SelectorProvider产生一个SocketChannelImpl对象。 复制代码 private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); } 复制代码 无参构造函数通过newSocket函数产生了一个SocketChannelImpl对象 然后调用了如下构造函数,我们继续看 复制代码 public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } //父类AbstractNioMessageChannel的构造函数 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } //父类 AbstractNioChannel的构造函数 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT try { ch.configureBlocking(false);//设置当前的ServerSocketChannel为非阻塞的 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } //父类AbstractChannel的构造函数 protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); } 复制代码 new NioServerSocketChannel()产生一个实例对象时,调用上面这么多构造函数主要干了两件事情: 1、产生来一个SocketChannelImpl类的实例,设置到ch属性中,并设置为非阻塞的。 复制代码 this.ch = ch; ch.configureBlocking(false); 复制代码 2、设置了config属性 config = new NioServerSocketChannelConfig(this, javaChannel().socket() 3、设置SelectionKey.OP_ACCEPT事件 this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT 4、设置unsafe属性 复制代码 @Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); } 复制代码 主要作用为:用来负责底层的connect、register、read和write等操作。 5、设置pipeline属性 pipeline = new DefaultChannelPipeline(this); 每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。 这些属性在后面都会用到,至于NioServerSocketChannel 对象中的unsafe、pipeline属性的具体实现后面进行分析。 结论:final Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例,其中这个NioServerSocketChannel类对象有这样几个属性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline init(channel) init方法的具体代码如下: 复制代码 @Override void init(Channel channel) throws Exception { //1、设置新接入channel的option final Map, Object> options = options(); synchronized (options) { channel.config().setOptions(options);//NioServerSocketChannelConfig } //2、设置新接入channel的attr final Map, Object> attrs = attrs(); synchronized (attrs) { for (Entry, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } //3、设置handler到pipeline上 ChannelPipeline p = channel.pipeline(); if (handler() != null) {//这里的handler()返回的就是第二部分.handler(new SimpleServerHandler())所设置的SimpleServerHandler p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //p.addLast()向serverChannel的流水线处理器中加入了一个ServerBootstrapAcceptor,从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器 p.addLast(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } 复制代码 该函数的功能为: 1、设置channel的options 如果没有设置,则options为空,该属性在ServerBootstrap类中的定义如下 Map, Object> options = new LinkedHashMap, Object>(); options可能如下: 复制代码 public boolean setOption(ChannelOption option, T value) { validate(option, value); if (option == CONNECT_TIMEOUT_MILLIS) { setConnectTimeoutMillis((Integer) value); } else if (option == MAX_MESSAGES_PER_READ) { setMaxMessagesPerRead((Integer) value); } else if (option == WRITE_SPIN_COUNT) { setWriteSpinCount((Integer) value); } else if (option == ALLOCATOR) { setAllocator((ByteBufAllocator) value); } else if (option == RCVBUF_ALLOCATOR) { setRecvByteBufAllocator((RecvByteBufAllocator) value); } else if (option == AUTO_READ) { setAutoRead((Boolean) value); } else if (option == AUTO_CLOSE) { setAutoClose((Boolean) value); } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) { setWriteBufferHighWaterMark((Integer) value); } else if (option == WRITE_BUFFER_LOW_WATER_MARK) { setWriteBufferLowWaterMark((Integer) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); } else { return false; } return true; } 复制代码 2、设置channel的attrs 如果没有设置,则attrs为空,该属性在ServerBootstrap类中的定义如下 private final Map, Object> attrs = new LinkedHashMap, Object>(); 3、设置handler到channel的pipeline上 其中,这里的handler为:在博文(Netty源码分析 (二)----- ServerBootstrap)中分析的通过b.handler(new SimpleServerHandler())所设置的SimpleServerHandler对象 4、在pipeline上添加来一个ChannelInitializer对象,其中重写来initChannel方法。该方法通过p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor, 从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器 看到这里,我们发现其实init只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务. group().register(channel) 回到 initAndRegister 方法中,继续看 config().group().register(channel) 这行代码,config 方法返回了 ServerBootstrapConfig,这个 ServerBootstrapConfig 调用了 group 方法,实际上就是 bossGroup。bossGroup 调用了 register 方法。 前面的分析我们知道group为:NioEvenLoopGroup,其继承MultithreadEventLoopGroup,该类中的register方法如下: 复制代码 @Override public ChannelFuture register(Channel channel) { return next().register(channel);//调用了NioEvenLoop对象中的register方法,NioEventLoop extends SingleThrea