新博客:https://blog.bigdataboy.cn/article/455.html
说明
上篇:https://blog.bigdataboy.cn/article/454.html
接上篇:只能在添加 job 时一次性修改,而无法实现任务中途动态修改 state 的值,所以继续改造
思路
其实改造也很简单,首先通过 job_id 获取到 Job,更新 Job 的 state 属性,然后执行相应的 停止(pause)
、 恢复(resume)
等函数,所以我们就在 停止 恢复这些函数上动手脚
分析源码
job = scheduler.get_job(job_id=job_id)
job.pause()
继续看
pause()
内部逻辑,本质调用的是BaseScheduler
类下的pause_job() 方法
def pause(self):
"""
Temporarily suspend the execution of this job.
.. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`
:return Job: this job instance
"""
self._scheduler.pause_job(self.id, self._jobstore_alias)
return self
继续看
pause_job()
方法内部逻辑,发现本质调用的是BaseScheduler
类下的modify_job() 方法
def pause_job(self, job_id, state, jobstore=None):
"""
Causes the given job not to be executed until it is explicitly resumed.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:return Job: the relevant job instance
"""
return self.modify_job(job_id, jobstore, next_run_time=None)
继续看
modify_job()
方法内部逻辑,发现就更新了 job 的状态,其实就是修改
def modify_job(self, job_id, jobstore=None, **changes):
"""
Modifies the properties of a single job.
Modifications are passed to this method as extra keyword arguments.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:return Job: the relevant job instance
"""
with self._jobstores_lock:
job, jobstore = self._lookup_job(job_id, jobstore)
job._modify(**changes)
if jobstore:
self._lookup_jobstore(jobstore).update_job(job)
self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))
# Wake up the scheduler since the job's next run time may have been changed
if self.state == STATE_RUNNING:
self.wakeup()
return job
总结
其实需要改动的就在传参上,到最后给 modify_job()
函数,一并修改就可以了
改造
apscheduler/job.py
def pause(self):
"""
Temporarily suspend the execution of this job.
.. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`
:return Job: this job instance
"""
# self._scheduler.pause_job(self.id, self._jobstore_alias) # 原
self._scheduler.pause_job(self.id, self.state, self._jobstore_alias)
return self
apscheduler/schedulers/base.py
def pause_job(self, job_id, state, jobstore=None): # 添加 state 参数
"""
Causes the given job not to be executed until it is explicitly resumed.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:return Job: the relevant job instance
"""
return self.modify_job(job_id, jobstore, state=state, next_run_time=None) # 传入 state
其他就不用修改了,
modify_job()
函数内部是不定长获取的,所以恢复(resume)
也是一样
验证
停止任务,然后再使用 job_id 获取
版权声明:《 【PY模块】优化 apscheduler 模块二,动态修改 state 》为明妃原创文章,转载请注明出处!
最后编辑:2023-1-9 02:01:00