资讯专栏INFORMATION COLUMN

Netty4.x 源码实战系列(一):ServerBootstrap 与 Bootstrap 初探

BakerJ / 2075人阅读

摘要:而用于主线程池的属性都定义在中本篇只是简单介绍了一下引导类的配置属性,下一篇我将详细介绍服务端引导类的过程分析。

从Java1.4开始, Java引入了non-blocking IO,简称NIO。NIO与传统socket最大的不同就是引入了Channel和多路复用selector的概念。传统的socket是基于stream的,它是单向的,有InputStream表示read和OutputStream表示写。而Channel是双工的,既支持读也支持写,channel的读/写都是面向Buffer。 NIO中引入的多路复用Selector机制(如果是linux系统,则应用的epoll事件通知机制)可使一个线程同时监听多个Channel上发生的事件。 虽然Java NIO相比于以往确实是一个大的突破,但是如果要真正上手进行开发,且想要开发出好的一个服务端网络程序,那么你得要花费一点功夫了,毕竟Java NIO只是提供了一大堆的API而已,对于一般的软件开发人员来说只能呵呵了。因此,社区中就涌现了很多基于Java NIO的网络应用框架,其中以Apache的Mina,以及Netty最为出名,从本篇开始我们将深入的分析一下Netty的内部实现细节 。

本系列是基于Netty4.1.18这个版本。

在分析源码之前,我们还是先看看Netty官方的样例代码,了解一下Netty一般是如何进行服务端及客户端开发的。

Netty服务端示例:

EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap(); // (2)
    b.group(bossGroup, workerGroup)  // (3)
     .channel(NioServerSocketChannel.class) // (4)
     .handler(new LoggingHandler())    // (5)
     .childHandler(new ChannelInitializer() { // (6)
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast(new DiscardServerHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)          // (7)
     .childOption(ChannelOption.SO_KEEPALIVE, true); // (8)
    
     // Bind and start to accept incoming connections.
     ChannelFuture f = b.bind(port).sync(); // (9)
    
     // Wait until the server socket is closed.
     // In this example, this does not happen, but you can do that to gracefully
     // shut down your server.
     f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

上面这段代码展示了服务端的一个基本步骤:

1、 初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
2、 初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析;
3、 通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池";
4、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
5、 设置ServerSocketChannel的处理器(此处不详述,后面的系列会进行深入分析)
6、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析)
7、 配置ServerSocketChannel的选项
8、 配置子通道也就是SocketChannel的选项
9、 绑定并侦听某个端口

接着,我们再看看客户端是如何开发的:

Netty客户端示例:

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // (1)
        
        try {
            Bootstrap b = new Bootstrap(); // (2)
            b.group(workerGroup); // (3)
            b.channel(NioSocketChannel.class); // (4)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (5)
            b.handler(new ChannelInitializer() { // (6)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (7)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

客户端的开发步骤和服务端都差不多:

1、 初始化用于连接及I/O工作的"线程池";
2、 初始化Bootstrap实例, 此实例是netty客户端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析;
3、 通过Bootstrap的group方法,设置(1)中初始化的"线程池";
4、 指定通道channel的类型,由于是客户端,故而是NioSocketChannel;
5、 设置SocketChannel的选项(此处不详述,后面的系列会进行深入分析);
6、 设置SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析);
7、 连接指定的服务地址;

通过对上面服务端及客户端代码分析,Bootstrap是Netty应用开发的入口,如果想要理解Netty内部的实现细节,那么有必要先了解一下Bootstrap内部的实现机制。

首先我们先看一下ServerBootstrap及Bootstrap的类继承结构图:

通过类图我们知道AbstractBootstrap类是ServerBootstrap及Bootstrap的基类,我们先看一下AbstractBootstrap类的主要代码:

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;
    private volatile ChannelFactory channelFactory;
    private final Map, Object> options = new LinkedHashMap, Object>();
    private final Map, Object> attrs = new LinkedHashMap, Object>();
    private volatile ChannelHandler handler;

    
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    private B self() {
        return (B) this;
    }

    public B channel(Class channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory(channelClass));
    }

    @Deprecated
    public B channelFactory(ChannelFactory channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }

    public B channelFactory(io.netty.channel.ChannelFactory channelFactory) {
        return channelFactory((ChannelFactory) channelFactory);
    }

    public  B option(ChannelOption option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }

    public  B attr(AttributeKey key, T value) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        if (value == null) {
            synchronized (attrs) {
                attrs.remove(key);
            }
        } else {
            synchronized (attrs) {
                attrs.put(key, value);
            }
        }
        return self();
    }

    public B validate() {
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return self();
    }

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it"s not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

    abstract void init(Channel channel) throws Exception;

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }public abstract AbstractBootstrapConfig config();

}

现在我们以示例代码为出发点,来详细分析一下引导类内部实现细节:

1、 首先看看服务端的b.group(bossGroup, workerGroup):

调用ServerBootstrap的group方法,设置react模式的主线程池 以及 IO 操作线程池,ServerBootstrap中的group代码如下:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

