請點擊【關注】獲取更多網際網路和技術乾貨,頭條號IT徐胖子原創本文請勿轉載,感謝支持
本系列文章已經分析了DUBBO線程模型實現原理,本文簡單進行回顧。我們知道DUBBO提供五種線程模型
all所有消息都派發到業務線程池,包括請求,響應,連接事件,斷開事件,心跳direct所有消息都不派發到業務線程池,全部在IO線程直接執行message只有請求響應消息派發到業務線程池,其它連接斷開事件,心跳等消息直接在IO線程執行execution只有請求消息派發到業務線程池,響應和其它連接斷開事件,心跳等消息直接在IO線程執行connection在IO線程上將連接斷開事件放入隊列,有序逐個執行,其它消息派發到業務線程池
不同線程模型會選擇使用IO線程還是業務線程,那麼業務線程池採用什麼線程池策略是本文需要回答的問題。DUBBO提供了多種線程池策略,選擇線程池策略需要在配置文件指定threadpool屬性
<dubbo:protocol name=&34; threadpool=&34; threads=&34; /><dubbo:protocol name=&34; threadpool=&34; threads=&34; /><dubbo:protocol name=&34; threadpool=&34; threads=&34; /><dubbo:protocol name=&34; threadpool=&34; threads=&34; />
不同線程池策略會創建不同線程池,不同線程池處理任務方式也不相同
fixed固定線程數量cached線程空閒一分鐘會被回收,當新請求到來時會創建新線程limited線程個數隨著任務增加而增加,但不會超過最大閾值。空閒線程不會被回收eager當所有核心線程數都處於忙碌狀態時,優先創建新線程執行任務
public class AllDispatcher implements Dispatcher { public static final String NAME = &34;; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); }}public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); }}
分析WrappedChannelHandler構造函數
public class WrappedChannelHandler implements ChannelHandlerDelegate { public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; // 獲取線程池自適應擴展點默認FixedThreadPool executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); }}
如果配置指定threadpool屬性,擴展點加載器會從URL獲取屬性值加載對應線程池策略
@SPI(&34;)public interface ThreadPool { @Adaptive({Constants.THREADPOOL_KEY}) Executor getExecutor(URL url);}
public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 線程名稱 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 線程個數默認200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 隊列容量等於0使用阻塞隊列SynchronousQueue // 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue // 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}
public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 獲取線程名稱 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心線程數默認0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大線程數默認Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 線程空閒多少時間被回收默認1分鐘 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 隊列容量等於0使用阻塞隊列SynchronousQueue // 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue // 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}
public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 獲取線程名稱 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心線程數默認0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大線程數默認200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 隊列容量等於0使用阻塞隊列SynchronousQueue // 隊列容量小於0使用無界阻塞隊列LinkedBlockingQueue // 隊列容量大於0使用有界阻塞隊列LinkedBlockingQueue // keepalive時間設置Long.MAX_VALUE表示不回收空閒線程 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}
通過自定義線程執行器可以自定義線程處理策略,我們進行一個對比。
ThreadPoolExecutor普通線程執行器。當線程池核心線程達到閾值時新任務放入隊列。當隊列已滿開啟新線程處理。當前線程數達到最大線程數時執行拒絕策略。
EagerThreadPoolExecutor自定義線程執行器。當線程池核心線程達到閾值時,新任務不會放入隊列而是開啟新線程進行處理(要求當前線程數沒有超過最大線程數)。當前線程數達到最大線程數時任務放入隊列。隊列已滿執行拒絕策略。
public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 線程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心線程數默認0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大線程數默認Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 隊列容量默認0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 線程空閒多少時間被回收默認1分鐘 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 初始化自定義線程池和隊列 TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; }}public class EagerThreadPoolExecutor extends ThreadPoolExecutor { // 提交任務個數 private final AtomicInteger submittedTaskCount = new AtomicInteger(0); @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // 任務數自增 submittedTaskCount.incrementAndGet(); try { // 調用父類方法執行線程任務 super.execute(command); } // 拋出拒絕異常 catch (RejectedExecutionException rx) { final TaskQueue queue = (TaskQueue) super.getQueue(); try { // 任務重新放入隊列 if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { // 任務重新放入隊列失敗拋出異常 submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(&34;, rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { submittedTaskCount.decrementAndGet(); throw t; } }}public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> { @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException(&34;); } // 當前線程數 int currentPoolThreadSize = executor.getPoolSize(); // 任務數 < 當前線程數表示存在空閒worker線程則任務放入隊列等待worker線程處理 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); } // 當前線程數 < 最大線程數返回false表示創建worker線程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 當前線程數 > 最大線程數任務放入隊列 return super.offer(runnable); } public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException(&34;); } // 任務重試放入隊列 return super.offer(o, timeout, unit); }}
本文分析了DUBBO線程池策略源碼,可以根據不同業務場景選擇不同線程池策略。後續文章我們深入分析「DUBBO線程池打滿問題」請繼續關注。
請點擊【關注】獲取更多網際網路和技術乾貨,頭條號IT徐胖子原創本文請勿轉載,感謝支持