并发编程——多线程(4)

时间:2021-12-09 18:02:36

1.线程理论

  • 线程是CPU的执行单位
  • 多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内又多条流水线,都共用一个车间的资源。例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

2.线程与进程的区别

  • 同一个进程内的多个线程共享该进程内的地址资源
  • 创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)

3.开启线程的两种方式

  • 方式一:函数
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 from threading import Thread
    3 import time
    4 import os
    5 def sayhi(name):
    6 time.sleep(2)
    7 print("%s say hello"%name)
    8 if __name__ == "__main__":
    9 t = Thread(target=sayhi,args=('egon',))
    10 t.start()
    11 print("")
    方式一
  • 方式二:类
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 class SayHi(Thread):
    2 def __init__(self,name):
    3 super().__init__()
    4 self.name = name
    5 def run(self):
    6 print("%s say hello" % self.name)
    7 print("线程pid:",os.getpid())
    8 if __name__ =="__main__":
    9 t1 = SayHi("egon")
    10 t2 = SayHi("alex")
    11 t1.start()
    12 t2.start()
    13 print("主进程pid:",os.getpid())
    方式二

4.多线程与多进程的区别

  •  开启速度:线程快于进程
  • pid:同一进程下不同线程的pid是相同的,进程之间的pid是不同的
  • 内存:进程之间内存地址空间是隔离的,而同一进程内开启的多个线程是共享该进程内存地址空间的

 5.Thread对象的其他属性或方法

  • Thread实例对象的方法
    • isAlive():返回线程是否活动的
    • getName():返回线程名
    • setName():设置线程名
  • threading模块提供的一些方法
    • threading.currentThread():返回当前的线程变量
    • threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前后终止后的线程
    • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 from threading import Thread
    3 import time
    4 import threading
    5 import os
    6
    7 def sayhi(name):
    8 time.sleep(2)
    9 print("%s say hello"%name)
    10 print(threading.current_thread().getName())#Thread-1
    11 if __name__ == "__main__":
    12 t = Thread(target=sayhi,args=('egon',))
    13 t.start()
    14 print(threading.current_thread().getName())#MainThread
    15 print(threading.current_thread())#<_MainThread(MainThread, started 8308)>
    16 print(threading.enumerate())#[<_MainThread(MainThread, started 9072)>, <Thread(Thread-1, started 7052)>]
    17 print(threading.active_count())#2
    18 print(t.is_alive())#True
    19 t.join()
    20 print('主线程/主进程')
    21 print(t.is_alive())#False
    View Code

5.守护线程

  • 无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
    • 强调:运行完毕并非终止运行
    • 对于主进程来说,运行完毕指的是主进程代码运行完毕
    • 对于主线程来说,运行完毕指的是主线程所在进程内所有非守护线程统统运行完毕,主线程才算运行完毕
  • 守护线程:在同一进程内,其他非守护线程运行完毕后才算运行完毕,此时守护线程被回收
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 from threading import Thread
    3 import time
    4
    5 def walk():
    6 print("start123")
    7 time.sleep(1)
    8 print("end123")
    9 def run():
    10 print("start456")
    11 time.sleep(3)
    12 print("end456")
    13 if __name__ == "__main__":
    14 t1 = Thread(target=walk)
    15 t2 = Thread(target=run)
    16 t1.daemon = True
    17 t1.start()
    18 t2.start()
    19 print("")
    20
    21
    22 #start123
    23 #start456
    24 #
    25 #end123
    26 #end456
    View Code
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 from threading import Thread
    3 import time
    4
    5 def walk():
    6 print("start123")
    7 time.sleep(3)
    8 print("end123")
    9 def run():
    10 print("start456")
    11 time.sleep(1)
    12 print("end456")
    13 if __name__ == "__main__":
    14 t1 = Thread(target=walk)
    15 t2 = Thread(target=run)
    16 t1.daemon = True
    17 t1.start()
    18 t2.start()
    19 print("")
    20 #start123
    21 #start456
    22 #
    23 #end456
    View Code

