「DUBBO系列」線程池策略實現原理與源碼分析

2020-09-05 科技萬物互聯

請點擊【關注】獲取更多網際網路和技術乾貨,頭條號IT徐胖子原創本文請勿轉載,感謝支持

1 文章概述

本系列文章已經分析了DUBBO線程模型實現原理,本文簡單進行回顧。我們知道DUBBO提供五種線程模型

all所有消息都派發到業務線程池,包括請求,響應,連接事件,斷開事件,心跳direct所有消息都不派發到業務線程池,全部在IO線程直接執行message只有請求響應消息派發到業務線程池,其它連接斷開事件,心跳等消息直接在IO線程執行execution只有請求消息派發到業務線程池,響應和其它連接斷開事件,心跳等消息直接在IO線程執行connection在IO線程上將連接斷開事件放入隊列,有序逐個執行,其它消息派發到業務線程池

不同線程模型會選擇使用IO線程還是業務線程,那麼業務線程池採用什麼線程池策略是本文需要回答的問題。DUBBO提供了多種線程池策略,選擇線程池策略需要在配置文件指定threadpool屬性

<dubbo:protocol name=&34; threadpool=&34; threads=&34; /><dubbo:protocol name=&34; threadpool=&34; threads=&34; /><dubbo:protocol name=&34; threadpool=&34; threads=&34; /><dubbo:protocol name=&34; threadpool=&34; threads=&34; />

不同線程池策略會創建不同線程池,不同線程池處理任務方式也不相同

fixed固定線程數量cached線程空閒一分鐘會被回收,當新請求到來時會創建新線程limited線程個數隨著任務增加而增加,但不會超過最大閾值。空閒線程不會被回收eager當所有核心線程數都處於忙碌狀態時,優先創建新線程執行任務

2 線程池策略確認時機

public class AllDispatcher implements Dispatcher { public static final String NAME = &34;; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); }}public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); }}

分析WrappedChannelHandler構造函數

