多线程的学习与python实现

时间:2023-03-08 22:15:45
多线程的学习与python实现

学习了进程与线程,现对自己的学习进行记录。

目录:

一.进程与线程的概念,以及联系与区别

二.多线程

三.python中多线程的应用

四.python实例

五.参考文献

一.进程与线程的概念。以及联系与区别

进程可以被称为执行的程序,一个进程拥有完整的数据空间和代码空间,每一个进程的地址空间都是独立的,进程之间不能共享数据。

线程是进程的一部分,也可以称为mini 进程。在同一个进程中的线程共用同一个地址空间,单有自己独立的堆栈和局部变量。所以除了堆栈中的数据,其余所有数据都可以共享。

如果再形象点就引用一哥们的比喻,很形象:

比如一个公司,有很多不同的部门,每个部门不在 同一个城市,而每个部门都有许多的员工。

公司就好比是一个CPU,不同的部门就相当于不同的进程,他们是你干你的,我干我的,所利用的空间不同。想共享些什么数据,需要email与传真;而一个部门的每个员工,相当于不同的线程,共在同一个部门,所有东西都可以共享。假如说一个人在用打印机,也就     是数据进入堆栈了,我再用打印机,你就用不了了,得等。

联系:

进程包括线程,可以有一个或者多个

区别:

1.进程有独立的地址空间,多进程较稳定,因为其中一个出现状况不影响另外一个;同一个进程的多个线程,共用地址空间,多线程相比于多进程,稳定性要差,因为一个线程出现问题会严重影响其他线程。

2.进程之间需要共享数据,要利用进程间通讯;同一个进程中的线程不需要。

3.进程只是资源分配的最小单位;线程是执行的最小单位,也就是说实际执行的是线程。

二.多线程

我主要是针对多线程的学习,记录如下。

多个线程运行在同一个进程中,线程之间可以共享数据。每个线程都有开始,顺序执行和结束3部分,也就是在一个线程内部,代码会按照顺序依次执行的。它有一个自己的指令指针,记录自己运行到什么位置,线程在运行时可能被强占或暂时的挂起。

线程中也要有一个主线程,该线程需要每个线程要做什么,线程需要什么数据和参数,以及线程结束的时候,它们都提供了什么结果,主线程可以把各个线程的结果组成有意义的结果。

同一个进程中的线程之间可以共享数据以及相互通讯,但这种共享,也会带来危险。如果多个线程共同访问同一片数据,则很有导致数据结果不一致的问题,这叫竞态条件。由此大多数线程都带有一系列的同步源语,来控制线程的执行和数据的访问。

如果操作不当,就会产生死锁,死锁比如说有两个线程x,y,都要利用资源A和B,线程x先获得到A,要获取B,y先获得到B,要获得A,他俩都在等在资源,但谁都没有丢掉自己已经获得的资源,这样就陷入了互相无线等待的局面。

产生死锁的条件:

1.互斥条件:多个线程不能同时使用统一资源。
       2.请求与保持条件:一个线程必须拥有N个资源才能完成任务,它会一直占用已经获得的资源部防守
       3.不剥夺条件:对于某个线程已经获得的资源,其他线程不能强行剥夺。
      4.循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

    死锁一般是很难发现的。
   以下对同步的原因讲解,摘自http://buaawhl.iteye.com/blog/164905
   同步这个词是从英文synchronize(使同时发生)翻译过来的。我也不明白为什么要用这个很容易引起误解的词。既然大家都这么用,咱们也就只好这么将就。 
   线程同步的真实意思和字面意思恰好相反。线程同步的真实意思,其实是“排队”:几个线程之间要排队,一个一个对共享资源进行操作,而不是同时进行操作。

