APScheduler 源碼閱讀(二) job

2021-03-02 會偷懶的程序猿

分析源碼主要還是針對 APScheduler 下的幾個關鍵的模塊

這一篇主要瞅瞅 job 事件

Job 記錄自己的觸發條件 triggers, 記錄自己的所屬的任務存儲 jobstores, 記錄自己交給誰執行 executors, 記錄由誰來調度 schedulers 以及一些任務自身的一些例如任務的唯一標識 job_id,執行時允許的時間誤差 misfire_grace_time 等等, 大概就下面這些信息

'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
'executor': self.executor,
'args': args,
'kwargs': self.kwargs,
'name': self.name,
'misfire_grace_time': self.misfire_grace_time,
'coalesce': self.coalesce,
'max_instances': self.max_instances,
'next_run_time': self.next_run_time

簡單點說,Job 類包含了調度程序調用時需要的所有的配置參數,以及任務當前的狀態和所屬的調度程序
因為 Job 只是信息的保存,但是例如 Job 的暫停恢復,實際上調度程序 schedulers 來控制的,所以源碼中對 Job 提供的例如 修改暫停恢復 等等操作實際上都是通過調用 schedulers 的接口來實現的
Job 源碼中需要了解的部分實際上只有 創建修改 2個函數,以及一些 python 定義的特殊方法( __eq__, __str__ 等等) 的重載

初始化 __init__
def __init__(self, scheduler, id=None, **kwargs):
    super(Job, self).__init__()
    self._scheduler = scheduler
    self._jobstore_alias = None
    self._modify(id=id or uuid4().hex, **kwargs)

很清楚的看出,創建時的對 Job 成員對象的設置,也是通過修改 _modify 這個函數設置的,主要還是分析一下 _modify 代碼

修改 _modify

schedulers 中其實對輸入參數做了細緻的定義,以及為它們初始化做了對應的初始化,大致如下,具體等介紹 schedulers 時展開

add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args)

而 _modify 主要就是對傳入參數進行參數的校驗,並且需要考慮創建和修改的區別

首先解釋一下創建

配合 __init__() 中的代碼,在執行 _modify 之前實際上的成員對象只有 self._scheduler 和 self._jobstore_alias, 所以創建的時候其他成員對象都是在解析輸入後傳入 approved = {} 中,並通過下面這部分的代碼給對象的屬性賦值,若屬性不存在,先創建再賦值。

for key, value in six.iteritems(approved):
    setattr(self, key, value)

而修改時,部分參數是不能修改的例如 id

if 'id' in changes:
    value = changes.pop('id')
    # id 必須是 string
    if not isinstance(value, six.string_types):
        raise TypeError("id must be a nonempty string")
    # 創建時不存在 id 屬性,修改的時候不允許修改
    if hasattr(self, 'id'):
        raise ValueError('The job ID may not be changed')
    approved['id'] = value

此外還有一部分需要注意, func 的解析部分需要注意

if 'func' in changes or 'args' in changes or 'kwargs' in changes:
    func = changes.pop('func') if 'func' in changes else self.func
    args = changes.pop('args') if 'args' in changes else self.args
    kwargs = changes.pop('kwargs') if 'kwargs' in changes else self.kwargs

    if isinstance(func, six.string_types):
        func_ref = func
        func = ref_to_obj(func)
    elif callable(func):
        try:
            func_ref = obj_to_ref(func)
        except ValueError:
            # If this happens, this Job won't be serializable
            func_ref = None
    else:
        raise TypeError('func must be a callable or a textual reference to one')

    if not hasattr(self, 'name') and changes.get('name', None) is None:
        changes['name'] = get_callable_name(func)

    if isinstance(args, six.string_types) or not isinstance(args, Iterable):
        raise TypeError('args must be a non-string iterable')
    if isinstance(kwargs, six.string_types) or not isinstance(kwargs, Mapping):
        raise TypeError('kwargs must be a dict-like object')

    check_callable_args(func, args, kwargs)

    approved['func'] = func
    approved['func_ref'] = func_ref
    approved['args'] = args
    approved['kwargs'] = kwargs

可執行函數如果以字符串的形式傳入,所以需要通過 ref_to_obj 反序列化成可執行對象,當然直接傳入一個可執行的函數也可以,通過 callable() (對於函數、方法、lambda 函式、 類以及實現了 __call__ 方法的類實例, 它都返回 True) 來測試是否可以調用,之後還需要檢測形參下是否支持 check_callable_args(func, args, kwargs)