6.GIL全局解释器锁

  • 本质上也是互斥锁
  • 保护不同的数据应该加不同的锁,GIL是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据);lock是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自己加锁定义
  • 有了GIL的存在,同一时刻同一进程中只能有一个线程被执行
  • GIL与多线程
    • ·对于计算来说,CPU越多越好,但是对于I/O来说,再多的CPU也没用
    • 对于单核计算机来说,常采用开启一个进程,多个线程的方案
    • 对于多核计算机来说,如果任务是计算密集型,多核意味着并行计算,多进程方案更优;如果任务是I/O密集型,则多线程方案更优
    • 多线程用于IO密集型,如socket,爬虫,web
    • 多进程用于计算密集型,如金融分析
    • 并发编程——多线程(4)并发编程——多线程(4)
       1 #-*- coding:utf-8 -*-
      2 from multiprocessing import Process
      3 from threading import Thread
      4 import os
      5 import time
      6 def work():
      7 time.sleep(2)
      8 print("===>")
      9 if __name__ == "__main__":
      10 l_p = []
      11 print(os.cpu_count())
      12 start = time.time()
      13 for i in range(400):
      14 # p = Process(target=work)#耗时时间长
      15 p = Thread(target=work)#耗时时间短
      16 l_p.append(p)
      17 p.start()
      18 for p in l_p:
      19 p.join()
      20 stop = time.time()
      21 print("run time is %s"%(stop-start))
      I/O密集型
      并发编程——多线程(4)并发编程——多线程(4)
       1 #-*- coding:utf-8 -*-
      2 from multiprocessing import Process
      3 from threading import Thread
      4 import os
      5 import time
      6 def work():
      7 res = 0
      8 for i in range(100000000):
      9 res*=1
      10 if __name__ == "__main__":
      11 l_p = []
      12 print(os.cpu_count())
      13 start = time.time()
      14 for i in range(4):
      15 # p = Process(target=work)#耗时时间短
      16 p = Thread(target=work)#耗时时间长
      17 l_p.append(p)
      18 p.start()
      19 for p in l_p:
      20 p.join()
      21 stop = time.time()
      22 print("run time is %s"%(stop-start))
      计算密集型

7.死锁现象与递归锁RLOCK

  • 死锁现象
    •  是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
    • 并发编程——多线程(4)并发编程——多线程(4)
       1 # -*- coding:utf-8 -*-
      2 from threading import Thread, Lock
      3 import time
      4
      5 mutexA = Lock()
      6 mutexB = Lock()
      7
      8
      9 class MyThread(Thread):
      10 def run(self):
      11 self.func1()
      12 self.func2()
      13
      14 def func1(self):
      15 mutexA.acquire()
      16 print('\033[41m%s 拿到A锁\033[0m' % self.name)
      17 mutexB.acquire()
      18 print('\033[42m%s 拿到B锁\033[0m' % self.name)
      19 mutexB.release()
      20 mutexA.release()
      21 def func2(self):
      22 mutexB.acquire()
      23 print('\033[43m%s 拿到B锁\033[0m' % self.name)
      24 time.sleep(2)
      25 mutexA.acquire()
      26 print('\033[44m%s 拿到A锁\033[0m' % self.name)
      27 mutexA.release()
      28 mutexB.release()
      29 if __name__ == "__main__":
      30 for i in range(10):
      31 t = MyThread()
      32 t.start()
      死锁
  • 递归锁
    • 递归锁RLOCK用于解决死锁现象
    • 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,
    • 二者的区别
      • 递归锁可以连续acquire多次,
      • 互斥锁只能acquire一次
    • 并发编程——多线程(4)并发编程——多线程(4)
       1 # -*- coding:utf-8 -*-
      2 from threading import Thread, Lock, RLock
      3 import time
      4
      5 mutexA = mutexB = RLock()
      6
      7
      8 class MyThread(Thread):
      9 def run(self):
      10 self.func1()
      11 self.func2()
      12
      13 def func1(self):
      14 mutexA.acquire()
      15 print('\033[41m%s 拿到A锁\033[0m' % self.name)
      16 mutexB.acquire()
      17 print('\033[42m%s 拿到B锁\033[0m' % self.name)
      18 mutexB.release()
      19 mutexA.release()
      20
      21 def func2(self):
      22 mutexB.acquire()
      23 print('\033[43m%s 拿到B锁\033[0m' % self.name)
      24 time.sleep(2)
      25 mutexA.acquire()
      26 print('\033[44m%s 拿到A锁\033[0m' % self.name)
      27 mutexA.release()
      28 mutexB.release()
      29
      30
      31 if __name__ == "__main__":
      32 for i in range(10):
      33 t = MyThread()
      34 t.start()
      递归锁