因此,关于线程同步,需要牢牢记住的第一点是:线程同步就是线程排队。同步就是排队。线程同步的目的就是避免线程“同步”执行。这可真是个无聊的绕口令。 
     关于线程同步,需要牢牢记住的第二点是 “共享”这两个字。只有共享资源的读写访问才需要同步。如果不是共享资源,那么就根本没有同步的必要。 
     关于线程同步,需要牢牢记住的第三点是,只有“变量”才需要同步访问。如果共享的资源是固定不变的,那么就相当于“常量”,线程同时读取常量也不需要同步。至少一个线程修改共享资源,这样的情况下,线程之间就需要同步。 
    关于线程同步,需要牢牢记住的第四点是:多个线程访问共享资源的代码有可能是同一份代码,也有可能是不同的代码;无论是否执行同一份代码,只要这些线程的代码访问同一份可变的共享资源,这些线程之间就需要同步。

为了加深理解,下面举几个例子。 
    有两个采购员,他们的工作内容是相同的,都是遵循如下的步骤: 
(1)到市场上去,寻找并购买有潜力的样品。 
(2)回到公司,写报告。 
   这两个人的工作内容虽然一样,他们都需要购买样品,他们可能买到同样种类的样品,但是他们绝对不会购买到同一件样品,他们之间没有任何共享资源。所以,他们可以各自进行自己的工作,互不干扰。 
   这两个采购员就相当于两个线程;两个采购员遵循相同的工作步骤,相当于这两个线程执行同一段代码。

下面给这两个采购员增加一个工作步骤。采购员需要根据公司的“布告栏”上面公布的信息,安排自己的工作计划。 
   这两个采购员有可能同时走到布告栏的前面,同时观看布告栏上的信息。这一点问题都没有。因为布告栏是只读的,这两个采购员谁都不会去修改布告栏上写的信息。

下面增加一个角色。一个办公室行政人员这个时候,也走到了布告栏前面,准备修改布告栏上的信息。 
    如果行政人员先到达布告栏,并且正在修改布告栏的内容。两个采购员这个时候,恰好也到了。这两个采购员就必须等待行政人员完成修改之后,才能观看修改后的信息。 
    如果行政人员到达的时候,两个采购员已经在观看布告栏了。那么行政人员需要等待两个采购员把当前信息记录下来之后,才能够写上新的信息。 
   上述这两种情况,行政人员和采购员对布告栏的访问就需要进行同步。因为其中一个线程(行政人员)修改了共享资源(布告栏)。而且我们可以看到,行政人员的工作流程和采购员的工作流程(执行代码)完全不同,但是由于他们访问了同一份可变共享资源(布告       栏),所以他们之间需要同步。 

 三、python对多线程的应用
