资讯专栏INFORMATION COLUMN

Jetty : Embedded Server 启动流程 - 1

everfly / 2639人阅读

前言

本文基于 Jetty 8.1.x 版本简单介绍 Jetty Embedded Server 核心概念,线程模型,启动流程。以下代码片段摘自 Jetty 源代码 中的 example-jetty-embedded 模块的 OneServletContext.java

public class OneServletContext {
    public static void main(String[] args) throws Exception {
        Server server = new Server(8080);

        ServletContextHandler context = new ServletContextHandler(
        ServletContextHandler.SESSIONS);
        context.setContextPath("/");
        server.setHandler(context);

        ...
        
        // Serve some hello world servlets
        context.addServlet(new ServletHolder(new HelloServlet()),"/*");
        context.addServlet(new ServletHolder(
                new HelloServlet("Buongiorno Mondo")),"/it/*");
        context.addServlet(new ServletHolder(
                new HelloServlet("Bonjour le Monde")),"/fr/*");

        server.start();
        server.join();
    }
}

Embedded Jetty 使用起来很方便,少数几行代码就可以实现一个 http (Servlet)接口

创建 Server 对象,设置要监听的端口 8080

创建 ServletContextHandler 对象,ServletContextHandler "is-a" Handler(Request Handler),提供 ServletContext

将 ServletContextHandler 关联到 Server(即 server.setHandler(context))

向 ServletContextHandler 中添加 Servlet,以 ServletHolder 的形式

启动 Server

核心概念 LifeCycle

The lifecycle(生命周期) interface for generic components,声明 start(启动),stop(停止),isRunning,isStarted(查询状态)等方法

AbstractLifeCycle

实现 LifeCycle 接口的抽象类,提供生命周期管理默认实现:例如通过 _state 属性保存组件状态,提供 start,stop 默认实现

public final void start() throws Exception {
    // 线程安全
    synchronized (_lock) {
        try {
            // 状态保护
            if (_state == _STARTED || _state == _STARTING) {
                return;
            }
            setStarting();
            // 将具体的 start 逻辑推迟到具体的子类实现
            doStart();
            setStarted();
        } catch (...) {
            ...
        }
    }
}
AggregateLifeCycle

An AggregateLifeCycle is an LifeCycle implementation for a collection of contained beans
AggregateLifeCycle 继承自 AbstractLifeCycle,管理一组 Bean 的生命周期

public class AggregateLifeCycle extends AbstractLifeCycle implements
    Destroyable, Dumpable {
    private final List _beans = new CopyOnWriteArrayList<>();

    private boolean _started = false;

    @Override
    protected void doStart() throws Exception {
        for (Bean b: beans) {
            if (b._managed && b._bean instanceof LifeCycle) {
                LifeCycle l = (LefeCycle) b._bean;
                if (!l.isRunning()) {
                    // start managed bean
                    l.start();
                }
            }
        }
        _started =true;
        super.doStart();
    }
}
Server

类层次

AbstractLifeCycle
    AggregateLifeCycle
        AbstractHandler
            AbstractHandlerContainer
                HandlerWrapper
                    Server

Server 类是 Jetty 的《门面》类,包含:

Connector,接收客户端请求

Handler,处理客户端请求

ThreadPool,任务调度线程池

通过 Server 类提供的各种 set 属性方法可以定制 Connector, ThreadPool,如果没有特殊指定 Server 类会创建默认实现,比如默认的 Connector 为 SelectChannelConnector,默认的 ThreadPool 为 QueuedThreadPool

Connector

(主要)类层次:

Connector
    AbstractConnector
        AbstractNIOConnector
            SelectChannelConnector

Connector 接口及其实现类用于 接收客户端请求,(HTTP/SPDY)协议解析,最终调用 Server 中的(Request)Handler 处理请求,SelectChannelConnector 是 Server 默认使用的 Connector

