python运维开发(十一)----线程、进程、协程

时间:2023-12-14 22:36:38

内容目录:

  • 线程
    • 基本使用
    • 线程锁
    • 自定义线程池
  • 进程
    • 基本使用
    • 进程锁
    • 进程数据共享
    • 进程池
  • 协程

线程

线程使用的两种方式,一种为我们直接调用thread模块上的方法,另一种我们自定义方式

方式一(常规使用):

import  threading

def f1(arg):
print(arg) t = threading.Thread(target=f1,args=(123,))
t.start()

通过上面的例子我们来了解下该线程执行过程,在执行t.start()时,该进程处于等待状态,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令,当cpu通过指令调用该线程时,线程自动调用执行threading.Thread类的run方法,我们可以自定义run方法的执行过程,因此有了第二种使用方式。

方式二(自定义方式):

import  threading
class MyThread(threading.Thread):
def __init__(self,func,arg):
self.func = func
self.arg = arg
super(MyThread,self).__init__() def run(self):
self.func(self.arg) def f2(arg):
print(arg) obj = MyThread(f2,123)
obj.start()

队列

Python中,队列是线程间最常用的交换数据的形式。

分类

import queue

queue.Queue,先进先出队列

import queue
q = queue.Queue() #创建队列对象,该对象为先进先出对象
q.put(123)
q.put(234)
q.put(345)
print(q.get())
print(q.get())
print(q.get())
#输出:
123
234
345

queue.LifoQueue,后进先出队列

import queue
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())
print(q.get())
#输出
456
123

queue.PriorityQueue,优先级队列

import queue
q = queue.PriorityQueue()
q.put((4,"priority4"))
q.put((2,"priority2"))
q.put((1,"priority1"))
q.put((3,"priority3"))
print(q.get())
print(q.get())
print(q.get())
#输出
(1, 'priority1')
(2, 'priority2')
(3, 'priority3') 

queue.deque,双向对队

默认pop取队列中的数据,从右边取,先进后出原则,popleft从左取先进后出

import queue
q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.appendleft(1111)
print(q.pop())
print(q.pop())
print(q.popleft())
print(q.popleft())
#输出
333
123
1111
456

常用方法:

put放数据,是否阻塞,阻塞时的超时事件

import queue
q = queue.Queue(2)
q.put(123)
q.put(456)
# q.put(234,block=False) #block=False不阻塞直接抛出异常
q.put(234,timeout=2) #timeout=2 阻塞等待2s 抛出异常

get取数据(默认阻塞),是否阻塞,阻塞时的超时事件

qsize()队列中元素的真实个数,返回队列的长度

import queue
q = queue.Queue()
q.put(123)
q.put(234)
q.put(345)
print(q.qsize())
#输出
3

maxsize 设置队列中元素的最大支持的个数

queue.Queue(maxsize)

import queue
q = queue.Queue(3)
q.put(123)
q.put(234)
q.put(345)

上面例子中q中只能存3个元素,多于3个的话会阻塞状态  

join,阻塞进程,当队列中任务执行完毕之后,不再阻塞

task_done,告诉queque当前任务已经完成可以进行下一次的任务

import  queue
q = queue.Queue(5)
q.put(123)
q.put(456) print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join()
print('end')
#task_done在不和join连用时候,使用和不使用无区别,当和join连用的时候,不加task_done,会一直阻塞状态

  

q.empty()判断队列是否为空,为空返回True,不为空返回False

import queue
q = queue.Queue(2)
print(q.empty()) #判断队列是否为空,为空返回True,不为空返回False
q.put(123)
q.put(456)
print(q.empty())
#输出
True #上面没有put之前为空所返回True
False #put进数据后返回False

block和timeout

import queue
q = queue.Queue(2)
q.put(11)
q.put(22) # q.put(33, block=True,timeout=2) #阻塞状态,等等2s直接抛出异常,默认状态为True
# q.put(33, block=False) #不等待直接抛出异常
print(q.get())
print(q.get())
print(q.get(timeout=2)) #阻塞状态,等等2s直接抛出异常,默认状态为True

线程锁

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

简单实例

未加RLOCK线程锁

import threading
import time NUM = 10
def func(arg): global NUM
NUM -= 1
time.sleep(1)
print(NUM) for i in range(10):
t = threading.Thread(target=func,args=(i,))
t.start()
#输出
0
0
0
0
0
0
0
0
0
0

加RLOCK线程锁

import threading
import time NUM = 10
def func(arg,l): l.acquire()#加锁
global NUM
NUM -= 1
time.sleep(1)
print(NUM)
l.release()#解锁 lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=func,args=(i,lock))
t.start()
#输出:
9
8
7
6
5
4
3
2
1
0

RLOCK和LOCK区别

LOCK只能针对单层级锁

l.acquire()#加锁
global NUM
NUM -= 1
time.sleep(1)
print(NUM)
l.release()#解锁

RLOCK支持多层级加锁

l.acquire()#加锁
global NUM
l.acquire()#加锁
NUM -= 1
time.sleep(1)
l.release()#解锁
print(NUM)
l.release()#解锁 

BoundedSemaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据

import threading
import time
NUM = 10 def func(arg,l):
global NUM l.acquire()#加锁
NUM -= 1
time.sleep(2)
print(NUM,arg)
l.release()#解锁 # lock = threading.RLock()
lock = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
for i in range(10):
t = threading.Thread(target=func,args=(i,lock,))
t.start() #输出:每5个打印一次
5 1
5 0
3 3
3 2
2 4
0 5
0 6
0 8
0 9
0 7

event(事件)

python线程的event的事件的作用主要是主进程控制其他线程的执行。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
def func(i,e):
print(i)
e.wait() # 执行wait方法,阻塞状态等待flag设置为True
print(i+100) event = threading.Event() #创建event对象 for i in range(10):
t = threading.Thread(target=func, args=(i,event,))
t.start() event.clear() # 将flag设置成False
inp = input('>>>')
if inp == "1":
event.set() # 将flag设置为True

condition(条件)

使得线程等待,只有满足某条件时,才释放n个线程

import threading
def func(i,con):
print(i)
con.acquire()#加锁
con.wait()#阻塞状态等待输入,根据输入的条件进行输出,输入为1则输出1个+100后的数字,2的话输出2个相加100后的数字
print(i+100)
con.release()#解锁 c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func, args=(i,c,))
t.start() while True:
inp = input('>>>')
if inp == 'q':
break
c.acquire()
c.notify(int(inp)) #传入值
c.release()

通过输入true or false 来控制输出单个数字

import threading

def condition():
ret = False
r = input('>>>')
if r == 'true':
ret = True
else:
ret = False
return ret def func(i,con):
print(i)
con.acquire()
con.wait_for(condition)
print(i+100)
con.release() c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func, args=(i,c,))
t.start()

true or false

Timmer

定时器,指定n秒后执行某操作

import  threading

def hello():
print("hello, world") t = threading.Timer(1, hello)
t.start() # 1s后输出hello, world

生产者消费者模型

生产者3个人服务消费者300个人

import queue
import threading
import time
q = queue.Queue() def consumer(arg):
"""
买票
:param arg:
:return:
"""
q.put(str(arg) + '- ticket') for i in range(300):
t = threading.Thread(target=consumer,args=(i,))
t.start() def productor(arg):
"""
服务器后台
:param arg:
:return:
"""
while True:
print(arg, q.get())
time.sleep(2) for j in range(3):
t = threading.Thread(target=productor,args=(j,))
t.start()

线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源,所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。

简单的线程池实现:

import queue
import threading
import time class ThreadPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._q = queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)
# 【threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread】
def get_thread(self):
return self._q.get() def add_thread(self):
self._q.put(threading.Thread) pool = ThreadPool(5) def task(arg,p):
print(arg)
time.sleep(1)
p.add_thread() for i in range(100):
# threading.Thread类
t = pool.get_thread()
obj = t(target=task,args=(i,pool,))
obj.start()  

进程

一个简单进程实现,他具有和线程一样的功能方法。

import  multiprocessing

def func(arg):
print(arg) if __name__ == '__main__':
for i in range(10):
t = multiprocessing.Process(target=func,args=(i,))
t.start()
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing def foo(i,arg):
arg.put(i)
print('say hi',i,arg.qsize()) if __name__ == "__main__":
# li = []
li = queues.Queue(20,ctx=multiprocessing)
for i in range(10):
p = Process(target=foo,args=(i,li,))
#p.daemon = True
p.start()
#p.join()

ps:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。并且python不能再Windows下创建进程!

默认的进程之间相互是独立,如果想让进程之间数据共享,就得有个特殊的数据结构,这个数据结构就可以理解为他有穿墙的功能

如果你能穿墙的话两边就都可以使用了。

进程数据共享

默认情况下进程间的数据是不能共享的

from multiprocessing import Process
import time li = [] def foo(i):
li.append(i)
print('say hi',li) if __name__ == '__main__':
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
time.sleep(1)
print('ending',li)#输出列表为空
#输出
say hi [0]
say hi [1]
say hi [7]
say hi [4]
say hi [6]
say hi [2]
say hi [3]
say hi [9]
say hi [5]
say hi [8]
ending []

上例中默认进程之间数据是不共享的,所以每次输出列表中都为1个值,最后输出endind中为空。

 queue队列方式实现进程间数据共享

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing def foo(i,arg):
arg.put(i)
print('say hi',i,arg.qsize())#qsize值每次都自动增加就说明进程间的数据是共享的了 if __name__ == "__main__":
# li = []
li = queues.Queue(20,ctx=multiprocessing)
for i in range(10):
p = Process(target=foo,args=(i,li,))
#p.daemon = True
p.start()
#p.join()
#输出从结果中可以看到进程间的数据是共享的了
say hi 4 4
say hi 3 6
say hi 1 6
say hi 2 6
say hi 6 6
say hi 5 6
say hi 0 7
say hi 9 10
say hi 7 10
say hi 8 10

通过array数组方式实现数据共享

