用Python生成器实现微线程编程的教程

时间:2022-09-04 14:38:14

微线程领域(至少在 Python 中)一直都是 Stackless Python 才能涉及的特殊增强部分。关于 Stackless 的话题以及最近它经历的变化,可能本身就值得开辟一个专栏了。但其中简单的道理就是,在“新的 Stackless”下,延续(continuation)显然是不合时宜的,但微线程还是这个项目 存在的理由。这一点很复杂……

刚开始,我们还是先来回顾一些内容。那么,什么是微线程呢? 微线程基本上可以说是只需要很少的内部资源就可以运行的进程 ― 并且是在 Python 解释器的单个实例中(在公共内存空间中,等等)运行的进程。有了微线程,我们就可能在目前中等性能的 PC 机上运行数以万计的并行进程,还可以每秒钟几十万次地在上下文之间切换。对 fork() 的调用或标准的 OS 线程调用根本不能达到这个程度!甚至所谓的“轻量级”线程库中的线程也比这里提出的微线程“重”好几个数量级。

我在本专栏中介绍的轻便线程的含义与 OS 线程的含义有一点不同。就这点而言,它们与 Stackless 所提供的也不尽相同。在很多方面,轻便线程比大多数变体都简单得多;大多数关于信号、锁定及诸如此类的问题都不存在了。简单性的代价就是,我提出了一种“协作多线程”的形式;我觉得在标准 Python 框架中加入抢占并不可行(至少在非 Stackless 的 Python 2.2 中 — 没有人知道 __future__ 会带来什么)。

轻便线程在某种意义上会令人回想起较早的 Windows 和 MacOS 版本的协作多任务(不过是在单个应用程序中)。然而,在另一种意义上,轻便线程只不过是在程序中表达流的另一种方式;轻便线程所做的一切(至少在原则上)都可以用“真正庞大的 if/elif 块”技术来完成(蛮干的程序员的黔驴之计)。

一种用简单的生成器模拟协同程序的机制。这个机制的核心部分非常简单。 scheduler() 函数中包装了一组生成器对象,这个函数控制将控制流委托给合适的分支的过程。这些协同程序并不是 真正的协同程序,因为它们只控制到 scheduler() 函数和来自该函数的分支。不过出于实用的目的,您可以用非常少的额外代码来完成同样的事情。 scheduler() 就是类似于下面这样的代码:
清单 1. 模拟协同程序的 Scheduler()

?
1
2
3
4
5
def scheduler(gendct, start):
 global cargo
 coroutine = start
 while 1:
  (coroutine, cargo) = gendct[coroutine].next()

关于这个包装器要注意的一点是,每个生成器/协同程序都会生成一个包含它的预期分支目标的元组。生成器/协同程序基本上都在 GOTO 目标处退出。为了方便起见,我还让生成器生成了一个标准的 cargo 容器,作为形式化在协同程序之间传送的数据的方法 — 不过您也可以只用已经达成一致的全局变量或回调 setter/getter 函数来传送数据。Raymond Hettinger 撰写了一个 Python 增强倡议(Python Enhancement Proposal,PEP),旨在使传送的数据能被更好地封装;可能今后的 Python 将包括这个倡议。

新的调度程序

对于轻便线程来说,它们的需求与协同程序的需求稍有不同。不过我们还是可以在它的核心处使用 scheduler() 函数。不同之处在于,调度程序本身应该决定分支目标,而不是从生成器/协同程序接收分支目标。下面让我向您展示一个完整的测试程序和样本:
清单 2. microthreads.py 示例脚本

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from
  
   __future__
  
  import
  
   generators
  
  import
  
   sys, time
threads = []
TOTALSWITCHES =
  
  10**6
NUMTHREADS =
  
  10**5def
  
   null_factory():
 
  
  def
  
   empty():
  
  
  while1:
  
  yield
  
   None
 
  
  return
  
   empty()
  
  def
  
   quitter():
 
  
  for
  
   n
  
  in
  
   xrange(TOTALSWITCHES/NUMTHREADS):
  
  
  yield
  
   None
  
  def
  
   scheduler():
 
  
  global
  
   threads
 
  
  try
  
  :
  
  
  while1:
   
  
  for
  
   thread
  
  in
  
   threads: thread.next()
 
  
  except
  
   StopIteration:
  
  
  passif
  
   __name__ ==
  
  "__main__"
  
  :
 
  
  for
  
   i
  
  in
  
   range(NUMTHREADS):
  threads.append(null_factory())
 threads.append(quitter())
 starttime = time.clock()
 scheduler()
 
  
  print"TOTAL TIME: "
  
  , time.clock()-starttime
 
  
  print"TOTAL SWITCHES:"
  
  , TOTALSWITCHES
 
  
  print"TOTAL THREADS: "
  
  , NUMTHREADS

这大概就是您能够选择的最简单的轻便线程调度程序了。每个线程都按固定顺序进入,而且每个线程都有同样的优先级。接下来,让我们来看看如何处理细节问题。和前面部分所讲的协同程序一样,编写轻便线程时应该遵守一些约定。

处理细节

