python线程池实现

时间:2022-09-04 13:16:14

python 的线程池主要有threadpool,不过它并不是内置的库,每次使用都需要安装,而且使用起来也不是那么好用,所以自己写了一个线程池实现,每次需要使用直接import即可。其中还可以根据传入的特征量handlerkey来获取每个任务的结果。

#!/bin/env python
# -*- coding:utf-8 -*- """
@lx
created on 2016-04-14
""" import Queue
import sys
import threading
import time
import StringIO
import traceback reload(sys)
sys.setdefaultencoding("utf8") class MyThread(threading.Thread):
"""Background thread connected to the requests/results queues."""
def __init__(self, workQueue, resultQueue, timeout=0.1, **kwds):
threading.Thread.__init__(self, **kwds)
self.setDaemon(True)
self._workQueue = workQueue
self._resultQueue = resultQueue
self._timeout = timeout
self._dismissed = threading.Event()
self.start() def run(self):
"""Repeatedly process the job queue until told to exit."""
while True:
if self._dismissed.isSet():
break handlerKey = None # unique key
code = 0 # callback return code
handlerRet = None
errMsg = "" try:
callable, args, kwds = self._workQueue.get(True, self._timeout)
except Queue.Empty:
continue
except:
exceptMsg = StringIO.StringIO()
traceback.print_exc(file=exceptMsg)
errMsg = exceptMsg.getvalue()
code = 3301 # system error
self._resultQueue.put(
(handlerKey, code, (callable, args, kwds), errMsg))
break if self._dismissed.isSet():
self._workQueue.put((callable, args, kwds))
break try:
if "handlerKey" in kwds:
handlerKey = kwds["handlerKey"]
handlerRet = callable(*args, **kwds) # block
self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
except:
exceptMsg = StringIO.StringIO()
traceback.print_exc(file=exceptMsg)
errMsg = exceptMsg.getvalue()
code = 3303
self._resultQueue.put((handlerKey, code, handlerRet, errMsg)) def dismiss(self):
"""Sets a flag to tell the thread to exit when done with current job."""
self._dismissed.set() class ThreadPool(object):
def __init__(self, workerNums=3, timeout=0.1):
self._workerNums = workerNums
self._timeout = timeout
self._workQueue = Queue.Queue() # no maximum
self._resultQueue = Queue.Queue()
self.workers = []
self.dismissedWorkers = []
self._createWorkers(self._workerNums) def _createWorkers(self, workerNums):
"""Add num_workers worker threads to the pool."""
for i in range(workerNums):
worker = MyThread(self._workQueue, self._resultQueue,
timeout=self._timeout)
self.workers.append(worker) def _dismissWorkers(self, workerNums, _join=False):
"""Tell num_workers worker threads to quit after their current task."""
dismissList = []
for i in range(min(workerNums, len(self.workers))):
worker = self.workers.pop()
worker.dismiss()
dismissList.append(worker) if _join:
for worker in dismissList:
worker.join()
else:
self.dismissedWorkers.extend(dismissList) def _joinAllDissmissedWorkers(self):
"""
Perform Thread.join() on all
worker threads that have been dismissed.
"""
for worker in self.dismissedWorkers:
worker.join()
self.dismissedWorkers = [] def addJob(self, callable, *args, **kwds):
self._workQueue.put((callable, args, kwds)) def getResult(self, block=False, timeout=0.1):
try:
item = self._resultQueue.get(block, timeout)
return item
except Queue.Empty, e:
return None
except:
raise def waitForComplete(self, timeout=0.1):
"""
Last function. To dismiss all worker threads. Delete ThreadPool.
:param timeout
"""
while True:
workerNums = self._workQueue.qsize() # 释放掉所有线程
runWorkers = len(self.workers) if 0 == workerNums:
time.sleep(timeout) # waiting for thread to do job
self._dismissWorkers(runWorkers)
break
# if workerNums < runWorkers: # 不能这样子乱取消
# self._dismissWorkers(runWorkers - workerNums)
time.sleep(timeout)
self._joinAllDissmissedWorkers() if "__main__" == __name__:
test1 = """
def doSomething(*args, **kwds):
if "sleep" in kwds:
sleep = kwds["sleep"]
msgTxt = "sleep %fs.." % sleep
time.sleep(sleep)
return msgTxt for i in range(10):
print doSomething(sleep=0.1, handlerKey="key-%d"%i) wm = ThreadPool(10)
for i in range(10):
wm.addJob(doSomething, sleep=1, handlerKey="key-%d"%i)
wm.waitForComplete()
for i in range(10):
print wm.getResult()
del wm
"""
# test2 = """ def doSomething_(*args, **kwds):
sleep = int(args[0])
msgTxt = "sleep %ds.." % sleep
time.sleep(sleep)
return msgTxt wm = ThreadPool(10)
result = []
for i in range(10):
data = 5
wm.addJob(doSomething_, data) while 1:
res = wm.getResult()
if res:
result.append(res)
if 10 == len(result):
break
print "sleep 0.1"
time.sleep(0.1)
print time.time()
wm.waitForComplete()
print time.time()
# """

