一、线程
1、创建线程
2、主线程是否等待子线程
t.setDaemon(Ture/False):默认是false,等待子线程完成,ture,表示不等待子线程结束
3、主线程等待,子线程执行
join(),一直等到子线程结束
join(3),最多等待3秒,如果子线程需要两秒,则等待2秒。
4、线程锁
R.rlock()
#!/usr/bin/env python
#coding:utf-8 import threading
import time gl_num = 0 lock = threading.RLock() def Func():
lock.acquire()
global gl_num
gl_num +=1
time.sleep(1)
print gl_num
lock.release() for i in range(10):
t = threading.Thread(target=Func)
t.start()
线程锁
5、线程事件
#!/usr/bin/env python
# -*- coding:utf-8 -*- import threading def do(event):
print 'start'
event.wait()
print 'execute' event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start() event_obj.clear()
inp = raw_input('input:')
if inp == 'true':
event_obj.set()
线程事件
6、线程池
python内部没有提供
需要自定义
二、生产者消费者模型及队列
三、进程
1、创建进程
2、daemon
默认false,歘歘
3、jion()等待
4、进程之间数据不能共享
#!/usr/bin/env python
#coding:utf-8 from multiprocessing import Process
from multiprocessing import Manager import time li = [] def foo(i):
li.append(i)
print 'say hi',li for i in range(10):
p = Process(target=foo,args=(i,))
p.start() print 'ending',li
进程之间数据不共享
······线程之间数据是共享的·············
·············进程数据不能共享(默认)············
~~~~~~~~~~~~~~~~~~~~~~~~~进程之间数据共享~~~~~~~~~~~~~~~~~~~~
#方法一,Array
from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44]) def Foo(i):
temp[i] = 100+i
for item in temp:
print i,'----->',item for i in range(2):
p = Process(target=Foo,args=(i,))
p.start() #方法二:manage.dict()共享数据
from multiprocessing import Process,Manager manage = Manager()
dic = manage.dict() def Foo(i):
dic[i] = 100+i
print dic.values() for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
p.join()
进程数据共享py2.7(两个方法)
5、进程池
p = Pool(5)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
import time def Foo(i):
time.sleep(2)
return i+100 def Bar(arg):
print arg pool = Pool(5)
#print pool.apply(Foo,(1,))
#print pool.apply_async(func =Foo, args=(1,)).get() for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar) print 'end'
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
进程池
~~~~~~~~~~~~进程池基础之apply和apply_async方法区别~~~~~~~~~~~~~~~~
p.apply() 每一个任务是排队进行,进程.join()
p.apply_async() 每一个任务并发进行,可以设置回调函数,进程无.join(),daemon=True
四、线程池的实现
1、低配版线程池
2、高配版线程池
(1)、设计思路
复制代码 #!/usr/bin/env python
# -*- coding:utf-8 -*- import queue
import threading
import contextlib
import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = [] def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w) def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start() def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread()
self.generate_list.append(current_thread) event = self.q.get()
while event != StopEvent: func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None if callback is not None:
try:
callback(success, result)
except Exception as e:
pass with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else: self.generate_list.remove(current_thread) def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1 def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True while self.generate_list:
self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result):
# status, execute action status
# result, execute action return value
pass def action(i):
print(i) for i in range(30):
ret = pool.run(action, (i,), callback) time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
2、上下文管理基础
3、上下文管理之with自定义open
#!/usr/bin/env python
# -*- coding:utf-8 -*- import queue
import threading
import contextlib
import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
# 最多创建的线程数(线程池最大容量)
self.max_num = max_num
self.cancel = False
self.terminal = False
# 真实创建的线程列表
self.generate_list = []
# 空闲线程数量
self.free_list = [] def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
w = (func, args, callback,)
self.q.put(w)
# 把任务放在一个元组里 if self.cancel:
return
# 如果没有空闲线程,且创建的线程数目小于线程池最大创建数目
# 则创建线程
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread() def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start() def call(self):
"""
循环去获取任务函数并执行任务函数
"""
# 获取当前线程
current_thread = threading.currentThread()
self.generate_list.append(current_thread)
# 去任务并执行
event = self.q.get()
while event != StopEvent:
# 是任务
# 解开任务包
func, arguments, callback = event
# 执行任务
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = e if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
# 标记 空闲了
with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
# 取任务
event = self.q.get()
else:
# 不是元组,不是任务
self.generate_list.remove(current_thread)
# 想要终止
# 1、让正在从队列中取任务的线程挂掉
# 2、主线程,你跳我就跳 def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent) # 给队列加终止符,有几个加几个
full_size -= 1 def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True while self.generate_list:
self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager # 装饰器 处理上下文
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread) pool = ThreadPool(5) def callback(status, result):
# status, execute action status
# result, execute action return value
pass def action(i):
print(i) for i in range(30):
# 将任务放在队列中
# 着手开始处理任务
# -创建线程
# -有空闲线程,则不再创建线程
# -不能高于线程池的限制
# -根据任务个数判断 # -线程去队列中取任务
ret = pool.run(action, (i,), callback) time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()
最终代码
五、 协程
greenlet
#!/usr/bin/env python
# -*- coding:utf-8 -*- from greenlet import greenlet def test1():
print 12
gr2.switch()
print 34
gr2.switch() def test2():
print 56
gr1.switch()
print 78 gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
import gevent def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again') def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar') gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
遇到io操作自动切换:
from gevent import monkey; monkey.patch_all()
import gevent
import urllib2 def f(url):
print('GET: %s' % url)
resp = urllib2.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])