反序列化 ref_to_obj
def ref_to_obj(ref):
    """
    Returns the object pointed to by ``ref``.

    :type ref: str

    """
    if not isinstance(ref, six.string_types):
        raise TypeError('References must be strings')
    if ':' not in ref:
        raise ValueError('Invalid reference')

    modulename, rest = ref.split(':', 1)
    try:
        obj = __import__(modulename, fromlist=[rest])
    except ImportError:
        raise LookupError('Error resolving reference %s: could not import module' % ref)

    try:
        for name in rest.split('.'):
            obj = getattr(obj, name)
        return obj
    except Exception:
        raise LookupError('Error resolving reference %s: error looking up object' % ref)

當輸入為字符串時,通過 : 切割成2部分,第一部分是模塊名,通過 __import__ 導入第二部分的指定接口,最後通過 getattr 一層層獲取到最後的目標接口,例如:

# testfunc 模塊下 test 類裡 task 函數
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(func="testfunc:test.task", args=('定時任務',), trigger='cron', second='*/5', id="定時任務")
    scheduler.start()

序列化 obj_to_ref
def obj_to_ref(obj):
    """
    Returns the path to the given callable.

    :rtype: str
    :raises TypeError: if the given object is not callable
    :raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested
        function

    """
    if isinstance(obj, partial):
        raise ValueError('Cannot create a reference to a partial()')

    name = get_callable_name(obj)
    if '<lambda>' in name:
        raise ValueError('Cannot create a reference to a lambda')
    if '<locals>' in name:
        raise ValueError('Cannot create a reference to a nested function')

    if ismethod(obj):
        if hasattr(obj, 'im_self') and obj.im_self:
            # bound method
            module = obj.im_self.__module__
        elif hasattr(obj, 'im_class') and obj.im_class:
            # unbound method
            module = obj.im_class.__module__
        else:
            module = obj.__module__
    else:
        module = obj.__module__
    return '%s:%s' % (module, name)

partial 用於部分函數應用程式,該應用程式 「凍結」 函數參數和/或關鍵字的一部分,從而生成一個帶有簡化籤名的新對象,看一下下面的代碼,partial 是固定了 func 的部分參數,並生成一個新的籤名,它是不支持序列化的

from functools import partial
 
def func(x, y):
    return x + y
 
f1 = partial(func, y=4)  # 固定 y=4
print(f1(1))  # 5

通過 get_callable_name 就是獲取函數最佳顯示名稱,對於 Python3.3+ 版本,可以直接使用 func.__qualname__ 來獲取,低於這個版本的獲取有點麻煩,感興趣的可以自己看一下源碼
<lambda> 和 <locals>(嵌套函數) 也是沒法序列化成 模塊名 : 函數 ,這意味著使用非內存模式的 job_store ,因為無法序列化,所以這3種類型需要額外注意

ismethod(obj) 用來判斷對象是函數還是方法,在 python 中這2者是有一點出入的, 定義在類外面的是函數,定義在類裡面的,跟類綁定的是方法,而後面的 im_self 和 im_class 主要是針對 Python 2.x版本 的內容,實際上 Python 3.x 已經沒有這些 bound method 和 unbound method 的概念了

而關於這方面的介紹,可以看一下這位大佬的介紹:https://www.jianshu.com/p/a497f742ddd4

modify 中解析 func 部分就解釋完了 , check_callable_args(func, args, kwargs) 主要就是確保可以使用給定的參數調用給定的 func 。

剩餘欄位的解析都很簡單,從 changes 中取出,根據實際情況數值校驗,然後扔到 approved 中,最後在通過對 approved 的迭代 setattr, 例如 misfire_grace_time:

if 'misfire_grace_time' in changes:
    value = changes.pop('misfire_grace_time')
    if value is not None and (not isinstance(value, six.integer_types) or value <= 0):
        raise TypeError('misfire_grace_time must be either None or a positive integer')
    approved['misfire_grace_time'] = value

for key, value in six.iteritems(approved):
    setattr(self, key, value)

__getstate__ 和 __setstate__

__getstate__ 與 __setstate__ 兩個方法分別用於對象的序列化與反序列化

在序列化時, __getstate__ 可以指定將那些信息記錄下來, 而 __setstate__ 指明如何利用已記錄的信息

這一部分的代碼很簡單,大致就是利用 _modify 中解析得到的屬性,生成一個字典,或者用字典對 Job 的屬性進行設置

