【python之路36】进程、线程、协程相关

时间:2022-10-19 00:02:56
线程详细用法请参考:http://www.cnblogs.com/sunshuhai/articles/6618894.html

一、初始多线程

通过下面两个例子的运行效率,可以得知多线程的速度比单线程的要快很多

#!usr/bin/env python
# -*- coding:utf-8 -*-
import time
def print_num(arg):
time.sleep(1)
print(arg)
#每秒打印一个数字,速度非常慢
for i in range(10):
print_num(i)
#!usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
def print_num(arg):
time.sleep(1)
print(arg)
#多线程打印速度很快
for i in range(10):
t = threading.Thread(target=print_num, args=(i,))
t.start()

线程是电脑处理事务的最小单元,线程属于进程,每个进程单独开辟的内存空间,进程越多消耗的内存就越大,每个进程内可以创建多个线程,但是CPU调度的时候有线程锁,每个进程只能调度一个线程,所以原则上,计算密集型的用进程,IO密集型的用线程。

当t.start()后,默认是等待线程运行结束后(即默认t.setDaemon(False)),主程序才结束的,如果使用:t.setDaemon(True)则程序直接结束,后台运行线程。

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1():
time.sleep(5)
def f2(f1, num):
print('开始打印....' + num)
f1()
print('结束打印....' + num)
t = threading.Thread(target=f2, args=(f1,''))
t.setDaemon(True) #程序不等待线程运行结束,主程序运行完毕后会直接终止程序
t.start() t1 = threading.Thread(target=f2, args=(f1,''))
t1.setDaemon(True) #程序不等待线程运行结束,主程序运行完毕后会直接终止程序
t1.start() t2 = threading.Thread(target=f2, args=(f1,''))
#t2.setDaemon(True) #程序不等待线程运行结束,主程序运行完毕后会直接终止程序
t2.start()

t.join(2) ,如果参数为空,则默认当改线程执行结束后才继续向后执行,如果参数为2,则最多等待2秒,如果1秒运行完,则1秒后继续运行,如果两秒内未运行完毕,则也不再等待,主程序继续向下运行,原来的线程后台继续运行。如下面程序运行4秒后会直接结束:

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1():
time.sleep(100)
print('程序结束....') t = threading.Thread(target=f1)
t.setDaemon(True)
t.start()
t.join(2) #最多等待2秒,后继续向下运行 t1 = threading.Thread(target=f1)
t1.setDaemon(True)
t1.start()
t1.join(2) #最多等待2秒,后继续向下运行

二、线程锁,rlock,如果有一个变量num = 0,多个线程去让这个变量自增1,那么可能两个线程同时去拿num的初始值,都自增了1。所以就需要线程锁,只有一条线程能处理这个变量,其他线程等待,等这个线程处理完毕后,其他线程才能处理。

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time lock = threading.RLock() #创建线程锁
num = 0
def func():
lock.acquire() #获得锁
global num
num += 1
time.sleep(1)
print(num)
lock.release() #释放锁 t = threading.Thread(target=func)
t.start() t1 = threading.Thread(target=func)
t1.start()
#结果1秒钟输出1,第2秒钟输出2

三、线程事件,threading.Event

Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。

Event.set() :将标识位设为Ture

Event.clear() : 将标识伴设为False。

Event.isSet() :判断标识位是否为Ture。

其实就相当于Event.wait()就相当于遇到了红灯,当用Event.set()设置为True时所有的线程暂停在这,一旦用Event.clear()设置为False,相当于遇到了绿灯,所有线程继续暂停的地方继续向下运行

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
obj_event = threading.Event()
def func(event):
print('start')
event.wait()
print('end') obj_event.clear() #标识位设置为True相当于打开红灯
for i in range(10):
t = threading.Thread(target=func, args=(obj_event,))
t.start() inp = input('请输入False改变标识位:')
if inp == 'False':
obj_event.set() ##标识位设置为False相当于打开绿灯

 四、生产者消费者模型及队列queue

举例来说,卖包子,厨师做好包子后推进管道内,顾客从另一端取走,这个模型叫生产消费者模型,优点是:厨师可以不断生产,相当于缓存,常用方法:

put  get  get_nowait不等待

import queue

q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。

q.join()    # 等到队列为kong的时候,在执行别的操作
q.qsize() # 返回队列的大小 (不可靠,因为队列时刻在发生变化)
q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠,因为队列时刻在发生变化)
q.full() # 当队列满的时候,返回True,否则返回False (不可靠,因为队列时刻在发生变化)
q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
                      #为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
                      #如果队列无法给出放入item的位置,则引发 queue.Full 异常
q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
                      若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
q.put_nowait(item) # 等效于 put(item,block=False)
q.get_nowait() # 等效于 get(item,block=False)
#!usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading q = queue.Queue(10)
def product(i):
q.put(i)
print(i)
def customer(i):
q.get(i)
print(i) for i in range(12):
t = threading.Thread(target=product, args=(i,))
t.start()
for i in range(9):
t1 = threading.Thread(target=customer, args=(i,))
t1.start()

