分析源碼主要還是針對 APScheduler 下的幾個關鍵的模塊
這一篇主要瞅瞅 job 事件
Job 記錄自己的觸發條件 triggers, 記錄自己的所屬的任務存儲 jobstores, 記錄自己交給誰執行 executors, 記錄由誰來調度 schedulers 以及一些任務自身的一些例如任務的唯一標識 job_id,執行時允許的時間誤差 misfire_grace_time 等等, 大概就下面這些信息
'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
'executor': self.executor,
'args': args,
'kwargs': self.kwargs,
'name': self.name,
'misfire_grace_time': self.misfire_grace_time,
'coalesce': self.coalesce,
'max_instances': self.max_instances,
'next_run_time': self.next_run_time簡單點說,Job 類包含了調度程序調用時需要的所有的配置參數,以及任務當前的狀態和所屬的調度程序
初始化 __init__
因為 Job 只是信息的保存,但是例如 Job 的暫停恢復,實際上調度程序 schedulers 來控制的,所以源碼中對 Job 提供的例如 修改,暫停,恢復 等等操作實際上都是通過調用 schedulers 的接口來實現的
Job 源碼中需要了解的部分實際上只有 創建 和 修改 2個函數,以及一些 python 定義的特殊方法( __eq__, __str__ 等等) 的重載def __init__(self, scheduler, id=None, **kwargs):
super(Job, self).__init__()
self._scheduler = scheduler
self._jobstore_alias = None
self._modify(id=id or uuid4().hex, **kwargs)很清楚的看出,創建時的對 Job 成員對象的設置,也是通過修改 _modify 這個函數設置的,主要還是分析一下 _modify 代碼
修改 _modifyschedulers 中其實對輸入參數做了細緻的定義,以及為它們初始化做了對應的初始化,大致如下,具體等介紹 schedulers 時展開
add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
next_run_time=undefined, jobstore='default', executor='default',
replace_existing=False, **trigger_args)而 _modify 主要就是對傳入參數進行參數的校驗,並且需要考慮創建和修改的區別
首先解釋一下創建
配合 __init__() 中的代碼,在執行 _modify 之前實際上的成員對象只有 self._scheduler 和 self._jobstore_alias, 所以創建的時候其他成員對象都是在解析輸入後傳入 approved = {} 中,並通過下面這部分的代碼給對象的屬性賦值,若屬性不存在,先創建再賦值。
for key, value in six.iteritems(approved):
setattr(self, key, value)而修改時,部分參數是不能修改的例如 id
if 'id' in changes:
value = changes.pop('id')
# id 必須是 string
if not isinstance(value, six.string_types):
raise TypeError("id must be a nonempty string")
# 創建時不存在 id 屬性,修改的時候不允許修改
if hasattr(self, 'id'):
raise ValueError('The job ID may not be changed')
approved['id'] = value此外還有一部分需要注意, func 的解析部分需要注意
if 'func' in changes or 'args' in changes or 'kwargs' in changes:
func = changes.pop('func') if 'func' in changes else self.func
args = changes.pop('args') if 'args' in changes else self.args
kwargs = changes.pop('kwargs') if 'kwargs' in changes else self.kwargs
if isinstance(func, six.string_types):
func_ref = func
func = ref_to_obj(func)
elif callable(func):
try:
func_ref = obj_to_ref(func)
except ValueError:
# If this happens, this Job won't be serializable
func_ref = None
else:
raise TypeError('func must be a callable or a textual reference to one')
if not hasattr(self, 'name') and changes.get('name', None) is None:
changes['name'] = get_callable_name(func)
if isinstance(args, six.string_types) or not isinstance(args, Iterable):
raise TypeError('args must be a non-string iterable')
if isinstance(kwargs, six.string_types) or not isinstance(kwargs, Mapping):
raise TypeError('kwargs must be a dict-like object')
check_callable_args(func, args, kwargs)
approved['func'] = func
approved['func_ref'] = func_ref
approved['args'] = args
approved['kwargs'] = kwargs可執行函數如果以字符串的形式傳入,所以需要通過 ref_to_obj 反序列化成可執行對象,當然直接傳入一個可執行的函數也可以,通過 callable() (對於函數、方法、lambda 函式、 類以及實現了 __call__ 方法的類實例, 它都返回 True) 來測試是否可以調用,之後還需要檢測形參下是否支持 check_callable_args(func, args, kwargs)
反序列化 ref_to_objdef ref_to_obj(ref):
"""
Returns the object pointed to by ``ref``.
:type ref: str
"""
if not isinstance(ref, six.string_types):
raise TypeError('References must be strings')
if ':' not in ref:
raise ValueError('Invalid reference')
modulename, rest = ref.split(':', 1)
try:
obj = __import__(modulename, fromlist=[rest])
except ImportError:
raise LookupError('Error resolving reference %s: could not import module' % ref)
try:
for name in rest.split('.'):
obj = getattr(obj, name)
return obj
except Exception:
raise LookupError('Error resolving reference %s: error looking up object' % ref)當輸入為字符串時,通過 : 切割成2部分,第一部分是模塊名,通過 __import__ 導入第二部分的指定接口,最後通過 getattr 一層層獲取到最後的目標接口,例如:
# testfunc 模塊下 test 類裡 task 函數
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(func="testfunc:test.task", args=('定時任務',), trigger='cron', second='*/5', id="定時任務")
scheduler.start()
序列化 obj_to_refdef obj_to_ref(obj):
"""
Returns the path to the given callable.
:rtype: str
:raises TypeError: if the given object is not callable
:raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested
function
"""
if isinstance(obj, partial):
raise ValueError('Cannot create a reference to a partial()')
name = get_callable_name(obj)
if '<lambda>' in name:
raise ValueError('Cannot create a reference to a lambda')
if '<locals>' in name:
raise ValueError('Cannot create a reference to a nested function')
if ismethod(obj):
if hasattr(obj, 'im_self') and obj.im_self:
# bound method
module = obj.im_self.__module__
elif hasattr(obj, 'im_class') and obj.im_class:
# unbound method
module = obj.im_class.__module__
else:
module = obj.__module__
else:
module = obj.__module__
return '%s:%s' % (module, name)partial 用於部分函數應用程式,該應用程式 「凍結」 函數參數和/或關鍵字的一部分,從而生成一個帶有簡化籤名的新對象,看一下下面的代碼,partial 是固定了 func 的部分參數,並生成一個新的籤名,它是不支持序列化的
from functools import partial
def func(x, y):
return x + y
f1 = partial(func, y=4) # 固定 y=4
print(f1(1)) # 5通過 get_callable_name 就是獲取函數最佳顯示名稱,對於 Python3.3+ 版本,可以直接使用 func.__qualname__ 來獲取,低於這個版本的獲取有點麻煩,感興趣的可以自己看一下源碼
<lambda> 和 <locals>(嵌套函數) 也是沒法序列化成 模塊名 : 函數 ,這意味著使用非內存模式的 job_store ,因為無法序列化,所以這3種類型需要額外注意ismethod(obj) 用來判斷對象是函數還是方法,在 python 中這2者是有一點出入的, 定義在類外面的是函數,定義在類裡面的,跟類綁定的是方法,而後面的 im_self 和 im_class 主要是針對 Python 2.x版本 的內容,實際上 Python 3.x 已經沒有這些 bound method 和 unbound method 的概念了
而關於這方面的介紹,可以看一下這位大佬的介紹:https://www.jianshu.com/p/a497f742ddd4
modify 中解析 func 部分就解釋完了 , check_callable_args(func, args, kwargs) 主要就是確保可以使用給定的參數調用給定的 func 。
剩餘欄位的解析都很簡單,從 changes 中取出,根據實際情況數值校驗,然後扔到 approved 中,最後在通過對 approved 的迭代 setattr, 例如 misfire_grace_time:
if 'misfire_grace_time' in changes:
value = changes.pop('misfire_grace_time')
if value is not None and (not isinstance(value, six.integer_types) or value <= 0):
raise TypeError('misfire_grace_time must be either None or a positive integer')
approved['misfire_grace_time'] = value
for key, value in six.iteritems(approved):
setattr(self, key, value)
__getstate__ 和 __setstate____getstate__ 與 __setstate__ 兩個方法分別用於對象的序列化與反序列化
在序列化時, __getstate__ 可以指定將那些信息記錄下來, 而 __setstate__ 指明如何利用已記錄的信息
這一部分的代碼很簡單,大致就是利用 _modify 中解析得到的屬性,生成一個字典,或者用字典對 Job 的屬性進行設置
def __getstate__(self):
if not self.func_ref:
raise ValueError(
'This Job cannot be serialized since the reference to its callable (%r) could not '
'be determined. Consider giving a textual reference (module:function name) '
'instead.' % (self.func,))
if ismethod(self.func) and not isclass(self.func.__self__):
args = (self.func.__self__,) + tuple(self.args)
else:
args = self.args
return {
'version': 1,
'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
'executor': self.executor,
'args': args,
'kwargs': self.kwargs,
'name': self.name,
'misfire_grace_time': self.misfire_grace_time,
'coalesce': self.coalesce,
'max_instances': self.max_instances,
'next_run_time': self.next_run_time
}
def __setstate__(self, state):
if state.get('version', 1) > 1:
raise ValueError('Job has version %s, but only version 1 can be handled' %
state['version'])
self.id = state['id']
self.func_ref = state['func']
self.func = ref_to_obj(self.func_ref)
self.trigger = state['trigger']
self.executor = state['executor']
self.args = state['args']
self.kwargs = state['kwargs']
self.name = state['name']
self.misfire_grace_time = state['misfire_grace_time']
self.coalesce = state['coalesce']
self.max_instances = state['max_instances']
self.next_run_time = state['next_run_time']
總結對 Job 的理解主要就是創建和修改,以及序列化和反序列化這部分代碼,剩下的沒有介紹的方法難度都不大,感興趣的可以自己閱讀一下~