8.信号量

  • 信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 from threading import Thread,Semaphore
    3 import threading
    4 import time
    5 def func():
    6 sm.acquire()
    7 print('%s get sm' % threading.current_thread().getName())
    8 time.sleep(3)
    9 sm.release()
    10 if __name__ == "__main__":
    11 sm = Semaphore(5)
    12 for i in range(23):
    13 t = Thread(target=func)
    14 t.start()
    信号量
  • 1 Semaphore管理一个内置的计数器,
    2 每当调用acquire()时内置计数器-1
    3 调用release() 时内置计数器+1
    4 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

9.event

  • 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
  • 1 from threading import Event
    2
    3 event.isSet():返回event的状态值;
    4
    5 event.wait():如果 event.isSet()==False将阻塞线程;
    6
    7 event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    8
    9 event.clear():恢复event的状态值为False。
    并发编程——多线程(4)并发编程——多线程(4)
     1 def conn_mysql():
    2 count = 1
    3 while not event.is_set():
    4 if count > 3:
    5 print("%s try too many times " % threading.currentThread().getName())
    6 return
    7 print('<%s>第%s次尝试链接'%(threading.current_thread().getName(),count))
    8 event.wait(0.5)
    9 count += 1
    10 print('<%s>链接成功' % threading.current_thread().getName())
    11
    12 def check_mysql():
    13 print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
    14 time.sleep(random.randint(2,4))
    15 event.set()
    16
    17 if __name__ == "__main__":
    18 event = Event()
    19 conn1 = Thread(target=conn_mysql)
    20 conn2 = Thread(target=conn_mysql)
    21 conn3 = Thread(target=conn_mysql)
    22 check = Thread(target=check_mysql)
    23
    24 conn1.start()
    25 conn2.start()
    26 conn3.start()
    27 check.start()
    event

10.定时器

  • 指定n秒后执行某操作
  • def hello():
    print("hello,world!")

    t
    = Timer(2,hello)
    t.start()
    # after 2 seconds, "hello, world" will be printed
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 # -*- coding:utf-8 -*-
    2 from threading import Timer
    3 import random
    4
    5
    6 class Code(object):
    7 def __init__(self):
    8 self.make_cache()
    9
    10 def make_cache(self, interval=5):
    11 self.cache = self.make_code()
    12 print(self.cache)
    13 self.t = Timer(interval, self.make_cache)
    14 self.t.start()
    15
    16 def make_code(self, n=4):
    17 res = ""
    18 for i in range(n):
    19 s1 = str(random.randint(0, 9))
    20 s2 = chr(random.randint(65, 90))
    21 res += random.choice([s1, s2])
    22 return res
    23
    24 def check(self):
    25 while True:
    26 code = input("请输入验证码>>:").strip()
    27 if code.upper() == self.cache:
    28 print("验证码输入正确")
    29 self.t.cancel()
    30 break
    31
    32 if __name__ == "__main__":
    33 obj = Code()
    34 obj.check()
    定时器