以后写程序的时候需要创建多少线程呢?例如有5000个任务,那么就需要创建5000个线程么,答案肯定不是的。例如你的电脑开10个线程是最优的,那么每次最多开10个线程。这就引入线程池的概念,最多创建10个线程,如果大于10个线程,那么先处理10个线程,哪个线程处理完毕后,再创建一个线程继续处理下一个任务。

python内部没有提供线程池,需要我们自定义线程池,以后使用中很少自己创建线程,一般都使用线程池。

五、进程相关multiprocessing

在windows中是不支持进程的,要想使用进程必须加到if __name__ =="__main__":里面,否则会报错,但不可能所有的程序都加这个,所以建议进程程序在linux下进行调试.

代码从上向下解释是主进程中的主线程做的。

1、创建进程

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(i):
time.sleep(15)
print(i) if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=(11,))
p.daemon = True #True表示不等待直接结束主程序;默认为False,等待进程中的线程运行完毕才继续运行
p.start()
p.join(2) #表示最多等待2秒

2、进程相当于是重新创造一个房间,房间内的资源全部重新创建一份,即在内存中重新开辟一块内存,其所有变量及对象重新创建一份。

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
li = []
def func(i):
li.append(i)
print(li) if __name__ == "__main__":
for i in range(10):
p = multiprocessing.Process(target=func, args=(i,))
p.start() #结果输出如下:
# [1]
# [0]
# [2]
# [3]
# [4]
# [6]
# [7]
# [5]
# [8]
# [9]

3、虽然默认进程之间的数据是不共享的,但可以通过创建数据类型,来实现数据共享

(1)数组的Array特性:数组的类型必须指定,数组的个数指定后不能超出范围

此方法因为限制太多一般不用

#!usr/bin/env python
# -*- coding:utf-8 -*- import multiprocessing def Foo(i,temp):
temp[i] = 100 + i
print(list(temp)) if __name__ == "__main__":
temp = multiprocessing.Array('i', [11, 22, 33, 44])
for i in range(4):
p = multiprocessing.Process(target=Foo, args=(i,temp))
p.start()

(2)通过创建特殊的字典Manager,来实现不同进程之间的数据共享,此字典和python普通的字典几乎是一样的(能取值和赋值),但也有区别

#!usr/bin/env python
# -*- coding:utf-8 -*- import multiprocessing
import time
def Foo(i,dic):
dic[i] = 100 + i
print(len(dic)) if __name__ == "__main__":
m = multiprocessing.Manager()
dic = m.dict()
for item in range(3):
p = multiprocessing.Process(target=Foo, args=(item,dic,))
p.start()
#p.join()
#进程之间数据共享,基层是通过进程之间数据连接实现的,所以如果主进程执行到最后,等待子进程结束,
#当主进程hold住的时候(即执行到最后),其自进程就无法进行数据共享了,此时用join,或sleep可以解决
time.sleep(3)

4、进程池Pool

进程池主要有两个方法:

区别:apply和apply_async,apply是每次只能一个进程进行工作每一个任务都是排队进行,相当于进程.join()

apply_async通过异步的方式,每次进程可以同时工作,并发进行,可以设置回调函数

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(args):
time.sleep(1)
print(args + 100)
return(args + 100)
def f1(args):
print(args)
if __name__ == "__main__":
pool = multiprocessing.Pool(5)
for i in range(50):
# pool.apply(func, args=(i,)) #逐个进程运行,因为apply只支持一个进程同时工作
pool.apply_async(func, args=(i,),callback=f1) #同时运行5个进程,异步操作
#注意上面参数callback是回调函数,调用完func后,再调用f1函数并将func函数的返回值作为参数传给f1
print('') #先打印50个11说明for循环50次是全部执行完毕的,然后写入进程等待
pool.close() #在进程池.join之前必须加pool.close()-等待进程池所有进程处理完毕或pool.terminate()不等待直接结束
pool.join()#此句表示进程池中的进程执行完毕后再关闭,如果注释,那么程序直接关闭

六、自定义线程池

1、自定义low版线程池

#!usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import time
from threading import Thread class Thread_Pool:
def __init__(self, maxsize):
self.que = queue.Queue(maxsize) #定义队列赋值给对象的变量que
for i in range(maxsize): #循环将线程类放进队列
self.que.put(Thread)
def get_thread(self):
return self.que.get()
def put_thread(self):
self.que.put(Thread)
def func(t,args):
time.sleep(1)
print(args)
t.put_thread() t = Thread_Pool(5)
for i in range(50):
Thread_class = t.get_thread()
thread = Thread_class(target=func, args=(t,i)) #把t线程池对象传给func,以便拿走线程后随时给线程池补充线程
thread.start()

2、low版的线程池与进程池相比有几个问题:

1、线程池队列中不管用不用,线程类都被创建,可能存在大于5个线程的情况,因为,拿走1个线程,运行任务,然后增加1个线程,可能拿走的线程没有运行完毕

2、需要手动得到线程类,然后再根据线程类创建线程对象,最后还需要运行线程对象

3、没有回调函数

4、线程池补充是手动补回的

绝版高大上线程,可以直接作为线程池模块使用:

#!usr/bin/env python
# -*- coding:utf-8 -*- import queue
import threading
import time
StopEvent = object()
class TreadPool:
def __init__(self, max_num, max_tast_num = 0):
self.max_num = max_num #最大线程数
if max_tast_num: #根据是否制定最大任务数来指定队列程度
self.q = queue.Queue() #队列不限定长度
else:
self.q = queue.Queue(max_tast_num) #根据用户指定长度创建队列
self.generate_list = [] #记录生成的线程
self.free_list = [] #记录空闲的线程
self.terminal = False
def run(self, target, args, callback=None):
"""运行该函数,调用线程池"""
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
#没有空闲线程并且当前创建线程数量小于最大允许线程数量时允许创建线程
self.creat_thread() #调用创建线程函数
tast = (target, args, callback) #将任务打包成为元组放入队列
self.q.put(tast) def creat_thread(self):
"""创建线程,并且运行,这时调用call函数,所有实现均在call函数内"""
thread = threading.Thread(target=self.call)
thread.start()
def call(self):
"""线程调用该函数"""
current_thread = threading.currentThread() #获取执行该函数的当前线程
self.generate_list.append(current_thread) #将线程加入生成的线程列表 tast = self.q.get() #从队列中取出一个任务包 while tast != StopEvent:
target, args, backcall = tast #将元组人物包,赋值给变量
try:
result = target(*args) #执行函数,并将返回值赋值给result
except Exception as e:
result = None if backcall:
try:
backcall(result) #执行回调函数,并将result作为参数传给回调函数
except Exception as e:
pass self.free_list.append(current_thread) #执行完毕,将当前线程对象加入空闲列表
if self.terminal: #是否强制终止
tast = StopEvent
else:
tast = self.q.get() #等待那任务,如果有任务直接循环执行,如果没有则等待,一旦run方法中放入任务则继续执行任务,无需再创建线程
self.free_list.remove(current_thread) #拿到任务后,清除空闲线程
else:
self.generate_list.remove(current_thread)
def close(self):
"""所有线程全部运行完毕后,停止线程
call函数运行完毕后,所有的线程此时都在等待拿任务,此时,只要给队列里面添加StopEvent对象则线程就会结束"""
generate_size = len(self.generate_list)
while generate_size:
self.q.put(StopEvent)
generate_size -= 1
def terminate(self):
"""不等待线程全部运行完毕任务,直接终止"""
self.terminal = True #正在运行的线程运行完毕后会终止
generate_size = len(self.generate_list)
while generate_size: #终止已经在等待的那一部分线程
self.q.put(StopEvent)
generate_size -= 1 def func(i):
time.sleep(1)
print(i) pool = TreadPool(5)
for i in range(100):
pool.run(target=func, args=(i,))
pool.close()

七、协程

什么是协程呢,举个例子:

从网站向下抓图片,请求发出后,2秒后能够返回请求,网站一共100张图片,如果用5个线程,那么这5个线程中2秒共请求5张图片。

如果用一个线程,请求后继续请求下张图片,请求返回后接收图片,那么可能0.01秒就能请求一个,这用可以少用线程,并且效率也很高。

1、协程的基础greenlet

首先用 pip3 install greenlet  安装greelet模块,然后测试下面代码

#!usr/bin/env python
# -*- coding:utf-8 -*- from greenlet import greenlet
def f1():
print(12)
gr2.switch()
print(34)
gr2.switch() def f2():
print(45)
gr1.switch()
print(56) gr1 = greenlet(f1)
gr2 = greenlet(f2) gr1.switch() #遇到swith切换到f1函数 #输出结果为:
#
#
#
#

2、gevent是基于greenlet进行封装的协程处理模块,系统自动的进行控制,如下面代码

通过:pip3 install gevent 导入gevent模块。

gevent当遇到运行较慢时自动会执行下一个任务,当执行完毕后会回来继续执行。

#!usr/bin/env python
# -*- coding:utf-8 -*- import gevent
def foo():
print('Runing in foo')
gevent.sleep(0) #切换为下一个任务
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch to bar') gevent.joinall([gevent.spawn(foo),
gevent.spawn(bar)]) #运行结果如下:
# Runing in foo
# Explicit context to bar
# Explicit context switch to foo again
# Implicit context switch to bar

爬虫应用gevent,最主要的应用场景:

#!usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey;monkey.patch_all()
import gevent
import requests
def f(url):
print('GET:%s' % url)
resp = requests.get(url)
data = resp.text
print(url, len(data)) gevent.joinall([gevent.spawn(f, 'https://www.baidu.com/'),
gevent.spawn(f,'https://www.yahoo.com/'),
gevent.spawn(f,'https://taobao.com/'),
gevent.spawn(f,'http://www.people.com.cn/')]) #返回结果:
# GET:https://www.baidu.com/
# GET:https://www.yahoo.com/
# GET:https://taobao.com/
# GET:http://www.people.com.cn/
# http://www.people.com.cn/ 135707
# https://www.baidu.com/ 2443
# https://taobao.com/ 134324
# https://www.yahoo.com/ 416613