在group方法中,会继续调用父类的group方法,而通过类继承图我们知道,super.group(parentGroup)其实调用的就是AbstractBootstrap的group方法。AbstractBootstrap中group代码如下:

public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

通过以上分析,我们知道了AbstractBootstrap中定义了主线程池group的引用,而子线程池childGroup的引用是定义在ServerBootstrap中。

当我们查看客户端Bootstrap的group方法时,我们发现,其是直接调用的父类AbstractBoostrap的group方法。

2、示例代码中的 channel()方法

无论是服务端还是客户端,channel调用的都是基类的channel方法,其实现细节如下:

public B channel(Class channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory(channelClass));
}
public B channelFactory(ChannelFactory channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

我们发现,其实channel方法内部,只是初始化了一个用于生产指定channel类型的工厂实例。

3、option / handler / attr 方法

option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

  handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;

      对于客户端的SocketChannel,主要是用来处理 业务操作;

attr: 设置通道的属性;

 option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。

4、childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)

  对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。

  因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。

  childHandler / childOption / childAttr 在ServerBootstrap中的对应代码如下:

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}
public  ServerBootstrap childOption(ChannelOption childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    }
    if (value == null) {
        synchronized (childOptions) {
            childOptions.remove(childOption);
        }
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
        }
    }
    return this;
}
public  ServerBootstrap childAttr(AttributeKey childKey, T value) {
    if (childKey == null) {
        throw new NullPointerException("childKey");
    }
    if (value == null) {
        childAttrs.remove(childKey);
    } else {
        childAttrs.put(childKey, value);
    }
    return this;
}

至此,引导类的属性配置都设置完毕了。

本篇总结:

1、服务端由两种线程池,用于Acceptor的React主线程和用于I/O操作的React从线程池; 客户端只有用于连接及IO操作的React的主线程池;

2、ServerBootstrap中定义了服务端React的"从线程池"对应的相关配置,都是以child开头的属性。 而用于"主线程池"channel的属性都定义在AbstractBootstrap中;

本篇只是简单介绍了一下引导类的配置属性, 下一篇我将详细介绍服务端引导类的Bind过程分析。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68110.html

相关文章

  • Netty4.x 源码实战系列(二):服务端bind流程详解

    摘要:对于,目前大家只知道是个线程组,其内部到底如何实现的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,这里不作详细介绍,后面会有文章作专门详解。 在上一篇《ServerBootstrap 与 Bootstrap 初探》中,我们已经初步的了解了ServerBootstrap是netty进行服务端开发的引导类。 且在上一篇的服务端示例中,我们也看到了,在使用netty进行网络编程时,我...

    laoLiueizo 评论0 收藏0
  • Netty4.x 源码实战系列(五):深入浅出学NioEventLoopGroup

    摘要:接下来的两篇文章,我将从源码角度为大家深入浅出的剖析的线程模型工作机制。我们看一下的源码通过的代码发现,实现了接口,其内部会通过指定的默认线程工厂来创建线程,并执行相应的任务。至此,初始化完成了。下一篇我们将详细介绍,敬请期待。 我们都知道Netty的线程模型是基于React的线程模型,并且我们都知道Netty是一个高性能的NIO框架,那么其线程模型必定是它的重要贡献之一。 在使用ne...

    MSchumi 评论0 收藏0
  • 【自己读源码Netty4.X系列() 启动类概览

    摘要:一些想法这个系列想开很久了,自己使用也有一段时间了,利用也编写了一个简单的框架,并运用到工作中了,感觉还不错,趁着这段时间工作不是很忙,来分析一波源码,提升下技术硬实力。 一些想法 这个系列想开很久了,自己使用netty也有一段时间了,利用netty也编写了一个简单的框架,并运用到工作中了,感觉还不错,趁着这段时间工作不是很忙,来分析一波源码,提升下技术硬实力。 结构 这里先看下net...

    qingshanli1988 评论0 收藏0
  • Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析

    摘要:本篇将通过实例化过程,来深入剖析。及初始化完成后,它们会相互连接。我们在回到的构造方法父类构造方法调用完成后,还要初始化一下自己的配置对象是的内部类而又是继承自,通过代码分析,此对象就是就会对底层一些配置设置行为的封装。 根据上一篇《Netty4.x 源码实战系列(二):服务端bind流程详解》所述,在进行服务端开发时,必须通过ServerBootstrap引导类的channel方法来...

    Flink_China 评论0 收藏0
  • 【自己读源码Netty4.X系列(二) 启动类成员Channel

    摘要:下面无耻的贴点源码。启动类我们也学,把启动类抽象成两层,方便以后写客户端。别着急,我们慢慢来,下一篇我们会了解以及他的成员,然后,完善我们的程序,增加其接收数据的能力。文章的源码我会同步更新到我的上,欢迎大家,哈哈。 废话两句 这次更新拖了很长时间,第一是自己生病了,第二是因为最开始这篇想写的很大,然后构思了很久,发现不太合适把很多东西写在一起,所以做了点拆分,准备国庆前完成这篇博客。...

    waterc 评论0 收藏0

发表评论

0条评论

BakerJ

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<