def __getstate__(self):
    if not self.func_ref:
        raise ValueError(
            'This Job cannot be serialized since the reference to its callable (%r) could not '
            'be determined. Consider giving a textual reference (module:function name) '
            'instead.' % (self.func,))


    if ismethod(self.func) and not isclass(self.func.__self__):
        args = (self.func.__self__,) + tuple(self.args)
    else:
        args = self.args

    return {
        'version': 1,
        'id': self.id,
        'func': self.func_ref,
        'trigger': self.trigger,
        'executor': self.executor,
        'args': args,
        'kwargs': self.kwargs,
        'name': self.name,
        'misfire_grace_time': self.misfire_grace_time,
        'coalesce': self.coalesce,
        'max_instances': self.max_instances,
        'next_run_time': self.next_run_time
    }

def __setstate__(self, state):
    if state.get('version', 1) > 1:
        raise ValueError('Job has version %s, but only version 1 can be handled' %
                         state['version'])

    self.id = state['id']
    self.func_ref = state['func']
    self.func = ref_to_obj(self.func_ref)
    self.trigger = state['trigger']
    self.executor = state['executor']
    self.args = state['args']
    self.kwargs = state['kwargs']
    self.name = state['name']
    self.misfire_grace_time = state['misfire_grace_time']
    self.coalesce = state['coalesce']
    self.max_instances = state['max_instances']
    self.next_run_time = state['next_run_time']

總結

對 Job 的理解主要就是創建和修改,以及序列化和反序列化這部分代碼,剩下的沒有介紹的方法難度都不大,感興趣的可以自己閱讀一下~