1. 
执行 Python 程序的时候, 是按照从主模块顶端向下执行的. 循环用于重复执行部分代码, 函数和方法会将控制临时移交到程序的另一部分.通过线程, 你的程序可以在同时处理多个任务. 每个线程都有它自己的控制流.所以你可以在一个线程里从文件读取数据, 另个向屏幕输出内容.为了保证两个线程可以同时访问相同的内部数据, Python 使用了 global interpreter lock (全局解释器锁) . 在同一时间只可能有一个线程执行Python 代码; Python 实际上是自动地在一段很短的时间后切换到下个线程执行, 或者等待 一个线程执行一项需要时间的操作(例如等待通过 socket 传输的数据, 或是从文件中读取数据).
python虚拟机执行过程:
1、设置GIL
2、切换到一个线程执行
3、运行:指定数量的字节码指令,线程主动让出控制(可以调用time.sleep(),也就是如果利用sleep,该线程就会进入休眠状态,然后切换到其他的线程上,如果利用了Lock,那么就不管是否调用该函数,都不好使,都要等到release后,才能切换到其他线程去)
4、把线程设置为睡眠状态
5、解锁GIL
在调用外部代码时,GIL会被锁定,因为没有python代码执行,可以主动解锁。
2.python的多线程模块,Threading模块,Queue,Multiprocess(多进程模块)
thread和threading模块允许程序员创建和管理线程,thread模块提供基本的线程和锁的支持,而threading提供了更高级别的线程管理的功能。
thread模块,当主线程退出时,其他线程被强制退出,可能还没有清楚,而threading模块能够确保所有线程退出后,进程才退出。thread模块不支持守护线程,只要主线程运行完,就直接退出,而不管是否有其他线程在运行。
thread模块利用start_new_thread创建线程后,是立即执行的,这样就很不好控制同步;threading模块创建线程对象后,如果不启动start是不执行的。
queque模块允许用户创建一个可以应用于多个线程之间共享数据的队列数据结构。
在threading模块中:
['activeCount', 'active_count', 'Condition', 'currentThread',
 'current_thread', 'enumerate', 'Event','Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',  'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
守护线程:一般是等待客户请求的服务器,如果没有客户请求,就一直等着,就如同socket编程中的server一样。
如果你的主线程退出时,不用等待那些子线程,你可以设定线程的daemon属性。即在线程开始前,调用setDaemon函数表示不重要。
class threading.Thread(group=Nonetarget=Nonename=Noneargs=()kwargs={}*daemon=None)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If not Nonedaemon explicitly sets whether the thread is daemonic. If None (the default), the daemonic property is inherited from the current thread.

If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Changed in version 3.3: Added the daemon argument.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as thetarget argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

join(timeout=None)

Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception –, or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raise the same exception.

name

A string used for identification purposes only. It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

getName()
setName()

Old getter/setter API for name; use it directly as a property instead.

ident

The ‘thread identifier’ of this thread or None if the thread has not been started. This is a nonzero integer. See the _thread.get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

daemon

A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwiseRuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.

isDaemon()  ,该函数可以用来判断是否是守护线程
setDaemon()

Old getter/setter API for daemon; use it directly as a property instead.

setDaemon函数可以用来设定某个线程是否是守护线程。True即为守护线程,表示该线程不重要,在主线程结束后,可以直接退出;如果是False就不是守护线程,主线程要等待子线程结束后再退出。
 Note:
1.关于共享变量
  假如要共享全局变量(global),如果不分配好,就会出现错误,甚至意想不到的后果。
 比如:
 在线程的run()方法内,有如下语句:a=3 global a a=a+10,
  有两个线程,A 线程先读a,读取的值为3,在未执行下一个语句时,B线程也读取了a,也为3,结果最后结果我们希望得到23,输出值却为13.
  不过如果是如下形式,a+=10,不用担心出现紊乱,因为它是原子的,系统会保护该操作在其他线程开始前结束。
 假如要共享局部变量,那么局部变量是每个线程私有的
对出共享全局变量出现的问题,要利用锁。
类是Lock()。对Thread类进行重构。
import time
import threading
array=[4,2,1]
def Func(secs,k):
time.sleep(secs)
print 'No %d starts at'%k,time.ctime()
lock=threading.Lock()
class MyThread(threading.Thread):
def __init__(self,secs):
self.secs=secs
super(MyThread,self).__init__()
def run(self):
lock.acquire()
time.sleep(self.secs)
print 'Done'
lock.release() def main(): ths=[]
Len=range(len(array))
for i in [2,3]:
m=MyThread(i)
ths.append(m) for i in range(2):
print ths[i].getName()
ths[i].start()
ths[0].join()
ths[1].join()
print 'all done,ok! it costs:',time.clock() main()

  

3.3. RLock()

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

构造方法: 
RLock()

实例方法: 
acquire([timeout])/release(): 跟Lock差不多。

3.4. Condition

Condition(条件变量)通常与一个锁关联。需要在多个Contidion*享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

构造方法: 
Condition([lock/rlock])

实例方法: 
acquire([timeout])/release(): 调用关联的锁的相应方法。 
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

例子是很常见的生产者/消费者模式:

3.5. Semaphore/BoundedSemaphore

Semaphore(信号量)是计算机科学史上最古老的同步指令之一。Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release() 时+1。计数器不能小于0;当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。

基于这个特点,Semaphore经常用来同步一些有“访客上限”的对象,比如连接池。

BoundedSemaphore 与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

构造方法: 
Semaphore(value=1): value是计数器的初始值。

实例方法: 
acquire([timeout]): 请求Semaphore。如果计数器为0,将阻塞线程至同步阻塞状态;否则将计数器-1并立即返回。 
release(): 释放Semaphore,将计数器+1,如果使用BoundedSemaphore,还将进行释放次数检查。release()方法不检查线程是否已获得 Semaphore。

3.6. Event

Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

构造方法: 
Event()

实例方法: 
isSet(): 当内置标志为True时返回True。 
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。 
clear(): 将标志设为False。 
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

3.7. Timer

Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

构造方法: 
Timer(interval, function, args=[], kwargs={}) 
interval: 指定的时间 
function: 要执行的方法 
args/kwargs: 方法的参数

实例方法: 
Timer从Thread派生,没有增加实例方法。

3.8. local

local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。

可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。

熟练掌握Thread、Lock、Condition就可以应对绝大多数需要使用线程的场合,某些情况下local也是非常有用的东西。本文的最后使用这几个类展示线程基础中提到的场景:

十三、Queque模块
允许创建一个可以用于多个线程之间共享数据的队列数据结构。

Queue Objects

Queue objects (QueueLifoQueue, or PriorityQueue) provide the public methods described below.

Queue.qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block.

Queue.empty()

Return True if the queue is empty, False otherwise. If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

Queue.full()

Return True if the queue is full, False otherwise. If full() returns True it doesn’t guarantee that a subsequent call to get() will not block. Similarly, if full() returns False it doesn’t guarantee that a subsequent call to put() will not block.

Queue.put(itemblock=Truetimeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=Truetimeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. Iftimeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call totask_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join()

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done()to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

semphore与Queque差不多,QueQue更加的方便。
queque.join() 就是直到队列中所有的任务都完成了才可以开始下一个线程,就是一直阻塞到所有任务都完成。
四、python实例
   功能是利用多线程,一起搜索一个文件,得到给定的某一个字节出现次数。
 环境:Cygwin
解释器:Python2.7.6
import timeit
2 import threading
3 import sys
4 import os
5
6 def GetFileSize(filename):
7
8 FileSize=os.path.getsize(filename)
9 return FileSize
10
11
12
13 array=[]
14 lock=threading.Lock()
15 cur=0
16 sum=0
17
18
19 def Print_InColor(color,msg):
20
21 print '\033[0;%dm%s\033[0m' %(color,msg) 22
23 class MyThread(threading.Thread):
24
25 def __init__(self,filename):
26
27 self.filename=filename
28 super(MyThread,self).__init__()
29
30 def run(self):
31
32 global cur
33 global sum
34 Done=True
35 size=GetFileSize(self.filename)
36 f=file(self.filename,'r')
37
38 lock.acquire()
39 start=cur
40 Print_InColor(31,threading.currentThread().getName())
41 Print_InColor(33,start)
42 length=start+int(size/3)
43 cur=end=length if length<size else size
44 Print_InColor(33,cur)
45 lock.release()
46
47 if start==size:
48 f.close()
49
50 f.seek(start,0)
51 print 'the new start position',f.tell(),self.getName()
52
53 while Done:
54 p=f.read(1)
55 c=f.tell()
56 if c>end:
57 print 'has ended,the posoition is ',c,self.getName()
58 Done=False
59 elif p=='i':
60 array.append(p)
61 sum+=1
62 #print sum
63 else:
64 continue
65
66
67 f.close()
68
69
70
71
def main():
73
74 thds=[]
75 filename=sys.argv[1]
76
77
78 for i in range(3):
79 t=MyThread(filename)
80 thds.append(t)
81
82 for t in thds:
83 t.start()
84
85 for t in thds:
86 t.join()
87
88
89
90
91
92 if __name__=="__main__":
93
94 t=timeit.Timer("main()","from __main__ import main")
95 print t.timeit(1)

下一步计划是实现多线程的socket服务器,Come on!

五、参考文献

1.python核心编程

2.www.docs.python.org

3.python网络编程基础