资讯专栏INFORMATION COLUMN

Http请求连接池-HttpClient的AbstractConnPool源码分析

gself / 3195人阅读

摘要:若使用连接池的方式,来管理连接对象,能极大地提高服务的吞吐量。另外每个对应一个连接池,实现了在级别的隔离,若下游的某台提供服务的主机挂了,无效的连接最多只占用该对应的连接池,不会占用整个连接池,从而拖垮整个服务。

背景

在做服务化拆分的时候,若不是性能要求特别高的场景,我们一般对外暴露Http服务。Spring里提供了一个模板类RestTemplate,通过配置RestTemplate,我们可以快速地访问外部的Http服务。Http底层是通过Tcp的三次握手建立连接的,若每个请求都要重新建立连接,那开销是很大的,特别是对于消息体非常小的场景,开销更大。

若使用连接池的方式,来管理连接对象,能极大地提高服务的吞吐量。

RestTemplate底层是封装了HttpClient(笔者的版本是4.3.6),它提供了连接池机制来处理高并发网络请求。

示例

通常,我们采用如下的样板代码来构建HttpClient:

</>复制代码

  1. HttpClientBuilder builder = HttpClientBuilder.create();
  2. builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute);
  3. if (!connectionReuse) {
  4. builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
  5. }
  6. if (!automaticRetry) {
  7. builder.disableAutomaticRetries();
  8. }
  9. if (!compress) {
  10. builder.disableContentCompression();
  11. }
  12. HttpClient httpClient = builder.build();

从上面的代码可以看出,HttpClient使用建造者设计模式来构造对象,最后一行代码构建对象,前面的代码是用来设置客户端的最大连接数、单路由最大连接数、是否使用长连接、压缩等特性。

源码分析

我们进入HttpClientBuilder的build()方法,会看到如下代码:

</>复制代码

  1. # 构造Http连接池管理器
  2. final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
  3. RegistryBuilder.create()
  4. .register("http", PlainConnectionSocketFactory.getSocketFactory())
  5. .register("https", sslSocketFactory)
  6. .build());
  7. if (defaultSocketConfig != null) {
  8. poolingmgr.setDefaultSocketConfig(defaultSocketConfig);
  9. }
  10. if (defaultConnectionConfig != null) {
  11. poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
  12. }
  13. if (systemProperties) {
  14. String s = System.getProperty("http.keepAlive", "true");
  15. if ("true".equalsIgnoreCase(s)) {
  16. s = System.getProperty("http.maxConnections", "5");
  17. final int max = Integer.parseInt(s);
  18. poolingmgr.setDefaultMaxPerRoute(max);
  19. poolingmgr.setMaxTotal(2 * max);
  20. }
  21. }
  22. if (maxConnTotal > 0) {
  23. poolingmgr.setMaxTotal(maxConnTotal);
  24. }
  25. if (maxConnPerRoute > 0) {
  26. poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute);
  27. }
  28. # Http连接管理器采用连接池的方式实现
  29. connManager = poolingmgr;

默认情况下构造出的Http连接管理器是采用连接池的方式实现的。

我们进入 PoolingHttpClientConnectionManager的代码,其连接池的核心实现是依赖于 CPool类,而 CPool又继承了抽象类AbstractConnPool AbstractConnPool@ThreadSafe的注解,说明它是线程安全类,所以 HttpClient线程安全地获取、释放连接都依赖于 AbstractConnPool

接下来我来看最核心的AbstractConnPool类,以下是连接池的结构图:

连接池最重要的两个公有方法是 leaserelease,即获取连接和释放连接的两个方法。

lease 获取连接

</>复制代码

  1. @Override
  2. public Future lease(final T route, final Object state, final FutureCallback callback) {
  3. Args.notNull(route, "Route");
  4. Asserts.check(!this.isShutDown, "Connection pool shut down");
  5. return new PoolEntryFuture(this.lock, callback) {
  6. @Override
  7. public E getPoolEntry(
  8. final long timeout,
  9. final TimeUnit tunit)
  10. throws InterruptedException, TimeoutException, IOException {
  11. final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
  12. onLease(entry);
  13. return entry;
  14. }
  15. };
  16. }

lease方法返回的是一个 Future对象,即需要调用 Futureget方法,才可以得到PoolEntry的对象,它包含了一个连接的具体信息。

而获取连接是通过 getPoolEntryBlocking方法实现的,通过函数名可以知道,这是一个阻塞的方法,即该route所对应的连接池中的连接不够用时,该方法就会阻塞,直到该 route所对应的连接池有连接释放,方法才会被唤醒;或者方法一直等待,直到连接超时抛出异常。