.11.线程queue

  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 import queue
    3 q = queue.Queue(3)
    4 q.put(1)
    5 q.put(2)
    6 q.put(3)
    7 # q.put(4)#阻塞
    8 # q.put(4,block=False)#抛出异常
    9 # q.put(4,block=True,timeout=3) #3s阻塞后,抛出异常
    10
    11 q.get()
    12 q.get()
    13 q.get()
    14 # q.get()#阻塞
    15 # q.get(block=False)#抛出异常
    16 # q.get_nowait()#抛出异常,和上一条等价
    17 q.get(block=True,timeout=3)#3s阻塞后,抛出异常
    先进先出
  • 并发编程——多线程(4)并发编程——多线程(4)
    1 #堆栈,先进后出
    2 import queue
    3 q = queue.LifoQueue()
    4 q.put('first')
    5 q.put('second')
    6 q.put('third')
    7 print(q.get())#third
    8 print(q.get())#second
    9 print(q.get())#first
    堆栈,先进后出
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #优先级队列
    2 import queue
    3 q = queue.PriorityQueue()
    4 #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    5 q.put((20,'a'))
    6 q.put((10,'b'))
    7 q.put((30,'c'))
    8
    9 print(q.get())
    10 print(q.get())
    11 print(q.get())
    12
    13 # 结果(数字越小优先级越高,优先级高的优先出队):
    14 # (10, 'b')
    15 # (20, 'a')
    16 # (30, 'c')
    优先级队列
  • 多线程套接字通信
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 import socket
    3 from threading import Thread
    4 from concurrent.futures import ThreadPoolExecutor
    5
    6 def communicate(conn):
    7 while True:
    8 try:
    9 data = conn.recv(1024)
    10 if not data:break
    11 conn.send(data.upper())
    12 except ConnectionResetError:
    13 break
    14 conn.close()
    15
    16
    17 def server(server_ip,port):
    18 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    19 server.bind((server_ip, port))
    20 server.listen(5)
    21 while True:
    22 conn, client_addr = server.accept()
    23 # t = Thread(target=communicate,args=(conn,))
    24 # t.start()
    25 pool.submit(communicate,conn)
    26 server.close()
    27
    28 if __name__ == "__main__":
    29 pool = ThreadPoolExecutor(2)
    30 server_ip = socket.gethostbyname(socket.gethostname())
    31 port = 8080
    32 server(server_ip, port)
    服务端
    并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 import socket
    3 server_ip = socket.gethostbyname(socket.gethostname())
    4 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    5 client.connect((server_ip,8080))
    6 while True:
    7 msg = input(">>:").strip()
    8 if not msg:continue
    9 client.send(msg.encode("utf-8"))
    10 data = client.recv(1024)
    11 print(data.decode("utf-8"))
    12 client.close()
    客户端

12.线程池与进程池

  • concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which
    is defined by the abstract Executor class.
    1、submit(fn, *args, **kwargs)
    异步提交任务

    2、map(func, *iterables, timeout=None, chunksize=1)
    取代for循环submit的操作

    3、shutdown(wait=True)
    相当于进程池的pool.close()
    +pool.join()操作
    wait
    =True,等待池内所有任务执行完毕回收完资源后才继续
    wait
    =False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前

    4、result(timeout=None)
    取得结果

    5、add_done_callback(fn)
    回调函数
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 from threading import Thread,currentThread
    3 from multiprocessing import Process
    4 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    5 import os
    6 import time
    7 import random
    8 def task(name):
    9 print("name:%s pid:%s"%(name,os.getpid()))
    10 # print("name:%s pid:%s"%(name,currentThread().getName()))
    11 time.sleep(random.randint(1,3))
    12 if __name__ == "__main__":
    13 pool = ProcessPoolExecutor(5)
    14 # pool = ThreadPoolExecutor(5)
    15 for i in range(10):
    16 pool.submit(task,i)
    17 pool.shutdown(wait=True)
    18 print("")
    View Code
  • map函数
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 # -*- coding:utf-8 -*-
    2 import os
    3 import random
    4 import time
    5 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    6
    7
    8 def task(n):
    9 print('%s is running' % os.getpid())
    10 time.sleep(random.randint(1, 3))
    11 return n ** 2
    12
    13
    14 if __name__ == "__main__":
    15 executor = ProcessPoolExecutor(max_workers=3)
    16 # for i in range(11):
    17 # executor.submit(task,i)
    18 executor.map(task,range(1,12))#map取代了for+submit
    map函数
  • 回调函数
    • 可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
  • 并发编程——多线程(4)并发编程——多线程(4)
     1 #-*- coding:utf-8 -*-
    2 import requests
    3 import time
    4 from concurrent.futures import ThreadPoolExecutor
    5 def get(url):
    6 response = requests.get(url)
    7 time.sleep(3)
    8 return {"url":url,"content":response.text}
    9 def parse(res):
    10 res = res.result()
    11 print("%s parse res is %s"%(res["url"],len(res["content"])))
    12 if __name__=="__main__":
    13 urls = [
    14 "http://www.woshipm.com/rp/415309.html",
    15 "https://www.python.org",
    16 "http://blog.csdn.net/shanzhizi/article/details/50903748",
    17 ]
    18 pool = ThreadPoolExecutor(2)
    19 for url in urls:
    20 pool.submit(get,url).add_done_callback(parse)
    21 pool.shutdown()
    22 print("")
    回调函数