相關焦點

  • 乾貨|新手也能看懂的源碼閱讀技巧
    搭上 SpringBoot 攔截器源碼分析專車【原創】008 | SpringBoot 源碼專車總結(共8篇)但是俗話說,賣你魚不如教你打魚,有小白粉說,那新手有什麼閱讀源碼的技巧呢,這不,來了。那時候開始意識到,源碼這東西在之前的工作的中感受不到,但是在面試中好像面的還挺頻繁的,從此有意識的開始了jdk部分源碼的閱讀(主要是集合)。一開始看源碼,看的特別糙,知道個大概,知道ArrayList的底層實現是數組,HashMap的底層是散列表(數組+鍊表);更深入一點的擴容、hash碰撞等等就不知道了。
  • Kubernetes scheduler學習筆記
    Kubernetes scheduler是一個策略豐富、拓撲感知、工作負載特定的功能,顯著影響可用性、性能和容量。為了能更好的使用它,所以從源碼的角度,對它進行一個全方位的分析與學習。scheduler的功能不多,但邏輯比較複雜,裡面有很多考慮的因素,總結下來大致有如下幾點:Leader選主,確保集群中只有一個scheduler在工作,其它只是高可用備份實例。通過endpoint:kube-scheduler作為仲裁資源。
  • Spring-Task源碼解析
    <task:schedulerid="scheduler"pool-size="3"/><beanid="task"class="task.Task"/><task:scheduled-tasksscheduler
  • 源碼視角,全方位學習Kubernetes scheduler
    只有在scheduler.CreateFromConfig(policy)才會初始化該檢查,在RegisterCustomFitPredicate中註冊,默認無該檢查。10、checkServiceAffinity> 檢查服務類同關係。
  • 一篇讀懂Kubernetes Scheduler擴展功能
    平行的custom scheduler,單獨或者和默認kube-scheduler一起運行在集群中scheduler extender: 實現一個"scheduler extender",kube-scheduler會調用它(http/https)作為默認調度算法(預選&優選&bind)的補充scheduler framework: 實現scheduler framework plugins
  • 如何閱讀Java源碼?
    閱讀Java源碼的前提條件:1、技術基礎在閱讀源碼之前,我們要有一定程度的技術基礎的支持。比如設計模式,許多Java源碼當中都會涉及到。再比如閱讀Spring源碼的時候,勢必要先對IOC,AOP,Java動態代理等知識點有所了解。2、強烈的求知慾強烈的求知慾是閱讀源碼的核心動力!
  • RxJava系列六(從微觀角度解讀RxJava源碼)
    Subscriber源碼分析接著我們看下觀察者Subscriber的源碼,為了增加可讀性,我去掉了源碼中的注釋和部分代碼。二、操作符原理分析之前我們介紹過幾十個操作符,要一一分析它們的源碼顯然不太現實。在這裡我拋磚引玉,選取一個相對簡單且常用的map操作符來分析。
  • 秋季FLAG押題班第9課 - Kafka, Kubernetes, Zookeeper和系統設計 Scheduler
    files/slides/Distributed%20Resource%20Scheduling%20Frameworks.pdfhttp://csc.csudh.edu/btang/seminar/slides/Omega-Matt_Levan.pdfhttps://stackoverflow.com/questions/26094969/design-a-generic-job-scheduler
  • 如何閱讀源碼?推薦一本書
    最終,這些思維邏輯和演化過程都會投射和堆疊到源碼上,使得源碼閱讀的過程是一個通過源碼去逆推思維邏輯和演化過程的工作,因此十分困難。3 源碼閱讀方法選好源碼項目之後,要做的就是閱讀源碼。作者介紹了源碼閱讀的方法、技巧、經驗。主要包括兩個大的步驟:項目初探源碼閱讀在項目初探環節,主要是通過斷點運行項目,然後分析項目的整體框架、跳轉流向。
  • 微服務專題|Naocs 源碼設計的精髓就在這了,給你一個手撕面試官的機會
    微服務專題|Naocs 源碼設計的精髓就在這了,給你一個手撕面試官的機會Nacos 如何扛住高並發讀寫?最近經常閱讀源碼,發現大部分框架在解決並發讀寫的時候,都會使用使用COW的思想來解決;nacos也不例外。
  • 程式設計師是如何閱讀源碼的
    最近寫了很多源碼分析相關的文章,React、Vue 都有,想把我閱讀源碼的一些心得分享給大家。一般這時候會開始在網上搜文章,如何調試 React 源碼。
  • Apache Dolphinscheduler 1.3.x 系列配置文件指南
    前言本文檔為dolphinscheduler配置文件指南,針對版本為 dolphinscheduler-1.3.x 版本.
  • 【AP課程介紹】AP Seminar 研討會(AP Capstone 頂石系列)
    參考網頁:哈佛官網https://oue.fas.harvard.edu/apexams英國的劍橋大學:入學申請需要學生在五門與申請專業相關的AP課程中獲得滿分。參考網頁:美國大學理事會官網https://apstudent.collegeboard.org/apcourse/ap-seminar
  • 淺談 Kubernetes Scheduler (一)
    概述這篇文章是基於 Kubernetes 的 master commitid: 8e8b6a01cf6bf55dea5e2e4f554597a95c82988a 寫下的源碼分析文檔。調度在 Kubernetes 裡面是 Kube-scheduler 組件實現的。Kube-scheduler 的主要邏輯在於,如何為集群中的每一個新創建的 Pod 或者沒有被調度的 Pod 找到合適的節點。帶著問題出發問題 1:調度器如何平衡準確性和效率性?
  • PEA源碼閱讀筆記
    筆者將這條信息整理在了公眾號文章 「測繪黑板報-201027」 中,截止 2021 年 1 月 31 日已經有了 5800+ 閱讀量,是黑板報系列文章平均閱讀量的兩倍左右,可以看出不少小夥伴對 GA 開源軟體的熱情。
  • RTKLIB 源碼閱讀筆記
    全文約7000字,含9副圖和2個表格,閱讀時間大約22分鐘。內容目錄1. 簡單介紹2. 代碼特點3. 疑似bugs4. 優化tips5. 簡單測試6. 20個問與答7.源碼裡附帶的一些c文件和函數和RTKLIB的較為接近。不過有geograv_e.c, satorbit.c, satorbit_e.c, satorbit_s.c, shadowfunc.c, shadowfunc_e.c 這6個文件未開源,僅提供二進位文件。
  • 黑馬程式設計師:技術筆記大數據面試題之spark相關(二)
    昨天分享了大數據面試題之spark相關一,看到有很大的反響,今天就分享接下來的二,希望能更好的幫助到大家!2)spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發一次action操作就會產生一個job。
  • 分布式調度系統XXL-JOB安裝和簡單使用
    腳本創建好資料庫和表,然後直接將github 上的源碼 導入到 idea 按照Spring Boot 應用來啟動即可。xxl-job 是只提供了的源碼文件,如果要運行,需要自己先將 java 源碼 通過 maven 打包構建成運行包,然後才能運行。所以接下來我來詳細講述一下如何在linux 下安裝啟動 xxl-job,這裡使用的 linux 版本為:Ubuntu 16.04。 其他版本也類似。
  • Apex Batch和Apex Scheduler-定時批處理大量數據
    AsyncApexJob job = [SELECT Id, Status, JobItemsProcessed, TotalJobItems, NumberOfErrors FROM AsyncApexJob WHERE ID = :batchId ]; 下面看一個具體的例子,當環境中有許多操作在一個對象上時,例如PB,Flow,Trigger
  • 你的第一份Python庫源碼閱讀:records
    代碼不超過1000行,如果是第一次嘗試閱讀python開源項目,這是一個很好的選擇。records庫的使用非常簡單且人性化,定義資料庫連接串和sql語句,然後將返回值作為rows列印出來,或者輸出為文件,沒有複雜的orm邏輯,實現邏輯很清晰依賴庫records有一些pip依賴,每個依賴項的作用如下:源碼分析工具