Spaces:
Runtime error
Runtime error
| ''' | |
| WorkerPool and WorkerBase for handling the common problems in managing | |
| a multiprocess pool of workers that aren't done by multiprocessing.Pool, | |
| including setup with per-process state, debugging by putting the worker | |
| on the main thread, and correct handling of unexpected errors, and ctrl-C. | |
| To use it, | |
| 1. Put the per-process setup and the per-task work in the | |
| setup() and work() methods of your own WorkerBase subclass. | |
| 2. To prepare the process pool, instantiate a WorkerPool, passing your | |
| subclass type as the first (worker) argument, as well as any setup keyword | |
| arguments. The WorkerPool will instantiate one of your workers in each | |
| worker process (passing in the setup arguments in those processes). | |
| If debugging, the pool can have process_count=0 to force all the work | |
| to be done immediately on the main thread; otherwise all the work | |
| will be passed to other processes. | |
| 3. Whenever there is a new piece of work to distribute, call pool.add(*args). | |
| The arguments will be queued and passed as worker.work(*args) to the | |
| next available worker. | |
| 4. When all the work has been distributed, call pool.join() to wait for all | |
| the work to complete and to finish and terminate all the worker processes. | |
| When pool.join() returns, all the work will have been done. | |
| No arrangement is made to collect the results of the work: for example, | |
| the return value of work() is ignored. If you need to collect the | |
| results, use your own mechanism (filesystem, shared memory object, queue) | |
| which can be distributed using setup arguments. | |
| ''' | |
| from multiprocessing import Process, Queue, cpu_count | |
| import signal | |
| import atexit | |
| import sys | |
class WorkerBase(Process):
    '''
    Subclass this class and override its work() method (and optionally,
    setup() as well) to define the units of work to be done in a process
    worker in a worker pool.
    '''
    def __init__(self, i, process_count, queue, initargs):
        '''
        i: index of this worker within the pool (None in debug mode).
        process_count: total workers in the pool; 0 means debug mode,
            running on the main thread.
        queue: shared Queue from which work batches are dequeued.
        initargs: keyword arguments forwarded to self.setup().
        '''
        if process_count > 0:
            # Make sure we ignore ctrl-C if we are not on main process.
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.process_id = i
        self.process_count = process_count
        self.queue = queue
        super(WorkerBase, self).__init__()
        self.setup(**initargs)

    def run(self):
        # Do the work until the None sentinel is dequeued.
        while True:
            try:
                work_batch = self.queue.get()
            except (KeyboardInterrupt, SystemExit):
                print('Exiting...')
                break
            if work_batch is None:
                # Repost the sentinel so the other workers see it too.
                self.queue.put(None)  # for another worker
                return
            self.work(*work_batch)

    def setup(self, **initargs):
        '''
        Override this method for any per-process initialization.
        Keyword args are passed from WorkerPool constructor.
        '''
        pass

    def work(self, *args):
        '''
        Override this method to do one unit of work.
        Args are passed from WorkerPool.add() arguments.
        '''
        raise NotImplementedError('worker subclass needed')
class WorkerPool(object):
    '''
    Instantiate this object (passing a WorkerBase subclass type
    as its first argument) to create a worker pool.  Then call
    pool.add(*args) to queue args to distribute to worker.work(*args),
    and call pool.join() to wait for all the workers to complete.
    '''
    def __init__(self, worker=WorkerBase, process_count=None, **initargs):
        '''
        worker: WorkerBase subclass type to instantiate per process.
        process_count: number of worker processes; None uses cpu_count(),
            and 0 runs all work synchronously on the main thread (debug).
        initargs: keyword arguments forwarded to worker.setup().
        '''
        if process_count is None:
            process_count = cpu_count()
        if process_count == 0:
            # zero process_count uses only main process, for debugging.
            self.queue = None
            self.processes = None
            self.worker = worker(None, 0, None, initargs)
            return
        # Ctrl-C strategy: worker processes should ignore ctrl-C.  Set
        # this up to be inherited by child processes before forking.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        active_pools[id(self)] = self
        self.queue = Queue(maxsize=(process_count * 3))
        self.processes = None  # Initialize before trying to construct workers
        self.processes = [worker(i, process_count, self.queue, initargs)
                          for i in range(process_count)]
        for p in self.processes:
            p.start()
        # The main process should handle ctrl-C.  Restore this now.
        signal.signal(signal.SIGINT, original_sigint_handler)

    def add(self, *work_batch):
        '''Queue one unit of work, passed as worker.work(*work_batch).'''
        if self.queue is None:
            if hasattr(self, 'worker'):
                # Debug mode: do the work right here on the main thread.
                self.worker.work(*work_batch)
            else:
                print('WorkerPool shutting down.', file=sys.stderr)
        else:
            try:
                # The queue can block if the work is so slow it gets full.
                self.queue.put(work_batch)
            except (KeyboardInterrupt, SystemExit):
                # Handle ctrl-C if done while waiting for the queue.
                self.early_terminate()

    def join(self):
        '''Wait for all queued work to finish and end the workers nicely.'''
        # End the queue, and wait for all worker processes to complete nicely.
        if self.queue is not None:
            self.queue.put(None)
            for p in self.processes:
                p.join()
            self.queue = None
        # Remove myself from the set of pools that need cleanup on shutdown.
        active_pools.pop(id(self), None)

    def early_terminate(self):
        '''Shut down without waiting: post the sentinel, then kill workers.'''
        # When shutting down unexpectedly, first end the queue.
        if self.queue is not None:
            try:
                self.queue.put_nowait(None)  # Nonblocking put throws if full.
                self.queue = None
            except Exception:
                # Queue full (or broken): workers get terminated below anyway.
                pass
        # But then don't wait: just forcibly terminate workers.
        if self.processes is not None:
            for p in self.processes:
                p.terminate()
            self.processes = None
        active_pools.pop(id(self), None)

    def __del__(self):
        if self.queue is not None:
            print('ERROR: workerpool.join() not called!', file=sys.stderr)
            self.join()
# Error and ctrl-C handling: kill worker processes if the main process ends.
active_pools = {}

def early_terminate_pools():
    # Forcibly shut down every pool that has not been joined yet.  Snapshot
    # the values first, since early_terminate() mutates active_pools.
    for pool in list(active_pools.values()):
        pool.early_terminate()

atexit.register(early_terminate_pools)