资讯专栏INFORMATION COLUMN

Netty4.x 源码实战系列(二):服务端bind流程详解

laoLiueizo / 661人阅读

摘要:对于,目前大家只知道是个线程组,其内部到底如何实现的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,这里不作详细介绍,后面会有文章作专门详解。

在上一篇《ServerBootstrap 与 Bootstrap 初探》中,我们已经初步的了解了ServerBootstrap是netty进行服务端开发的引导类。 且在上一篇的服务端示例中,我们也看到了,在使用netty进行网络编程时,我们是通过bind方法的调用来完成服务器端口的侦听:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap(); 
    b.group(bossGroup, workerGroup)  
     .channel(NioServerSocketChannel.class) 
     .handler(new LoggingHandler())   
     .childHandler(new ChannelInitializer() { 
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast(new DiscardServerHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)        
     .childOption(ChannelOption.SO_KEEPALIVE, true);
    
     // 侦听8000端口
     ChannelFuture f = b.bind(8000).sync(); 
    
     f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

从上面的服务端示例中可以看到,我们只是定义了主线程组及worker线程组,以及指定了channel类型为NioServerSocketChannel等等一些简单的配置, 然后绑定侦听端口,用于网络服务的主体代码基本就完了(业务代码在Handler中实现,后面的文章会详细介绍。
这真的大大简化并方便了java程序员使用netty来进行网络开发,但是想要深入学习netty的人可能会有下面的一些疑问:

netty是继续Java NIO的,那么ServerSocketChannel是什么时候初始化的?

我怎么没有看到Java NIO中的selector, netty是如何实现多路复用的?

我们设置的handler 或者 childHandler,是如何应用的?

boss线程组 以及 worker线程组 其实如何分配线程的?

为什么是boss线程组,难道接收用户请求是多个线程一起工作?

。。。

本篇将带着上面这些疑问,我们将进入bind方法内部,深入浅出,对netty的工作机制一探究竟。

当我们调用ServerBootstrap的bind方法时,其实是调用的是父类AbstractBootstrap的bind方法:

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

进而调用另一个重载bind方法:

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

此bind(SocketAddress localAddress)内部有两个调用
1、 调用validate()
顾名思义,validate应该是做校验,由于ServerBootstrap覆盖了AbstractBootstrap方法,因此此validate其实是调用ServerBootstrap中的validate方法:

@Override
public ServerBootstrap validate() {
    super.validate();
    if (childHandler == null) {
        throw new IllegalStateException("childHandler not set");
    }
    if (childGroup == null) {
        logger.warn("childGroup is not set. Using parentGroup instead.");
        childGroup = config.group();
    }
    return this;
}

在子类ServerBootstrap的validate方法中,首先它有调用了基类的validate()方法:

public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}

所以通过validate方法我们得出如下结论:

1) 服务端程序必须要设置boss线程组 以及 worker线程组,分别用于Acceptor 以及 I/O操作; 
2)必须通过Boostrap的channel()来指定通道类型,用来生成相应的通道(ServerSocketChannel或SocketChannel);
3) 因为是服务端程序,所以必须设置ChildHandler,来指定业务处理器,否则没有业务处理的服务器hi没有意义的;

2、 调用doBind(localAddress)
首先我们先看一下AbstractBootstrap中doBind方法代码片段:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();      // (1)
    final Channel channel = regFuture.channel();            // (2)
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);     // (3)
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);  // (3)
                }
            }
        });
        return promise;
    }
}

剥去无用代码,其实doBind方法内部,只做了两件事:
一、调用了initAndRegister方法
二、调用用了doBind0方法
到底这两个方法做了啥工作,我们继续往下分析。

一、首先看看 initAndRegister方法内部代码:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // (1) 调用工厂方法,生成channel实例
        channel = channelFactory.newChannel();
        
        // (2) 初始化通道信息
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
        }
        
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // (3) 注册通道
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

通过上面代码,我们可以看出initAndRegister方法做了三件事:

①、调用channelFactory生成通道channel实例:

在上一篇中,我们已经知道,通过serverbootstrap的channel方法来指定通道类型,其实是调用基类AbstractBoostrap的channel方法,其内部其实是实例化了一个产生指定channel类型的channelFactory。

所以,initAndRegister中的channelFactory.newChannel()方法就是生成了一个NioServerSocketChannel的实例。 关于NioServerSocketChannel内部细节,我会有专门的文章进行分析,此处不做详述。

②、调用init(channel)初始化通道信息

init方法在基类AbstractBootstrap中是一个抽象方法:

abstract void init(Channel channel) throws Exception;

所以此处init的具体实现在子类ServerBootstrap类中:

@Override
void init(Channel channel) throws Exception {
    // 设置引导类配置的option
    final Map, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
    // 设置引导类配置的attr
    final Map, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey key = (AttributeKey) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    
    // 获取当前通道的pipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry, Object>[] currentChildOptions;
    final Entry, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    // 给NioServerSocketChannel的pipeline中添加一个ChannelInitializer类型的Handler
    p.addLast(new ChannelInitializer() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init内部主要做了一下几件事:
ⅰ、 设置channel的options

final Map, Object> options = options0();
synchronized (options) {
    setChannelOptions(channel, options, logger);
}

ⅱ、设置channel的attribute

final Map, Object> attrs = attrs0();
synchronized (attrs) {
    for (Entry, Object> e: attrs.entrySet()) {
        @SuppressWarnings("unchecked")
        AttributeKey key = (AttributeKey) e.getKey();
        channel.attr(key).set(e.getValue());
    }
}

ⅲ、给NioServerSocketChannel的pipeline中添加一个ChannelInitializer类型的Handler(根据类继承ChannelInitializer继承自ChannelInboundHandlerAdapter)

关于pipeline到底是什么,本篇不做详述,下一篇我会跟NioServerSocketChannel来一起给大家分析一下。

③、完成通道的注册
通道初始化完成后,然后就可以注册通道了:

 ChannelFuture regFuture = config().group().register(channel);

config()在AbstractBootstrap中也是个抽象方法:

public abstract AbstractBootstrapConfig config();

所以具体的实现细节还是在子类ServerBootstrap中:

@Override
public final ServerBootstrapConfig config() {
    return config;
}

此方法只会返回了config实例对象,此属性是在ServerBootstrap初始化时就创建了

public class ServerBootstrap extends AbstractBootstrap {
    ...
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    ...
    
    @Override
    public final ServerBootstrapConfig config() {
        return config;
    }
}

我们先看一下ServerBootstrapConfig的类继承结构图:

ServerBootstrapConfig初始化时传入的this对象,此this表示ServerBootstrap,而且ServerBootstrapConfig构造方法内部调用了其基类AbstractBootstrapConfig的构造方法:

ServerBootstrapConfig(ServerBootstrap bootstrap) {
    super(bootstrap);
}

所以config().group()就对应ServerBootstrap的group属性引用(由上一篇得知group指向boss线程组),因此register其实是调用的NioEventLoopGroup的register方法。

对于NioEventLoopGroup,目前大家只知道是个线程组,其内部到底如何实现的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,这里不作详细介绍,后面会有文章作专门详解。

二、我们再回到doBind(localAddress)方法,内部在调用玩initAndRegister之后,就是调用doBind0方法

 private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

此方法内部就是完成channel的侦听端口的绑定。

至此ServerBootstrap的bind工作执行完成。

此篇对服务端绑定的流程做了大体介绍,但由于篇幅问题,下面几个问题未做详尽分析:
1、 NioServerSocketChannel是如何实例化的
2、 Pipeline是什么,为什么要通过它添加handler
3、 NioEventLoopGroup内部细节是什么,为什么要通过它注册Channel, Java NIO中channel初始化后不是要注册到selector中吗?

带着上面这些疑问,欢迎大家继续关注接下来的几篇文章,在这几篇文章中,bind操作会一直贯穿其中:
Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析
Netty4.x 源码实战系列(四):Pipeline全剖析
Netty4.x 源码实战系列(五):NioEventLoopGroup全剖析
Netty4.x 源码实战系列(六):NioEventLoop全剖析

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68105.html

相关文章

  • Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析

    摘要:本篇将通过实例化过程,来深入剖析。及初始化完成后,它们会相互连接。我们在回到的构造方法父类构造方法调用完成后,还要初始化一下自己的配置对象是的内部类而又是继承自,通过代码分析,此对象就是就会对底层一些配置设置行为的封装。 根据上一篇《Netty4.x 源码实战系列(二):服务端bind流程详解》所述,在进行服务端开发时,必须通过ServerBootstrap引导类的channel方法来...

    Flink_China 评论0 收藏0
  • Netty4.x 源码实战系列(四):Pipeline全剖析

    摘要:在上一篇源码实战系列三全剖析中,我们详细分析了的初始化过程,并得出了如下结论在中,每一个都有一个对象,并且其内部本质上就是一个双向链表本篇我们将深入源码内部,对其一探究竟,给大家一个全方位解析。 在上一篇《Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析》中,我们详细分析了NioServerSocketChannel的初始化过程,并得出了如下结论...

    13651657101 评论0 收藏0
  • Netty4.x 源码实战系列(一):ServerBootstrap 与 Bootstrap 初探

    摘要:而用于主线程池的属性都定义在中本篇只是简单介绍了一下引导类的配置属性,下一篇我将详细介绍服务端引导类的过程分析。 从Java1.4开始, Java引入了non-blocking IO,简称NIO。NIO与传统socket最大的不同就是引入了Channel和多路复用selector的概念。传统的socket是基于stream的,它是单向的,有InputStream表示read和Outpu...

    BakerJ 评论0 收藏0
  • 【自己读源码Netty4.X系列(三) Channel Register

    摘要:我想这很好的解释了中,仅仅一个都这么复杂,在单线程或者说串行的程序中,编程往往是很简单的,说白了就是调用,调用,调用然后返回。 Netty源码分析(三) 前提概要 这次停更很久了,原因是中途迷茫了一段时间,不过最近调整过来了。不过有点要说下,前几天和业内某个大佬聊天,收获很多,所以这篇博文和之前也会不太一样,我们会先从如果是我自己去实现这个功能需要怎么做开始,然后去看netty源码,与...

    darkbug 评论0 收藏0
  • Netty4.x 源码实战系列(五):深入浅出学NioEventLoopGroup

    摘要:接下来的两篇文章,我将从源码角度为大家深入浅出的剖析的线程模型工作机制。我们看一下的源码通过的代码发现,实现了接口,其内部会通过指定的默认线程工厂来创建线程,并执行相应的任务。至此,初始化完成了。下一篇我们将详细介绍,敬请期待。 我们都知道Netty的线程模型是基于React的线程模型,并且我们都知道Netty是一个高性能的NIO框架,那么其线程模型必定是它的重要贡献之一。 在使用ne...

    MSchumi 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<