Python Thread related

时间:2023-02-05 03:12:54
Python Thread related

1.Thread.join([timeout])

Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.

等待进程结束。也就是说,其屏蔽调用线程,直到此线程方法终止(要么正常执行完毕,或者未处理的异常,或者时间超时)

下面通过例子来说明:

没有设定timeout情况,Main 线程启动,work1 和work2 线程执行,完毕退出,Main线程执行终止

 import os
import threading
import time
import logging
import random def work1():
count=0
while count<=5:
threadname= threading.currentThread()
wait_time=random.randrange(1,4)
print("%s,count =%s wait for =%s s,time %s "%(threadname,count,wait_time,time.ctime()[-13:]))
time.sleep(wait_time)
count +=1 def work2():
i=0
while i<=5:
threadname= threading.currentThread()
wait_time=random.randrange(1,4)
print("%s,i =%s wait for =%s s,time %s "%(threadname,i,wait_time,time.ctime()[-13:]))
time.sleep(wait_time)
i +=1 if __name__ =="__main__":
mainthread= threading.currentThread()
print '%s main thread is waiting for exit'% mainthread
test1=threading.Thread(name='work1',target=work1)
test2=threading.Thread(name='work2',target=work2)
test1.start()
test2.start()
test1.join()
test2.join()
print 'main thread finish'
Python Thread related

2个线程设定超时时间work1 5s,work2 4s,9s之后调用线程结束而不等待超时的线程:

 import os
import threading
import time
import logging
import random def work1():
count=0
while count<=5:
threadname= threading.currentThread()
wait_time=random.randrange(1,4)
print("%s,count =%s wait for =%s s,time %s "%(threadname,count,wait_time,time.ctime()[-13:]))
time.sleep(wait_time)
count +=1 def work2():
i=0
while i<=5:
threadname= threading.currentThread()
wait_time=random.randrange(1,4)
print("%s,i =%s wait for =%s s,time %s "%(threadname,i,wait_time,time.ctime()[-13:]))
time.sleep(wait_time)
i +=1 if __name__ =="__main__":
mainthread= threading.currentThread()
print '%s main thread is waiting for exit'% mainthread
test1=threading.Thread(name='work1',target=work1)
test2=threading.Thread(name='work2',target=work2)
test1.start()
test2.start()
test1.join(4)
test2.join(5)
print 'main thread finish

Python Thread related

 2.Producer and comsumer

 import Queue
import threading
import random #writelock =threading.Lock()
class Producer(threading.Thread):
def __init__(self,q,con,name):
super(Producer,self).__init__()
self.q = q
self.con = con
self.name = name
print "produce" +self.name+"started"
def run(self):
while 1:
#global writelock
self.con.acquire()#acquire the lock
if self.q.full():#if queue is full
#with writelock:#output info
print "queue is full, producer wait"
self.con.wait()#wait for resource
else:
value = random.ranint(0,10)
#with writelock:
print self.name+"put value"+self.name+":"+str(value)+"into queue"
self.q.put((self.name+":"+str(value)))#put to queue
self.con.notify()#inform consumer
self.con.release()#release the lock class Consumer(threading.Thread):
def __init__(self,q,con,name):
super(Consumer,self).__init__()
self.q = q
self.con = con
self.name = name
print "consume" +self.name+"started\n"
def run(self):
while 1:
#global writelock
self.con.acquire()
if self.q.empty():#if empty
#with writelock:
print "queue is empty,consumer wait"
self.con.wait()#wait the resource ready
else:
value = self.q.get()#get one element from queue
#with writelock:
print self.name +"get value"+ value+"from queue"
self.q.notify()#inform producer
self.con.release()#release the lock if __name__ == "__main__":
print "start to run\n"
q = Queue.Queue(10)
con = threading.Condition()
p = Producer(q,con,"p1")
p.start()
p1 = Producer(q,con,"p2")
p1.start()
c1 = Consumer(q,con,"c1")
c1.start()

3.Queue

programming python 4th 205页

Queue 提供标准的队列数据结构,实现python对象的先进先出,其可包含基本类型(string,list,dictionary......),类实例,任何可调用函数或绑定的方法等。但是Queue不像正常的list,因为其自动被线程获取和释放锁操作。

#coding:utf-8"
import logging,threading,time
import Queue def fibo_task(cond):
with cond:
while shared_queue.empty():
logger.info("[%s]- waiting for element in queue......" % threading.current_thread().name)
cond.wait()
else:
value =shared_queue.get()
a,b=0,1
for item in range(value):
a,b=b,a+b
fibo_dict[value] = a
shared_queue.task_done()
time.sleep(2)
logger.debug("[%s] fibo of key[%d] with result[%d]" % (threading.current_thread().name,value,fibo_dict[value])) def queue_task(cond):
logging.debug('starting list data to queue......')
with cond:
for data in impit_list:
shared_queue.put(data)
#[shared_queue.put(data) for data in impit_list]
cond.notifyAll() if __name__ == "__main__":
logger =logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter =logging.Formatter('%(asctime)s =%(message)s') ch=logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
fibo_dict={}
shared_queue =Queue.Queue()
impit_list =[3,10,5,7]
queue_cond=threading.Condition()
print "main thread starting......"
threads =[threading.Thread(target=fibo_task,args=(queue_cond,)) for i in range(4)]
#for thread in threads:
#thread.setDaemon(True)
#print 'daemon is %d' % thread.isDaemon() [thread.start() for thread in threads] prod = threading.Thread(name='queue_task_thread',target=queue_task,args=(queue_cond,))
prod.setDaemon(True)
prod.start() [thread.join() for thread in threads]
print "main thread done"

Python Thread related