大多数情况下,轻便线程的生成器都应该包括在 while 1: 循环中。这里设置调度程序的方法将导致在其中一个线程停止时整个调度程序停止。这在某种意义上“健壮性”不如 OS 线程 ― 不过在 scheduler() 的循环 内捕获异常不会比在循环外需要更多的机器资源。而且,我们可以从 threads 列表删除线程,而不必终止(由它本身或其它线程终止)。我们其实并没有提供让删除更加容易的详细方法;不过比较常用的扩展方法可能是将线程存储在字典或某种其它的结构中,而不是列表中。

该示例说明了最后终止调度程序循环的一种合理的方法。 quitter() 是一种特殊的生成器/线程,它监视某种条件(在本示例中只是一个上下文切换的计数),并在条件满足时抛出 StopIteration (本示例中不捕获其它异常)。请注意,在终止之后,其它所有生成器还是完整的,如果需要,还可以在今后恢复(在微线程调度程序或其它程序中)。显然,如果需要,您可以 delete 这些生成器/线程。

这里讨论的示例使用了特殊的无意义线程。它们什么也不做,而且以一种可能性最小的形式实现这一点。我们这样建立该示例是为了说明一点 ― 轻便线程的内在开销是非常低的。在一台比较老的只有 64 MB 内存的 Windows 98 Pentium II 膝上型电脑上创建 100,000 个轻便线程是轻而易举的(如果达到了一百万个线程,就会出现长时间的磁盘“猛转”)。请用 OS 线程试试看! 而且,在这个比较慢的 366 MHz 芯片上可以在大约 10 秒内执行一百万次上下文切换(所涉及的线程数对耗时并无重大影响)。显然,真正的轻便线程应该 做一些事情,而这将根据任务使用更多的资源。不过线程本身却赢得了“轻便”的名声。

切换开销

在轻便线程之间切换开销很小,但还不是完全没有开销。为了测试这种情况,我构建了一个执行 某种工作(不过大约是您在线程中按道理可以完成的最少量)的示例。因为线程调度程序 真的等同于“执行 A,接着执行 B,然后执行 C,等等”的指令,所以要在主函数中创建一个完全并行的情况也不困难。
清单 3. overhead.py 示例脚本

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from
  
   __future__
  
  import
  
   generators
  
  import
  
   time
TIMES = 100000
  
  def
  
   stringops():
 
  
  for
  
   n
  
  in
  
   xrange(TIMES):
  s =
  
  "Mary had a little lamb"
  
  
  s = s.upper()
  s =
  
  "Mary had a little lamb"
  
  
  s = s.lower()
  s =
  
  "Mary had a little lamb"
  
  
  s = s.replace('a','A')
  
  def
  
   scheduler():
 
  
  for
  
   n
  
  in
  
   xrange(TIMES):
  
  
  for
  
   thread
  
  in
  
   threads: thread.next()
  
  def
  
   upper():
 
  
  while1:
  s =
  
  "Mary had a little lamb"
  
  
  s = s.upper()
  
  
  yield
  
   None
  
  def
  
   lower():
 
  
  while1:
  s =
  
  "Mary had a little lamb"
  
  
  s = s.lower()
  
  
  yield
  
   None
  
  def
  
   replace():
 
  
  while1:
  s =
  
  "Mary had a little lamb"
  
  
  s = s.replace(
  
  'a'
  
  ,
  
  'A'
  
  )
  
  
  yield
  
   None
  
  if
  
   __name__==
  
  '__main__':
  
  
 start = time.clock()
 stringops()
 looptime = time.clock()-start
 
  
  print"LOOP TIME:"
  
  , looptime
 
  
  global
  
   threads
 threads.append(upper())
 threads.append(lower())
 threads.append(replace())
 start = time.clock()
 scheduler()
 threadtime = time.clock()-start
 
  
  print"THREAD TIME:"
  
  , threadtime

结果表明,在直接循环的版本运行一次的时间内,轻便线程的版本运行了两次还多一点点 ― 也就相当于在上面提到的机器上,轻便线程运行了不到 3 秒,而直接循环运行了超过 6 秒。显然,如果每个工作单元都相当于单个字符串方法调用的两倍、十倍或一百倍,那么所花费的线程开销比例就相应地更小了。

设计线程

轻便线程可以(而且通常应该)比单独的概念性操作规模更大。无论是何种线程,都是用来表示描述一个特定 任务或 活动所需的流上下文的量。但是,任务花费的时间/大小可能比我们希望在单独线程上下文中使用的要多/大。抢占将自动处理这种问题,不需要应用程序开发者作出任何特定干涉。不幸的是,轻便线程用户需要注意“好好地处理”其它线程。

至少,轻便线程应该设计得足够周全,在完成概念性操作时应该能够 yield 。调度程序将回到这里以进行下一步。举例来说:
清单 4. 伪码友好的轻便线程

def nicethread():
    while 1:
        ...operation A...
        yield None
        ...operation B...
        yield None
        ...operation C...
        yield None