原创文章,转载请备注原文地址 http://www.cnblogs.com/lxmhhy/p/6032924.html

知识交流讨论请加qq群:180214441。谢谢合作

python线程池实现的更多相关文章

  1. 自定义高级版python线程池

    基于简单版创建类对象过多,现自定义高级版python线程池,代码如下 #高级线程池 import queue import threading import time StopEvent = obje ...

  2. 对Python线程池

    本文对Python线程池进行详细说明介绍,IDE选择及编码的解决方案进行了一番详细的描述,实为Python初学者必读的Python学习经验心得. AD: 干货来了,不要等!WOT2015 北京站演讲P ...

  3. Python 线程池&lpar;小节&rpar;

    Python 线程池(小节) from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time, ...

  4. python线程池ThreadPoolExecutor(上)(38)

    在前面的文章中我们已经介绍了很多关于python线程相关的知识点,比如 线程互斥锁Lock / 线程事件Event / 线程条件变量Condition 等等,而今天给大家讲解的是 线程池ThreadP ...

  5. python线程池及其原理和使用

    python线程池及其原理和使用 2019-05-29 17:05:20 whatday 阅读数 576 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.在这种情形下,使用线程池可以很 ...

  6. python线程池示例

    使用with方式创建线程池,任务执行完毕之后,会自动关闭资源 , 否则就需要手动关闭线程池资源  import threading, time from concurrent.futures impo ...

  7. Python线程池与进程池

    Python线程池与进程池 前言 前面我们已经将线程并发编程与进程并行编程全部摸了个透,其实我第一次学习他们的时候感觉非常困难甚至是吃力.因为概念实在是太多了,各种锁,数据共享同步,各种方法等等让人十 ...

  8. 《转》python线程池

    线程池的概念是什么? 在IBM文档库中这样的一段描写:“在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是 如此,虚拟机将试图跟踪每一个对象 ...

  9. 一个简单的python线程池框架

    初学python,实现了一个简单的线程池框架,线程池中除Wokers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信. 代码如下:(不足之处,还请高手指正) ...

随机推荐

  1. 介绍对称加密算法,最常用的莫过于DES数据加密算法

    DES DES-Data Encryption Standard,即数据加密算法.是IBM公司于1975年研究成功并公开发表的.DES算法的入口参数有三个:Key.Data.Mode.其中Key为8个 ...

  2. 基于hk2框架的功能测试Mock注入

    public Object getInstance(Class<?> clz){ return IocBean.get(clz.getName()); } public Object Mo ...

  3. linux-redhat6&period;4驱动无线网卡rtl8188eu

    无线网卡Realtek Semiconductor Cop. RTL8188EUS  首先下载安装包: ​其中的0BDA是Realtek的代码,8179是设备代码.从网上查到这个设备的芯片是rtl81 ...

  4. 使用高德地图SDK获取定位信息

    使用高德地图SDK获取定位信息 第一步: 在高德官网下载SDK,如我这里需要获取定位信息,故仅下载"定位功能" 第二步: 注册成为开发者,并创建应用获取相应的key.其中,在使用A ...

  5. NameValueCollection详解

    1.NameValueCollection类集合是基于 NameObjectCollectionBase 类. 但与 NameObjectCollectionBase 不同,该类在一个键下存储多个字符 ...

  6. TCP三次握手机制中的seq和ack

    TCP连接的三次握手:第一次(A--->B),SYN=1,seq=x第二次(B--->A),SYN=1,ACK=1,seq=y,ack=x+1 第三次(A--->B),ACK=1,s ...

  7. Zabbix二次开发&lowbar;02获取数据

    最近准备写一个zabbix二次页面的呈现.打算调用zabbix api接口来进行展示. 具体流程以及获取的数据. 1.  获得认证密钥    2.  获取zabbix所有的主机组    3.  获取单 ...

  8. Atom编辑器试用(Win7)

     看到Atom的官网(https://atom.io/)上说,Atom是A hackable text editor for the 21st Century,是一个基于V8引擎的editor,于是决 ...

  9. 【ichartjs】用ichartjs替代Excel做直方图

    在 http://www.cnblogs.com/xiandedanteng/p/8717506.html 一文中,最后是用Excel作图,现在用ichartjs替代之. 效果如下: 文件下载: ht ...

  10. iOS 调用第三方地图进行导航

    //支持的地图 { _model = model; //支持的地图 NSMutableArray *maps = [NSMutableArray array]; //苹果原生地图-苹果原生地图方法和其 ...