from multiprocessing import Process,Array
import time
#创建一个只包含数字类型的数组(python中叫列表)
#并且数组是不可变的,在C,或其他语言中,数组是不可变的,之后再python中数组(列表)是可以变得
#当然其他语言中也提供可变的数组
#在C语言中数组和字符串是一样的,如果定义一个列表,如果可以增加,那么我需要在你内存地址后面再开辟一块空间,那我给你预留多少呢?
#在python中的list可能用链表来做的,我记录了你前面和后面是谁。 列表不是连续的,数组是连续的 '''
上面不是列表是“数组"数组是不可变的,附加内容是为了更好的理解数组!
''' temp = Array('i', [11,22,33,44]) #这里的i是C语言中的数据结构,通过他来定义你要共享的内容的类型!点进去看~ def Foo(i,arg):
arg[i] = 100+i
for item in arg:
print(i,'----->',item) if __name__ == '__main__':
for i in range(2): #当执行第二个1时候打印1时候的结果为100+0,说明进程间的数据是共享的
p = Process(target=Foo,args=(i,temp,))
p.start()
time.sleep(2)
print('================')
for j in temp: #查看最后temp中的值,是更改过后的
print(j)

 manager.dict实现数据共享

from multiprocessing import Process
from multiprocessing import Manager def foo(i,arg):
arg[i] = i + 100#每次循环进行字典中值进行赋值
print(arg.values()) if __name__ == "__main__":
obj = Manager()
li = obj.dict()
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
p.join() # 方式二 等待子进程执行完毕后主进程继续执行
# 方式一
# import time
# time.sleep(0.1) #等待子进程执行0.1s,然后继续执行主进程
#输出
[100]
[100, 101]
[100, 101, 102]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
[100, 101, 102, 103, 104, 105]
[100, 101, 102, 103, 104, 105, 106]
[100, 101, 102, 103, 104, 105, 106, 107]
[100, 101, 102, 103, 104, 105, 106, 107, 108]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply 去进程池中取进程进行串行操作
  • from multiprocessing import Pool
    import time def f1(arg):
    time.sleep(1)
    print(arg) if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
    pool.apply(func=f1,args=(i,)) #apply方式是进入进程池中串行一个一个取来做任务
  • apply_async 取进程并行操作
from multiprocessing import Pool
import time def f1(arg):
time.sleep(1)
print(arg) if __name__ == '__main__':
pool = Pool(5)
for i in range(10):
#pool.apply(func=f1,args=(i,)) #apply方式是进入进程池中串行一个一个取来做任务
pool.apply_async(func=f1,args=(i,)) # pool.close() #此处因为直接执行join会报一个断言的错误,需要满足条件才能继续执行,close和terminate就是这两个条件,
# close表示程序从上到下全部任务执行完毕才终止
time.sleep(2)
pool.terminate() #terminate表示立即中止任务执行,无论是正在运行还是等待运行的,只要主进程执行到terminate子进程就会终止
pool.join() #等待子进程执行完之后在继续执行主进程
print('end')
#输出
0
1
2
3
4
end
#上面的输出5个值,说明主进程执行到terminate时候子进程执行了5个任务了,
#其他的没有执行呢就直接退出

协程

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

greenlet

使用该模块需要手动安装一下gevent模块 python -m pip install gevent

from greenlet import greenlet

def test1():
print(12)
gr2.switch() #切换到协程2
print(34) #协程2执行完回到这里执行
gr2.switch() #切换到协程2 def test2():
print(56)
gr1.switch()#切换到协程1
print(78) gr1 = greenlet(test1)#创建协程对象
gr2 = greenlet(test2)
gr1.switch() #执行协程1
'''
比I/O操作,如果10个I/O,我程序从上往下执行,如果同时发出去了10个I/O操作,那么返回的结果如果同时回来了2个
,是不是就节省了很多时间? 如果一个线程里面I/O操作特别多,使用协程是不是就非常适用了! 如果一个线程访问URL通过协程来做,协程告诉它你去请求吧,然后继续执行,但是如果不用协程就得等待第一个请求完毕之后返回之后才
继续下一个请求。 协程:把一个线程分成了多个协程操作,每个协程做操作
多线程:是把每一个操作,分为多个线程做操作,但是python中,在同一时刻只能有一个线程操作,并且有上下文切换。但是如果上下文切换非常频繁的话
是非常耗时的,但对于协程切换就非常轻便了~ '''

gevent

import gevent

def foo():
print('Running in foo')
gevent.sleep(0) #切换到bar的任务
print('Explicit context switch to foo again') def bar():
print('Explicit context to bar')
gevent.sleep(0)#切换到foo的任务
print('Implicit context switch back to bar') gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
#输出
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

遇到IO自动 切换

from gevent import monkey; monkey.patch_all()
import gevent
import requests def f(url):
print('GET: %s' % url)
resp = requests.get(url)#当遇到I/O操作的时候就会调用协程操作,然后继续往下走,然后这个协程就卡在这里等待数据的返回
data = resp.text
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),#执行f函数,将后面的url做为参数穿个f
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])