hangfire是一款開源的,可擴展的,不需要基於Windows 服務及Windows 調度任務的,易於上手的,且自帶Dashboard的分布式後臺作業調度框架,本系列文章將基於hangfire[1.7.10]源碼揭秘hangfire的實現機制。
系列目錄
[1]:hangfire工作原理概述
[2]:hangfire的狀態管理
[3]:hangfire中的設計模式
[4]:hangfire總結
可視化監控
儀錶盤
隊列
作業詳情
hangifre工作原理概述
本文將從以下四個方面講述hangfire的工作原理
目錄
一、幾個基本概念
二、hangfire簡單使用
三、主要對象介紹
四、hangfire如何工作
一、幾個基本概念
伺服器實例[Server]hangfire伺服器實例是hangfire的核心,啟動伺服器實例後臺作業將在他的統一調度下開始運作。
hangfire伺服器實例由執行不同工作的不同組件組成:[worker]偵聽隊列和處理作業,重複的調度程序[recurring scheduler]使重複的作業排隊,調度輪詢器使延遲的作業排隊[delayedJob scheduler],到期管理器刪除過時的作業並保持存儲儘可能乾淨等。
[原文]
Hangfire Server consist of different components that are doing different work: workers listen to queue and process jobs, recurring scheduler enqueues recurring jobs, schedule poller enqueues delayed jobs, expire manager removes obsolete jobs and keeps the storage as clean as possible, etc.
所以啟動一個hangfire的伺服器實例只需要一句代碼後臺調度即開始運行:
varserver=newBackgroundJobServer();// Wait for graceful server shutdown.server.Dispose();
隊列[Queue]隊列是作業的載體,作業經過後臺調度後進入到隊列中,排入到隊列的作業將被[worker]執行並消費。我們可以通過配置設置不同的伺服器實例處理不同的隊列如varoptions=newBackgroundJobServerOptions{Queues=new[]{"alpha","beta","default"}}; (newBackgroundJobServer(options)){/* ... */}
作業[Job]作業是實際要執行的後臺任務實際表現為一個方法,它可以被標記為指定的隊列,如:[Queue("alpha")]publicvoidSomeMethod(){}BackgroundJob.Enqueue(()=>SomeMethod());
二、hangfire簡單使用
在dotnet core web項目中啟用hangfire其實就兩句代碼,第一步注入hangfire,第二部使用hangfire
注入public void ConfigureServices(IServiceCollection services) { services.AddHangfire(); }啟用public void Configure(IApplicationBuilder app, IHostingEnvironment env){app.UseHangfireServer();app.UseHangfireDashboard(); }添加作業等待調度一次性作業[Fire-and-forget jobs] var jobId = BackgroundJob.Enqueue( () => Console.WriteLine("Fire-and-forget!"));
周期性作業[Recurring jobs]基於CRON 表達式進行周期性調度
RecurringJob.AddOrUpdate( () => Console.WriteLine("Recurring!"), Cron.Daily);
延遲調度作業[Delayed jobs]只執行一次但不是立即執行,需延遲一段時間執行
var jobId = BackgroundJob.Schedule( () => Console.WriteLine("Delayed!"), TimeSpan.FromDays(7));
三、主要對象介紹
BackgroundJobServer伺服器實例,前面已經介紹過了。
BackgroundProcessingServer立即在後臺線程中啟動進程。負責發布/刪除綁定到存儲的伺服器。用無限循環包裝所有進程並自動重試。在單個上下文中執行所有進程。在處理方法中使用超時,等待所有組件,取消信號關閉包含一些必需的進程並使用存儲進程。生成唯一的ID。屬性仍然很差。[原文]
Immediately starts the processes in a background thread. Responsible for announcing/removing a server, bound to a storage. Wraps all the processes with a infinite loop and automatic retry. Executes all the processes in a single context. Uses timeout in dispose method, waits for all the components, cancel signals shutdown Contains some required processes and uses storage processes. Generates unique id. Properties are still bad.
BackgroundServerProcess服務主進程,通過該進程在啟動各種後臺進程如worker對應的後臺工作進程,DelayedJobScheduler、RecurringJobScheduler對應的後臺輪詢進程以及一些框架必須的服務監聽進程,心跳進程及作業監聽進程等。
BackgroundDispatcher後臺線程調度者也是後臺線程的實際創建者,職責是創建後臺線程把並對線程進行管理
以下是三類後臺線程[IBackgroundProcess]Worker根據配置的線程數創建1個或多個後臺線程DelayedJobScheduler創建1個後臺線程用於輪詢RecurringJobScheduler創建1個後臺線程用於輪詢
四、hangfire如何工作
ER圖
從前面的介紹我們指定啟用hangfire實際只需要兩句代碼即:
HangfireApplicationBuilderExtensions.AddHangfire該步驟實際進行各種注入包括注入全局配置GlobalConfiguration、JobStorage、JobActivator等HangfireApplicationBuilderExtensions.UseHangfireServer或者通過HostedService方式啟動:AddHangfireServer無論哪種方式其實核心就幹了一件事情,創建一個hangfire服務實例:varserver=newBackgroundJobServer();// Wait for graceful server shutdown.server.Dispose();接下來我們看構造函數中做了什麼?
public BackgroundJobServer( [NotNull] BackgroundJobServerOptions options, [NotNull] JobStorage storage, [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses, [CanBeNull] IJobFilterProvider filterProvider, [CanBeNull] JobActivator activator, [CanBeNull] IBackgroundJobFactory factory, [CanBeNull] IBackgroundJobPerformer performer, [CanBeNull] IBackgroundJobStateChanger stateChanger) {
....
var processes = new List<IBackgroundProcessDispatcherBuilder>(); processes.AddRange(GetRequiredProcesses(filterProvider, activator, factory, performer, stateChanger)); [注釋1] processes.AddRange(additionalProcesses.Select(x => x.UseBackgroundPool(1)));
....
_processingServer = newBackgroundProcessingServer( storage, processes,properties, GetProcessingServerOptions()); [注釋2]
}
[注釋1]
將必須的IBackgroundProcessDispatcherBuilder加入到集合,這些必須的後臺進程調度對象就包括(Worker、DelayedJobScheduler、RecurringJobScheduler 生成的IBackgroundProcessDispatcherBuilder[調用擴展方法UseBackgroundPool])
[注釋2]
實例化服務主進程對象
接下來看BackgroundProcessingServer構造函數代碼
internal BackgroundProcessingServer(
[NotNull] BackgroundServerProcess process,
[NotNull] BackgroundProcessingServerOptions options) {
....
_dispatcher = CreateDispatcher();[注釋3]
....
}
private IBackgroundDispatcher CreateDispatcher() {
....
return new BackgroundDispatcher(
execution,
RunServer,
execution,
ThreadFactory);
}
private void RunServer(Guid executionId, object state)
{
....
_process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
}
[注釋3]
創建調度對象,我們知道_process是BackgroundServerProces所以意思就是創建一個進程調用BackgroundServerProces的Execute方法,接著看執行方法做了什麼:
public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken, CancellationToken stoppedToken, CancellationToken shutdownToken) {
....
StartDispatchers(context, dispatchers);
....
}
private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers){
....
foreach (var dispatcherBuilder in _dispatcherBuilders) { dispatchers.Add(dispatcherBuilder.Create(context, _options));[注釋4]
}
....
}
[注釋4]
調用BackgroundProcessDispatcherBuilder的Create方法創建調度:
public IBackgroundDispatcher Create(BackgroundServerContext context, BackgroundProcessingServerOptions options) {
....
return new BackgroundDispatcher(
execution,
ExecuteProcess,
Tuple.Create(_process, context, execution),
_threadFactory);
}
private static void ExecuteProcess(Guid executionId, object state)
{
var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext, BackgroundExecution>)state;
....
while (!context.IsStopping)
{
tuple.Item1.Execute(context);[注釋5]
tuple.Item3.NotifySucceeded();
}
}
[注釋5]
可以看出每一種類型的後臺調度後在執行之個while循環即不停執行具體的IBackgroundProcess如work、DelayedJobScheduler、RecurringJobScheduler中的任務。
至此基本運行機制就清楚了。