</>复制代码

  1. private E getPoolEntryBlocking(
  2. final T route, final Object state,
  3. final long timeout, final TimeUnit tunit,
  4. final PoolEntryFuture future)
  5. throws IOException, InterruptedException, TimeoutException {
  6. Date deadline = null;
  7. // 设置连接超时时间戳
  8. if (timeout > 0) {
  9. deadline = new Date
  10. (System.currentTimeMillis() + tunit.toMillis(timeout));
  11. }
  12. // 获取连接,并修改修改连接池,所以加锁--->线程安全
  13. this.lock.lock();
  14. try {
  15. // 从Map中获取该route对应的连接池,若Map中没有,则创建该route对应的连接池
  16. final RouteSpecificPool pool = getPool(route);
  17. E entry = null;
  18. while (entry == null) {
  19. Asserts.check(!this.isShutDown, "Connection pool shut down");
  20. for (;;) {
  21. // 获取 同一状态的 空闲连接,即从available链表的头部中移除,添加到leased集合中
  22. entry = pool.getFree(state);
  23. // 若返回连接为空,跳出循环
  24. if (entry == null) {
  25. break;
  26. }
  27. // 若连接已过期,则关闭连接
  28. if (entry.isExpired(System.currentTimeMillis())) {
  29. entry.close();
  30. } else if (this.validateAfterInactivity > 0) {
  31. if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
  32. if (!validate(entry)) {
  33. entry.close();
  34. }
  35. }
  36. }
  37. if (entry.isClosed()) {
  38. // 若该连接已关闭,则总的available链表中删除该连接
  39. this.available.remove(entry);
  40. // 从该route对应的连接池的leased集合中删除该连接,并且不回收到available链表中
  41. pool.free(entry, false);
  42. } else {
  43. break;
  44. }
  45. }
  46. // 跳出for循环
  47. if (entry != null) {
  48. // 若获取的连接不为空,将连接从总的available链表移除,并添加到leased集合中
  49. // 获取连接成功,直接返回
  50. this.available.remove(entry);
  51. this.leased.add(entry);
  52. onReuse(entry);
  53. return entry;
  54. }
  55. // 计算该route的最大连接数
  56. // New connection is needed
  57. final int maxPerRoute = getMax(route);
  58. // Shrink the pool prior to allocating a new connection
  59. // 计算该route连接池中的连接数 是否 大于等于 route最大连接数
  60. final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
  61. // 若大于等于 route最大连接数,则收缩该route的连接池
  62. if (excess > 0) {
  63. for (int i = 0; i < excess; i++) {
  64. // 获取该route连接池中最不常用的空闲连接,即available链表末尾的连接
  65. // 因为回收连接时,总是将连接添加到available链表的头部,所以链表尾部的连接是最有可能过期的
  66. final E lastUsed = pool.getLastUsed();
  67. if (lastUsed == null) {
  68. break;
  69. }
  70. // 关闭连接,并从总的空闲链表以及route对应的连接池中删除
  71. lastUsed.close();
  72. this.available.remove(lastUsed);
  73. pool.remove(lastUsed);
  74. }
  75. }
  76. // 该route的连接池大小 小于 route最大连接数
  77. if (pool.getAllocatedCount() < maxPerRoute) {
  78. final int totalUsed = this.leased.size();
  79. final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
  80. if (freeCapacity > 0) {
  81. final int totalAvailable = this.available.size();
  82. // 总的空闲连接数 大于等于 总的连接池剩余容量
  83. if (totalAvailable > freeCapacity - 1) {
  84. if (!this.available.isEmpty()) {
  85. // 从总的available链表中 以及 route对应的连接池中 删除连接,并关闭连接
  86. final E lastUsed = this.available.removeLast();
  87. lastUsed.close();
  88. final RouteSpecificPool otherpool = getPool(lastUsed.getRoute());
  89. otherpool.remove(lastUsed);
  90. }
  91. }
  92. // 创建新连接,并添加到总的leased集合以及route连接池的leased集合中,函数返回
  93. final C conn = this.connFactory.create(route);
  94. entry = pool.add(conn);
  95. this.leased.add(entry);
  96. return entry;
  97. }
  98. }
  99. //route的连接池已满,无法分配连接
  100. boolean success = false;
  101. try {
  102. // 将该获取连接的任务放入pending队列
  103. pool.queue(future);
  104. this.pending.add(future);
  105. // 阻塞等待,若在超时之前被唤醒,则返回true;若直到超时才返回,则返回false
  106. success = future.await(deadline);
  107. } finally {
  108. // In case of "success", we were woken up by the
  109. // connection pool and should now have a connection
  110. // waiting for us, or else we"re shutting down.
  111. // Just continue in the loop, both cases are checked.
  112. // 无论是 被唤醒返回、超时返回 还是被 中断异常返回,都会进入finally代码段
  113. // 从pending队列中移除
  114. pool.unqueue(future);
  115. this.pending.remove(future);
  116. }
  117. // check for spurious wakeup vs. timeout
  118. // 判断是伪唤醒 还是 连接超时
  119. // 若是 连接超时,则跳出while循环,并抛出 连接超时的异常;
  120. // 若是 伪唤醒,则继续循环获取连接
  121. if (!success && (deadline != null) &&
  122. (deadline.getTime() <= System.currentTimeMillis())) {
  123. break;
  124. }
  125. }
  126. throw new TimeoutException("Timeout waiting for connection");
  127. } finally {
  128. // 释放锁
  129. this.lock.unlock();
  130. }
  131. }
