January Star
  • Home
  • Categories
  • Tags
  • Archives

sched标准库源码学习

Contents

  • 简介
  • Event
  • scheduler

简介

sched 提供了一个具有事件调度机制的类 scheduler 。

不过该模块在文件的开头简介里就说明了:

该类的每个实例内部会维护一个队列。

但是该类并不是线程安全的。

如果想要线程安全,你必须自己根据 scheduler 来定制一个,

或者在多线程环境里,使用单例模式来解决线程安全问题。

官方文档里建议使用 threading.Timer 来延时执行某个方法,可以达到类似于 sched 的效果,同时也是线程安全的。

从上面的介绍就可以看出, scheduler 不支持多线程,所以它内部的阻塞以及事件回调函数的执行应该都是用同步的方式来做的,且其中一个步骤执行时抛出异常,都会导致事件调试失败,这个算是它的一个不小的缺点吧,当然对于要求不高的任务,够用啦。

Event

既然是有事件调度,那么就有一个事件对象。

在 sched 模块中是以命名元组来实现的。

该事件有四个属性:

  • time: 该事件的调度时间,当 timefunc 的返回值 >=time 时,该事件可以进行调度
  • priority: 该事件的优先级,数值越小,优先级越高(Unix)
  • action: 该事件的动作,即回调函数
  • argument: 回调函数的参数,元组形式,不支持关键字参数
1
Event = namedtuple('Event', 'time, priority, action, argument')

scheduler

该类有两个参数:

  1. timefunc

    timefunc 函数要求不需要参数,并且返回一个数字,来表示时间概念

  2. delayfunc

    delayfunc 函数要求有一个参数,参数类型与 timefunc 函数的返回值匹配。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class scheduler:
    def __init__(self, timefunc, delayfunc):
        self._queue = [] # 内部队列其实是用 list 来实现 
        self.timefunc = timefunc
        self.delayfunc = delayfunc

    def enterabs(self, time, priority, action, argument):
        """ 插入一个事件,使用绝对时间 

        四个参数正好是用来生成一个事件用的 
        本方法会返回一个 ID,可以用来删除对应的事件 
        """
        event = Event(time, priority, action, argument)
        heapq.heappush(self._queue, event)
        # 这个 ID 其实就一个事件对象,不是说一提到 ID 就是一个唯一数字 
        return event

    def enter(self, delay, priority, action, argument):
        """ 插入一个事件,使用相对时间,

        相对于 `timefunc` 返回的时间 
        """
        time = self.timefunc() + delay
        return self.enterabs(time, priority, action, argument)

    def cancel(self, event):
        """ 删除一个事件 

        这个事件可以是 `enterabs` 或者  `enter` 方法的返回值 

        如果该事件不在内部队列里面,会抛出 `ValueError` 异常 
        """
        self._queue.remove(event)
        heapq.heapify(self._queue)

    def empty(self):
        """ 检查内部队列是否为空 """
        return not self._queue

    def run(self):
        """ 开始事件调度,直接所有事件调度完成 

        如果 timefunc/deltafunc/ 事件的回调函数抛出异常了,事件调试也会中止,
        不过幸运的是,你可以再次调用 `run` 来重新调试剩余事件。
        """
        # 将二次调用的方法 / 属性都统一到本地变量 
        # 一方面是加快调用速度,另一面也是为了线程安全考虑 
        # 为本地变量的话,直接一个字节码 `LOST_FAST` 搞定了 
        # Python 执行线程切换时以字节码为单位的 
        q = self._queue
        delayfunc = self.delayfunc
        timefunc = self.timefunc
        pop = heapq.heappop
        while q:
            time, priority, action, argument = checked_event = q[0]
            now = timefunc()
            if now < time:
                delayfunc(time - now)
            else:
                event = pop(q)
                # 从 q[0] 取出事件和,pop 取出事件之间会执行多条字节码 
                # 在多线程环境里,很有可能在一个线程里取得了 q[0]
                # 而在另外一个线程里又添加了新的事件,pop 出来就可能是新的事件 
                # 所以这里要验证一下两者是否是同一个对象 
                if event is checked_event:
                    action(*argument)
                    # 主动让出执行权给其它线程 
                    # 如果不加这个,且队列中所有事件都符合调度要求了,
                    # 此时就会一直进行事件调度,
                    # 话是这样说,但是 Python 内部的线程是抢占式,
                    # 不会担心会出现上面所说的问题,
                    # 所以作者在注释才会说明这句代码是一个有争论的 hack 代码 
                    # 如果是用 gevent 的话这句代码就很有必要的,
                    # 因为 gevent 是非抢占式的,要主要出让才行 
                    delayfunc(0)
                else:
                    # 如果验证失败,则放回队列内部,等待下次机会 
                    heapq.heappush(q, event)

    @property
    def queue(self):
        """ 用来保存内部待执行的事件 """
        # 这里使用堆排,而不使用 `sorted` 来对内部队列排序 
        # 是因为当两个事件的优先级一亲,堆排还会按两个事件的插入顺序来排序 
        events = self._queue[:]
        return map(heapq.heappop, [events]*len(events))
Comments
comments powered by Disqus

Published

Sep 24, 2014

Category

Python

Tags

  • python 23
  • stdlibs 15

Contact

  • Powered by Pelican. Theme: Elegant by Talha Mansoor