public Server(int port) {
    setServer(this);

    Connector connector = new SelectChannelConnector();
    connector.setPort(port);
    setConnectors(new Connector[]{ connector });
}
Handler

Handler 或者叫作 Request Handler,用于处理客户端请求

public interface Handler extends LifeCycle, Destroyable {
    ...
    void handle(String target, Request baseRequest, HttpServletRequest request,
            HttpServletResponse response);
    ...
}

handle 方法的签名已经很接近 Servlet service 方法,这里的 target 通常是请求的 uri,用于根据 Servlet 配置规则找到具体的 Servlet 并调用其 service 方法

线程模型

Jetty Server 使用线程池来处理客户端请求,ThreadPool 接口定义了线程池基本操作,Server 默认使用 QueuedThreadPool 类,如果需要,可以通过 Server.setThreadPool 方法手动设置线程池
注:Jetty 9.x 线程模型比较复杂,Jetty 8.x 比较简单,代码好读一些~

QueuedThreadPool

QueuedThreadPool 类层次结构

ThreadPool
    SizedThreadPool
        QueuedThreadPool

线程池核心问题:

线程管理:包括创建,销毁等

任务分派:推或拉模型

线程管理

通过最小线程个数,最大线程个数,线程最大空闲时间 来动态创建,销毁线程

线程池初始化
// QueuedThreadPool.java
public class QueuedThreadPool {
    // 默认最大线程数
    private int _maxThreads = 254;
    // 默认最小线程数
    private int _minThreads = 8;
    
    ...

    @Override
    protected void doStart() throws Exception {
        ...
        int threads = _threadsStarted.get();
        // 启动 _minThreads 个线程
        while (isRunning() && threads < _minThreads) {
            startThread(threads);
            threads = _threadStarted.get();
        }
    }
}
线程销毁

当线程空闲时间超过设定的阈值 _maxIdleTimeMs 而且线程池的线程个数高于 _minThreads 时,线程将从 Runnable run 方法中退出

# QueuedThreadPool.java
private Runnable _runnable = new Runnable() {
    public void run() {
        boolean shrink = false;
        ...
        try {
            Runnable job = _jobs.poll()
            while (isRunning()) {
                // job loop:从队列中拉取 job 并执行直到队列为空
                while (job != null && isRunning()) {
                    runJob(job);
                    job = _jobs.poll();
                }

                // idle loop
                try {
                    _threadsIdle.incrementAndGet();
                    while (isRunning() && job == null) {
                        // 如果 _maxIdleTimeMs < 0 就进入阻塞模式,直到成功拉取到 job
                        if (_maxIdleTimeMs <= 0) {
                            job = _jobs.take();
                        } else {
                            final int size = _threadsStarted.get();
                            // 如果线程个数少于 _minThreads 那么不需要回收线程
                            if (size > _minThreads) {
                                long last = _lastShrink.get();
                                long now = System.currentTimeMillis();
                                if (last == 0 || (now - last) > _maxIdleTimeMs) {
                                    shrink = _lastShrink.compareAndSet(last, now) &&
                                        _threadsStarted.compareAndSet(size, size - 1);
                                    if (shringk) {
                                        // 退出 while 循环,即终止线程
                                        return;
                                    }
                                }
                            }
                            // 带 timeout 的 poll
                            job = idleJobPoll();
                        }
                    }
                }
            }
        } catch (...) {
        } finally {
        }
    }
}
任务分派

使用阻塞队列,工作线程从阻塞队列中主动拉取(poll)任务(job),QueuedThreadPool 默认使用 ArrayBlockingQueue,如果需要可以通过构造方法手动指定阻塞队列的具体实现

public class QueuedThreadPool {
    public QueuedThreadPool(BlockingQueue jobQ) {
        this();
        _jobs = jobQ;
        _jobs.clear();
    }

