Package rekall :: Module threadpool
[frames] | no frames]

Source Code for Module rekall.threadpool

 1  # Rekall Memory Forensics 
 2  # 
 3  # Copyright 2013 Google Inc. All Rights Reserved. 
 4  # 
 5  # Authors: 
 6  # Michael Cohen <scudette@gmail.com> 
 7  # Copyright (c) 2012 
 8  # 
 9  # This program is free software; you can redistribute it and/or modify 
10  # it under the terms of the GNU General Public License as published by 
11  # the Free Software Foundation; either version 2 of the License, or (at 
12  # your option) any later version. 
13  # 
14  # This program is distributed in the hope that it will be useful, but 
15  # WITHOUT ANY WARRANTY; without even the implied warranty of 
16  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
17  # General Public License for more details. 
18  # 
19  # You should have received a copy of the GNU General Public License 
20  # along with this program; if not, write to the Free Software 
21  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
22  # 
23  import logging 
24  import threading 
25  import traceback 
26  import Queue 
27   
28   
29  # Simple threadpool implementation - we just run all tests in the pool for 
30  # maximum concurrency. 
class Worker(threading.Thread):
    """A Threadpool worker.

    Reads jobs from the queue and runs them. A job is a (task, args, kwargs)
    tuple. kwargs may carry an "on_error" entry: it is removed before the task
    is called and, when it is not None, it is called with the exception if
    running the job fails.

    Quits when a None job is received on the queue, either as a bare None or
    as a tuple whose task is None.
    """

    def __init__(self, queue):
        """Create the worker and start it straight away.

        Args:
          queue: The job queue shared with the pool. Only get() and
            task_done() are called on it.
        """
        super(Worker, self).__init__()
        self.queue = queue
        self.daemon = True

        # Start the thread immediately.
        self.start()

    def run(self):
        """Run jobs from the queue until a stop job is received."""
        while True:
            job = self.queue.get()

            # Bind the handler before entering the try block so that the
            # except clause can never see an unbound name or the callback
            # left over from the previous job.
            on_error = None

            try:
                # Stop the worker by sending it a task of None.
                if job is None:
                    break

                # Unpacking happens inside the try block so that a malformed
                # job is reported and acknowledged instead of killing the
                # thread and leaving queue.join() waiting forever.
                task, args, kwargs = job
                if task is None:
                    break

                # Work on a copy: the queued dict may belong to the caller.
                kwargs = dict(kwargs)
                on_error = kwargs.pop("on_error", None)

                task(*args, **kwargs)
            except Exception as e:
                # print(e) with a single argument produces the same output
                # as the Python 2 statement "print e".
                print(e)
                logging.error("Worker raised %s", e)
                traceback.print_exc()
                if on_error is not None:
                    on_error(e)

            finally:
                # Always acknowledge the job, including the stop job, so
                # that queue.join() can return.
                self.queue.task_done()
68 69
class ThreadPool(object):
    """A fixed-size pool of Worker threads fed from one bounded queue."""

    # A single lock shared by every pool instance. Nothing in this class
    # acquires it; presumably it is offered to tasks that need to serialise
    # access to shared state - confirm against callers.
    lock = threading.Lock()

    def __init__(self, number_of_threads):
        """Create the queue and start the workers.

        Args:
          number_of_threads: How many Worker threads to start. The queue
            holds at most twice this many pending jobs.
        """
        self.number_of_threads = number_of_threads
        self.queue = Queue.Queue(2 * number_of_threads)
        self.workers = [Worker(self.queue) for _ in range(number_of_threads)]

    def Stop(self):
        """Stop all the threads when they are ready."""
        # Wait until every job queued so far has been acknowledged.
        self.queue.join()

        # Send all workers the stop message, one per worker.
        for _ in self.workers:
            self.AddTask(None)

        for worker in self.workers:
            worker.join()

    def AddTask(self, task, args=None, kwargs=None, on_error=None):
        """Queue a job for the workers.

        Args:
          task: The callable to run, or None to tell one worker to quit.
          args: Optional positional arguments for the task.
          kwargs: Optional dict of keyword arguments for the task. It is
            copied, so the caller's dict is never modified.
          on_error: Optional callable travelling with the job under the
            "on_error" key; any "on_error" entry in kwargs is overridden.
        """
        # Copy before adding the handler: writing into the caller's dict
        # would leak "on_error" into it, and a dict reused for several
        # AddTask calls would be shared between jobs on different threads.
        job_kwargs = dict(kwargs) if kwargs else {}
        job_kwargs["on_error"] = on_error
        self.queue.put((task, args or [], job_kwargs))
94