python线程池控制

541 查看

最近一个项目上使用线程池,设定处理项1W,10线程,但是需要检测线程进行状态。出现错误N次,就自动终止线程。实现如下:

线程池代码

<!-- more -->

ERRORCOUNT=0
IS_EXIT=True
class Worker(Thread):
    worker_count=0
    timeout=1
    def __init__(self, workQueue,resultQueue,**kwds):
        Thread.__init__(self,**kwds)
        self.id=Worker.worker_count
        Worker.worker_count+=1
        self.setDaemon(True)
        self.workQueue=workQueue
        self.resultQueue=resultQueue
        self.start()

    def run(self):
        #the get-some-work,do-some-work main loop of worker threads
        while IS_EXIT:
            try:
                callable,args,kwds=self.workQueue.get(timeout=Worker.timeout)
                res=callable(*args,**kwds)
                self.resultQueue.put(res)
            except Queue.Empty:
                break
            except:
                pass

class WorkerManager:
    def __init__(self, num_of_workers=10,timeout=2):
        self.workQueue=Queue.Queue()
        self.resultQueue=Queue.Queue()
        self.workers=[]
        self.timeout=timeout
        self._recruitThreads(num_of_workers)

    def _recruitThreads(self,num_of_workers):
        for i in range(num_of_workers):
            worker=Worker(self.workQueue,self.resultQueue)
            self.workers.append(worker)

    def wait_for_complete(self):
        #then,wait for each of them to terminate
        while len(self.workers):
            worker=self.workers.pop()
            worker.join(10)
            if worker.isAlive() and not self.workQueue.empty():
                self.workers.append(worker)

    def add_job(self,callable,*args,**kwds):
        self.workQueue.put((callable,args,kwds))

    def get_result(self,*args,**kwds):
        return self.resultQueue.get(*args,**kwds)

在WorkerManager中使用到一个全局变量IS_EXIT用来判断是否需要退出线程

调用线程代码

wm=WorkerManager(10)#10线程
for i in range(10000):
    wm.add_job(do_job)
wm.wait_for_complete()

具体工作代码

def do_job():
    global ERRORCOUNT
    global IS_EXIT
    try:
        do anything
    except:
        ERRORCOUNT+=1
            if ERRORCOUNT>5:
                IS_EXIT=False

此处使用了全局变量ERRORCOUNT统计错误数量,超过指定次数,则设置IS_EXIT=False通知线程停止执行。
至此基本上满足项目所需,但并不友好,应有更好的方式。