1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 import logging
24 import threading
25 import traceback
26 import Queue
27
28
29
30
31 -class Worker(threading.Thread):
32 """A Threadpool worker.
33
34 Reads jobs from the queue and runs them. Quits when a None job is received
35 on the queue.
36 """
38 super(Worker, self).__init__()
39 self.queue = queue
40 self.daemon = True
41
42
43 self.start()
44
46 while True:
47
48 task, args, kwargs = self.queue.get()
49
50 try:
51
52 if task is None:
53 break
54
55 on_error = kwargs.pop("on_error")
56 if on_error is None:
57 on_error = lambda x: None
58
59 task(*args, **kwargs)
60 except Exception as e:
61 print e
62 logging.error("Worker raised %s", e)
63 traceback.print_exc()
64 on_error(e)
65
66 finally:
67 self.queue.task_done()
68
69
71 lock = threading.Lock()
72
74 self.number_of_threads = number_of_threads
75 self.queue = Queue.Queue(2 * number_of_threads)
76 self.workers = [Worker(self.queue) for _ in range(number_of_threads)]
77
79 """Stop all the threads when they are ready."""
80 self.queue.join()
81
82
83 for worker in self.workers:
84 self.AddTask(None)
85
86 for worker in self.workers:
87 worker.join()
88
89 - def AddTask(self, task, args=None, kwargs=None, on_error=None):
90 if kwargs is None:
91 kwargs = {}
92 kwargs["on_error"] = on_error
93 self.queue.put((task, args or [], kwargs))
94