    public boolean dispatch(Runnable job) {
        if (isRunning()) {
            final int jobQ = _jobs.size();
            final int idle = getIdleThreads();
            // 将 job 放入队列中
            if (_jobs.offer(job)) {
                // 如果当前没有 idle 的线程,或者 队列中堆积的 job 比 idle 线程多那么尝试启动新的线程
                if (idle == 0 || jobQ > idle) {
                    int threads = _threadsStarted.get();
                    if (threads < _maxThreads) {
                        startThread(threads);
                    }
                }
                return true;
            }
        }
        return false;
    }
}
启动流程

Server 的启动开始于 start 方法的调用,如上文所述,Server 类继承自 AggregateServer,父类的 start 方法最终调用 Server 的 doStart 方法

Server start
@Override
protected void doStart() throws Exception {
    ...
    // 创建默认的 ThreadPool
    if (_threadPool == null) {
        setThreadPool(new QueuedThreadPool())
    }
    try {
        super.doStart();
    } catch(Throwable e) {...}
    // 启动 Connector,接收,处理请求
    if (_connectors != null && mex.size == 0) {
        for (int i = 0; i < _connectors.length; i++) {
            try {
                _connectors[i].start();
            } catch (Throwable e) {
                mex.add(e);
            }
        }
    }
    ...
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/71172.html

相关文章

  • Jetty : Embedded Server 启动流程 - 2

    摘要:前言本文接着启动流程往下讲,上回说到调用方法,开始接收和处理请求,默认使用的子类,所以我们重点来看看的具体实现类层次,我们上文提到过,这里有两个新面孔抽象实现类上文讲过方法最终会调用方法,使用模板方法模式在方法中实现了的基本流程子类方法, 前言 本文接着 Jetty : Embedded Server 启动流程 - 1往下讲,上回说到 Server.start 调用 Connector...

    phpmatt 评论0 收藏0
  • Spring MVC实现Spring Security,Spring Stomp websocket

    摘要:使用框架各个组件实现一个在线聊天网页,当有用户连接,服务器监听到用户连接会使用推送最新用户列表,有用户断开刷新在线列表,实时推送用户聊天信息。根据请求头是否等于判断是否是。 使用Spring框架各个组件实现一个在线聊天网页,当有用户连接WebSocket,服务器监听到用户连接会使用Stomp推送最新用户列表,有用户断开刷新在线列表,实时推送用户聊天信息。引入Jetty服务器,直接嵌入整...

    shuibo 评论0 收藏0
  • 《Spring Boot 编程思想 - 核心篇》勘误汇总

    摘要:如果您在阅读编程思想核心篇或示例练习的过程中发现了其中错误或提出建议,请将内容提交至勘误汇,小马哥将勘误或建议内容汇总到此,修正后的内容将在后续的书籍发行中体现,并刊登勘误贡献者。笔者水平有限,行文的过程中错误无法避免,为此深表歉意。 如果您在阅读《Spring Boot 编程思想 - 核心篇》或示例练习的过程中发现了其中错误或提出建议,请将内容提交至【勘误汇】,小马哥将勘误或建议内容...

    trilever 评论0 收藏0
  • Spring Boot 2 快速教程:WebFlux 快速入门(二)

    摘要:响应式编程是基于异步和事件驱动的非阻塞程序,只是垂直通过在内启动少量线程扩展,而不是水平通过集群扩展。三特性常用的生产的特性如下响应式编程模型适用性内嵌容器组件还有对日志消息测试及扩展等支持。 摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 02:WebFlux 快速入门实践 文章工程: JDK...

    gaara 评论0 收藏0
  • 分布式 - Jetty架构

    摘要:实现为,即模式上一个依赖于下一个的调用,比如常见的就是这种模式。启动实例化和设置运行时处理流程通常会实例化一个,注意的构造方法最终会调用依次将构成责任链因为这个连同都是类型。内部所有等执行即。 showImg(https://segmentfault.com/img/bV3OIg?w=326&h=314); showImg(https://segmentfault.com/img/bV...

    libin19890520 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<