多数情况下,好的设计将比在基本操作之间的边界 yield 更多次。虽然如此,通常在概念上“基本”的东西都涉及对一个大集合的循环。如果情况如此(根据循环体耗费时间的程度),在循环体中加入一到两个 yield (可能在特定数量的循环迭代执行过后再次发生)可能会有所帮助。和优先权线程的情况不同,一个行为不良的轻便线程会获取无限量的独占处理器时间。

调度的其它部分

迄今为止,上面的示例只展示了形式最基本的几个线程调度程序。可能实现的还有很多(这个问题与设计一个好的生成器/线程没什么关系)。让我来顺便向您展示几个传送中可能出现的增强。
更好的线程管理

一个简单的 threads 列表就可以使添加调度程序要处理的生成器/线程非常容易。但是这种数据结构并不能使删除或暂挂不再相关的线程变得容易。字典或类可能是线程管理中更好的数据结构。下面是一个快捷的示例,这个类能够(几乎能够)顺便访问示例中的 threads 列表:
清单 5. 线程管理的 Python 类示例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class
    
     ThreadPool:
  
    
    """Enhanced threads list as class
  threads = ThreadPool()
  threads.append(threadfunc) # not generator object
  if threads.query(num) <<has some property>>:
    threads.remove(num)
  """def
    
     __init__(self):
    self.threadlist = []
    self.threaddict = {}
    self.avail =
    
    1def
    
     __getitem__(self, n):
    
    
    return
    
     self.threadlist[n]
  
    
    def
    
     append(self, threadfunc, docstring=None):
    
    
    # Argument is the generator func, not the gen object
# Every threadfunc should contain a docstring
    
    
    docstring = docstring
    
    or
    
     threadfunc.__doc__
    self.threaddict[self.avail] = (docstring, threadfunc())
    self.avail +=
    
    1
    self.threadlist = [p[
    
    1]
    
    for
    
     p
    
    in
    
     self.threaddict.values()]
    
    
    return
    
     self.avail-
    
    1# return the threadIDdef
    
     remove(self, threadID):
    
    
    del
    
     self.threaddict[threadID]
    self.threadlist = [p[
    
    1]
    
    for
    
     p
    
    in
    
     self.threaddict.values()]
  
    
    def
    
     query(self, threadID):
    
    
    "
    
    Information on thread,
    
    if
    
     it exists (otherwise None)
    
    
    return
    
     self.threaddict.get(threadID,[None])[0]

您可以实现更多内容,而这是个好的起点。
线程优先级

在简单的示例中,所有线程都获得调度程序同等的关注。至少有两种普通方法可以实现调优程度更好的线程优先级系统。一个优先级系统可以只对“高优先级”线程投入比低优先级线程更多的注意力。我们可以用一种直接的方式实现它,就是创建一个新类 PriorityThreadPool(ThreadPool) ,这个类在线程迭代期间更频繁地返回更重要的线程。最简单的方法可能会在 .__getitem__() 方法中连续多次返回某些线程。那么,高优先级线程就可能接收到两个,或多个,或一百个连续的“时间片”,而不只是原来的一个。这里的一个(非常弱的)“实时”变量最多可能返回散落在线程列表中各处的重要线程的多个副本。这将增加服务于高优先级线程的实际频率,而不只是它们受到的所有关注。

在纯 Python 中使用更复杂的线程优先级方法可能不是很容易(不过它是使用某种第三方特定于 OS/处理器的库来实现的)。调度程序不是只给高优先级线程一个时间片的整型数,它还可以测量每个轻便线程中实际花费的时间,然后动态调整线程调度,使其对等待处理的线程更加“公平”(也许公平性和线程优先级是相关的)。不幸的是,Python 的 time.clock() 和它的系列都不是精度足够高的计时器,不足以使这种方式有效。另一方面,没有什么可以阻止“多时间片”方法中处理不足的线程去动态提高它自己的优先级。
将微线程和协作程序结合在一起

为了创建一个轻便线程(微线程)调度程序,我删除了协作程序逻辑“please branch to here”。这样做其实并不必要。示例中的轻便线程生成的通常都是 None ,而不是跳转目标。我们完全可以把这两个概念结合在一起:如果协同程序/线程生成了跳转目标,调度程序就可以跳转到被请求的地方(也许,除非被线程优先级覆盖)。然而,如果协同程序/线程只生成 None ,调度程序就可以自己决定下一步要关注哪个适当的线程。决定(以及编写)一个任意的跳转究竟会如何与线性线程队列交互将涉及到不少工作,不过这些工作中没有什么特别神秘的地方。

快速而廉价 — 为什么不喜欢它呢?

微线程模式(或者“轻便线程”)基本上可以归结为 Python 中流控制的另一种奇怪的风格。本专栏的前面几个部分已经谈到了另外几种风格。有各种控制机制的引人之处在于,它让开发者将代码功能性隔离在其逻辑组件内,并最大化代码的上下文相关性。

其实,要实现做任何可能做到的事的 可能性并不复杂(只要用一个“loop”和一个“if”就可以了)。对于轻易地分解为很多细小的“代理”、“服务器”或“进程”的一类问题来说,轻便线程可能是表达应用程序的底层“业务逻辑”的最清楚的模型。当然,轻便线程与一些大家更熟知的流机制相比速度可能非常快,就这点而言并无大碍。