ps:好久没发文章,悄悄翻一个
新博客:https://blog.bigdataboy.cn/article/454.html
说明
最近在使用apscheduler
过程中,为了控制每个 Job 的状态
,发现有点困难,所以就尝试改造一下
目标
为 Job 类添加一个 dict
类型的属性,用来保存用户 自定义的数据
,例如 当 job 在执行过程发生错误,就可以把状态改成 error
并停止 job
改造
地方一
apscheduler/job.py
# 增加 state 属性
__slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
'next_run_time', '__weakref__', 'state')
地方二
# 在返回值中 增加 state
def __getstate__(self):
...
return {
'version': 1,
'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
'executor': self.executor,
'args': args,
'kwargs': self.kwargs,
'name': self.name,
'misfire_grace_time': self.misfire_grace_time,
'coalesce': self.coalesce,
'max_instances': self.max_instances,
'next_run_time': self.next_run_time,
'state': self.state ####
}
地方三
# 在设置 job 属性时,增加 state
def __setstate__(self, state):
if state.get('version', 1) > 1:
raise ValueError('Job has version %s, but only version 1 can be handled' %
state['version'])
self.id = state['id']
self.func_ref = state['func']
self.func = ref_to_obj(self.func_ref)
self.trigger = state['trigger']
self.executor = state['executor']
self.args = state['args']
self.kwargs = state['kwargs']
self.name = state['name']
self.misfire_grace_time = state['misfire_grace_time']
self.coalesce = state['coalesce']
self.max_instances = state['max_instances']
self.next_run_time = state['next_run_time']
self.state = state['state'] ####
地方四
# 在 add_job 函数上,添加上 state 参数
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, state={}, ####
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
next_run_time=undefined, jobstore='default', executor='default',
replace_existing=False, **trigger_args):
...
# 把 state 参数添加到 job_kwargs 字典
job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time,
'state': state ####
}
地方五
# 在参数修复函数里,添加上 state ,并进行检测
def _modify(self, **changes):
...
if 'state' in changes:
state = changes.pop('state')
if not isinstance(state, dict):
raise TypeError('state must be a dict')
approved['state'] = state
验证
在以上五个地方添加上,就可以实现为 每个 job 开辟一个用户自定义属性的存储
添加任务
可以看到 job 能够正常获取到 state
获取任务
获取任务时 state
也是存在的
当任务发生异常时
按照官网的方案,监控任务状态需要添加监听
# 调度监听
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, \
EVENT_JOB_ADDED, EVENT_JOB_REMOVED, EVENT_JOB_EXECUTED, EVENT_JOB_MODIFIED
scheduler = BackgroundScheduler(timezone='Asia/Shanghai')
def apscheduler_listener(event):
if event.code == EVENT_JOB_ERROR:
# 任务 发送错误时
job = scheduler.get_job(event.job_id)
job.state['state'] = 'error'
job.pause()
logger.error(f'job id: {event.job_id} traceback: {event.traceback}')
scheduler.add_listener(apscheduler_listener,
EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
scheduler.start()
可以看到当任务发生错误时,就可以改变任务状态,并停止该任务
版权声明:《 【PY模块】优化 apscheduler 模块,追踪 Job 状态 》为明非原创文章,转载请注明出处!
最后编辑:2023-1-8 11:01:57