public class WrappedChannelHandler implements ChannelHandlerDelegate { public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; // 獲取線程池自適應擴展點默認FixedThreadPool executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); }}

如果配置指定threadpool屬性,擴展點加載器會從URL獲取屬性值加載對應線程池策略

@SPI(&34;)public interface ThreadPool { @Adaptive({Constants.THREADPOOL_KEY}) Executor getExecutor(URL url);}

3 源碼分析

3.1 FixedThreadPool

public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 線程名稱 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 線程個數默認200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 隊列容量等於0使用阻塞隊列SynchronousQueue // 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue // 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

3.2 CachedThreadPool

public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 獲取線程名稱 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心線程數默認0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大線程數默認Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 線程空閒多少時間被回收默認1分鐘 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 隊列容量等於0使用阻塞隊列SynchronousQueue // 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue // 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

3.3 LimitedThreadPool

public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 獲取線程名稱 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心線程數默認0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大線程數默認200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 隊列容量等於0使用阻塞隊列SynchronousQueue // 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue // 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue // keepalive時間設置Long.MAX_VALUE表示不回收空閒線程 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

3.4 EagerThreadPool

通過自定義線程執行器可以自定義線程處理策略,我們進行一個對比。

ThreadPoolExecutor普通線程執行器。當線程池核心線程達到閾值時新任務放入隊列。當隊列已滿開啟新線程處理。當前線程數達到最大線程數時執行拒絕策略。

EagerThreadPoolExecutor自定義線程執行器。當線程池核心線程達到閾值時,新任務不會放入隊列而是開啟新線程進行處理(要求當前線程數沒有超過最大線程數)。當前線程數達到最大線程數時任務放入隊列。隊列已滿執行拒絕策略。

public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 線程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心線程數默認0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大線程數默認Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 線程空閒多少時間被回收默認1分鐘 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 初始化自定義線程池和隊列 TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; }}public class EagerThreadPoolExecutor extends ThreadPoolExecutor { // 提交任務個數 private final AtomicInteger submittedTaskCount = new AtomicInteger(0); @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // 任務數自增 submittedTaskCount.incrementAndGet(); try { // 調用父類方法執行線程任務 super.execute(command); } // 拋出拒絕異常 catch (RejectedExecutionException rx) { final TaskQueue queue = (TaskQueue) super.getQueue(); try { // 任務重新放入隊列 if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { // 任務重新放入隊列失敗拋出異常 submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(&34;, rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { submittedTaskCount.decrementAndGet(); throw t; } }}public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> { @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException(&34;); } // 當前線程數 int currentPoolThreadSize = executor.getPoolSize(); // 任務數 < 當前線程數表示存在空閒worker線程則任務放入隊列等待worker線程處理 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); } // 當前線程數 < 最大線程數返回false表示創建worker線程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 當前線程數 > 最大線程數任務放入隊列 return super.offer(runnable); } public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException(&34;); } // 任務重試放入隊列 return super.offer(o, timeout, unit); }}

4 文章總結

本文分析了DUBBO線程池策略源碼,可以根據不同業務場景選擇不同線程池策略。後續文章我們深入分析「DUBBO線程池打滿問題」請繼續關注。

請點擊【關注】獲取更多網際網路和技術乾貨,頭條號IT徐胖子原創本文請勿轉載,感謝支持

相關焦點

  • 「DUBBO系列」線程模型實現原理與源碼分析
    ,心跳等消息直接在IO線程執行connection在IO線程上將連接斷開事件放入隊列,有序逐個執行,其它消息派發到業務線程池選擇不同線程模型可以使用如下配置方法<dubbo:protocol name=&34; dispatcher=&34; /><dubbo:protocol name=&34; dispatcher=
  • 「DUBBO系列」並發控制實現原理與源碼分析
    本文我們介紹生產者和消費者並發控制怎樣配置並且在源碼層面分析並發控制實現原理。:protocol name=&34; port=&34; /> <dubbo:service executes=&34; interface=&34; ref=&34; /></beans>2.2 源碼分析ExecuteLimitFilter過濾器是生產者並發控制核心
  • 「DUBBO系列」鏈路跟蹤實現原理與源碼分析
    1.1 消費者配置<beans> <dubbo:registry address=&34; /> <dubbo:reference id=&34; interface=&34; /><beans>
  • 「DUBBO系列」線程池打滿問題分析方法與實例
    通過分析消費者日誌我們知道生產者線程池被打滿,而且可以定位到哪一個方法報錯。消費者需要做好降級策略,例如使用mock機制或者熔斷保護系統。我們還可以查找生產者地址在控制臺查詢這臺機器服務運行情況,如果不是本團隊維護還要聯繫相關技術團隊迅速處理。2.3 生產者分析通過分析生產者日誌我們知道生產者線程池被打滿,但是不知道哪一個方法報錯,這就需要結合線程快照進行分析。
  • 「DUBBO系列」集群容錯策略Failsafe源碼分析
    但是在降級之前,消費者有可能重試消費服務A,或者直接返回空結果,或者延時重試,這就是所謂集群容錯策略。集群容錯策略有很多,本文我們分析安全失敗策略。Failsafe策略被稱為安全失敗策略,只消費一次服務,如果消費失敗包裝一個空結果返回不拋出異常。我們來看一個消費者實例。
  • 「DUBBO系列」服務降級源碼分析
    我們通過分析源碼講解服務降級策略,首先看一個消費者代碼實例。2.1 不配置降級策略<dubbo:reference id=&34; interface=&34; />2.2 強制降級策略<dubbo:reference id
  • 「DUBBO系列」集群容錯策略Failfast源碼分析
    但是在降級之前消費者有可能重試消費服務A,或者直接返回錯誤結果,或者延時重試,這就是所謂集群容錯策略。集群容錯策略有很多,本文我們分析快速失敗策略。Failfast策略被稱為快速失敗策略,只消費一次服務,一旦消費失敗則拋出異常,我們看一個服務消費者代碼示例。
  • 「DUBBO系列」集群容錯策略Failover源碼分析
    但是在降級之前,消費者有可能重試消費服務A,或者直接返回空結果,或者延時重試,這就是所謂集群容錯策略。集群容錯策略有很多,本文我們分析故障轉移策略。Failover策略被稱為故障轉移策略,這是集群容錯默認策略。當第一次消費服務失敗後,消費者會根據Failover策略選擇其它生產者再次消費,默認重試次數是2次。
  • 「DUBBO系列」集群容錯策略Failback源碼分析
    但是在降級之前,消費者有可能重試消費服務A,或者直接返回空結果,或者延時重試,這就是所謂集群容錯策略。集群容錯策略有很多,本文分析失敗自動恢復策略。Failback策略被稱為失敗自動恢復策略,如果消費者服務消費失敗將返回一個空結果並記錄這次失敗請求,失敗請求會在另一個線程中進行異步重試,默認最大重試次數5次,如果重試超過5次還不成功則放棄重試並不拋出異常。
  • 從源碼上分析JUC線程池ThreadPoolExecutor
    ThreadPoolExecutor的源碼實現,由於近段時間比較忙,一直沒有時間整理出源碼分析的文章。 command);}而ExecutorService提供了很多擴展方法底層基本上是基於Executorexecute()的實現,筆者會從實現原理、源碼實現等角度結合簡化例子進行詳細的分析。
  • Dubbo源碼學習——從源碼看看dubbo對netty的使用
    前言前段時間,從頭開始將netty源碼了解了個大概,但都是原理上理解。剛好博主對dubbo框架了解過一些,這次就以dubbo框架為例,詳細看看dubbo這種出色的開源框架是如何使用netty的,又是如何與框架本身邏輯進行融合的。
  • 「DUBBO系列」服務超時機制源碼分析
    1 文章概述DUBBO有很多地方可以配置超時時間,可以配置在消費者,可以配置在生產者,可以配置為方法級別,可以配置為接口級別,還可以配置為全局級別,DUBBO官方文檔介紹這些配置優先級如下:第一優先級:方法級 > 接口級 > 全局級第二優先級:消費者 > 生產者本文從源碼層面對超時機制進行分析
  • 「Java並發編程」面試必備之線程池
    「降低資源消耗」: 通過重複利用已創建的線程來降低線程創建和銷毀所造成的消耗。「提高響應速度:」 任務到達時,可以立即執行,不需要等到線程創建再來執行任務。「提高線程的可管理性:」 線程是稀缺資源,如果無限制創建,不僅會消耗系統資源,還會因為線程的不合理分布導致資源調度失衡,降低系統的穩定性。使用線程池可以進行統一的分配、調優和監控。
  • 從源碼上分析ThreadPoolExecutor的實現原理
    原文名稱:4W字從源碼上分析JUC線程池ThreadPoolExecutor的實現原理前提很早之前就打算看一次JUC線程池ThreadPoolExecutor的源碼實現,由於近段時間比較忙,一直沒有時間整理出源碼分析的文章。
  • ThreadPoolExecutor線程池實現原理+源碼解析
    線程池繼承Thread和實現Runnable的諸多缺點,所以生產環境必須使用線程池來實現多線程。線程池(thread pool):一種線程使用模式。——維基百科簡單來說,「池」在計算機領域是指集合,線程池就是指線程集合。線程池可以對一系列線程的生命周期進行統一的調度和管理,包括線程的創建、消亡、生存時間、數量控制等。
  • Java線程池的工作原理
    Java中的線程池是我們平時經常用的技術,但是線程池到底如何工作的,它的工作原理是什麼樣的呢?只有深刻的了解它的原理才能更好的在工作中使用它。線程池工作原理如上圖所示,是一個線程池的工作原理示意圖,基於整個圖我們分析一下線程池的工作原理。
  • Java線程池,拒絕策略解析
    為什麼會有拒絕策略?線程池工作中,當任務量很大,超過系統實際承載能力時,如果不去搭理它,系統很可能崩潰,所以jdk內置提供了四種線程池的拒絕策略,可以合理解決這種問題。當線程池中線程已用完不能再創建,等待隊列也排滿,如果此時再有新任務,就會觸發執行拒絕策略之一。
  • ThreadPoolExecutor線程池原理源碼解析
    ThreadPoolExecutor的大概原理是先規定一個線程池的容量,然後給提交過來的任務創建執行線程,任務執行完畢後放在池子中等待新的任務提交過來,當然ThreadPoolExecutor的內部細節比這要複雜得多。下面就通過源碼來理解它的原理。
  • Dubbo SPI中AOP實現原理
    官網:http://dubbo.apache.org/zh-cn/docs/2.7/source_code_guide/dubbo-spi/從官網說明中我們可以看到,dubbo spi是java spi的一種增強實現,也就是這種spi機制確保了dubbo高擴展的能力。
  • 我畫了25張圖展示線程池工作原理和實現原理,建議先收藏再閱讀
    如何構造一個線程池對象本文內容我們只聊線程池ThreadPoolExecutor,查看它的源碼會發現它繼承了AbstractExecutorService抽象類,而AbstractExecutorService實現了ExecutorService接口,ExecutorService繼承了Executor接口,所以ThreadPoolExecutor間接實現了