【PY模块】优化 apscheduler 模块二,动态修改 state

新博客:https://blog.bigdataboy.cn/article/455.html

说明

上篇:https://blog.bigdataboy.cn/article/454.html

接上篇:只能在添加 job 时一次性修改,而无法实现任务中途动态修改 state 的值,所以继续改造

思路

其实改造也很简单,首先通过 job_id 获取到 Job,更新 Job 的 state 属性,然后执行相应的 停止(pause)恢复(resume)等函数,所以我们就在 停止 恢复这些函数上动手脚

分析源码

job = scheduler.get_job(job_id=job_id)
job.pause()

继续看 pause() 内部逻辑,本质调用的是 BaseScheduler 类下的 pause_job() 方法

    def pause(self):
        """
        Temporarily suspend the execution of this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`

        :return Job: this job instance

        """
        self._scheduler.pause_job(self.id, self._jobstore_alias)
        return self

继续看 pause_job() 方法内部逻辑,发现本质调用的是 BaseScheduler 类下的 modify_job() 方法

def pause_job(self, job_id, state, jobstore=None):
    """
    Causes the given job not to be executed until it is explicitly resumed.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that contains the job
    :return Job: the relevant job instance

    """
    return self.modify_job(job_id, jobstore, next_run_time=None)

继续看 modify_job() 方法内部逻辑,发现就更新了 job 的状态,其实就是修改

def modify_job(self, job_id, jobstore=None, **changes):
    """
    Modifies the properties of a single job.

    Modifications are passed to this method as extra keyword arguments.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that contains the job
    :return Job: the relevant job instance

    """
    with self._jobstores_lock:
        job, jobstore = self._lookup_job(job_id, jobstore)
        job._modify(**changes)
        if jobstore:
            self._lookup_jobstore(jobstore).update_job(job)

    self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))

    # Wake up the scheduler since the job's next run time may have been changed
    if self.state == STATE_RUNNING:
        self.wakeup()

    return job

总结

其实需要改动的就在传参上,到最后给 modify_job()函数,一并修改就可以了

改造

apscheduler/job.py

def pause(self):
    """
    Temporarily suspend the execution of this job.

    .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`

    :return Job: this job instance

    """
    # self._scheduler.pause_job(self.id, self._jobstore_alias) # 原
     self._scheduler.pause_job(self.id, self.state, self._jobstore_alias)
    return self

apscheduler/schedulers/base.py

def pause_job(self, job_id, state, jobstore=None): # 添加 state 参数
    """
    Causes the given job not to be executed until it is explicitly resumed.

    :param str|unicode job_id: the identifier of the job
    :param str|unicode jobstore: alias of the job store that contains the job
    :return Job: the relevant job instance

    """
    return self.modify_job(job_id, jobstore, state=state, next_run_time=None) # 传入 state 

其他就不用修改了,modify_job() 函数内部是不定长获取的,所以 恢复(resume)也是一样

验证

停止任务,然后再使用 job_id 获取

mark

发表评论 / Comment

用心评论~