release 释放连接

</>复制代码

  1. @Override
  2. public void release(final E entry, final boolean reusable) {
  3. // 获取锁
  4. this.lock.lock();
  5. try {
  6. // 从总的leased集合中移除连接
  7. if (this.leased.remove(entry)) {
  8. final RouteSpecificPool pool = getPool(entry.getRoute());
  9. // 回收连接
  10. pool.free(entry, reusable);
  11. if (reusable && !this.isShutDown) {
  12. this.available.addFirst(entry);
  13. onRelease(entry);
  14. } else {
  15. entry.close();
  16. }
  17. // 获取pending队列队头的任务(先进先出原则),唤醒该阻塞的任务
  18. PoolEntryFuture future = pool.nextPending();
  19. if (future != null) {
  20. this.pending.remove(future);
  21. } else {
  22. future = this.pending.poll();
  23. }
  24. if (future != null) {
  25. future.wakeup();
  26. }
  27. }
  28. } finally {
  29. // 释放锁
  30. this.lock.unlock();
  31. }
  32. }
总结

AbstractConnPool其实就是通过在获取连接、释放连接时加锁,来实现线程安全,思路非常简单,但它没有在route对应的连接池中加锁对象,即 RouteSpecificPool的获取连接、释放连接操作是不加锁的,因为已经在 AbstractConnPool的外部调用中加锁,所以是线程安全的,简化了设计。

另外每个route对应一个连接池,实现了在host级别的隔离,若下游的某台提供服务的主机挂了,无效的连接最多只占用该route对应的连接池,不会占用整个连接池,从而拖垮整个服务。

以上。

原文链接

https://segmentfault.com/a/11...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68005.html

相关文章

  • 记JVM堆外内存泄漏Bug查找

    摘要:服务本身是一个,开起的线程数为,再加上一些其他线程,总的线程数不会超过服务内自己没有显示创建线程或者使用线程池。问题解决找到所在后,结局方案很简单,只需将的通过单例的方式注入到服务中,即可解决堆外内存泄漏的问题。 内存泄漏Bug现场 一个做BI数据展示的服务在一个晚上重启了5次,由于是通过k8s容器编排,服务挂了以后会自动重启,所以服务还能继续提供服务。 第一时间先上日志系统查看错误日...

    hiYoHoo 评论0 收藏0
  • Apache HttpClient源码分析连接

    摘要:对连接数的管理则有两个维度,分别是全局最大数和单最大数。当请求一个连接时,会返回。而会维护与及存活时间等。最终用户得到的是里封装而成的连接对象。连接数达到阈值时对请求进行堵塞,并且将放入。 showImg(https://segmentfault.com/img/bVZW77?w=1217&h=886); 上图时连接池类图关系。PoolingHttpConnectionManager:...

    YFan 评论0 收藏0
  • Android网络编程2HttpUrlConnection和HttpClient

    摘要:压缩和缓存机制可以有效地减少网络访问的流量,在提升速度和省电方面也起到了较大的作用。打开来分析一下,不了解和协议原理的请查看网络编程一协议原理这篇文章。当然这次错误是正常的,百度没理由处理我们的这次请求。 前言 上一篇我们了解了HTTP协议原理,这一篇我们来讲讲Apache的HttpClient和Java的HttpURLConnection,这两种都是我们平常请求网络会用到的。无论我们...

    cfanr 评论0 收藏0
  • 浅析 jdk11 中 HttpClient 使用

    摘要:在中也可以直接使用返回的是,然后通过来获取结果阻塞线程,从中获取结果四一点唠叨非常的年轻,网络资料不多,且代码非常精细和复杂,目前来看底层应该是使用了线程池搭配进行异步通讯。 零 前期准备 0 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 1 HttpClient 简介 java.net.http.HttpClient 是 jdk11 中正式...

    Eminjannn 评论0 收藏0

发表评论

0条评论

gself

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<