compute.parallel
Utilities for parallel computation.
- pyphi.compute.parallel.get_num_processes()
Return the number of processes to use in parallel.
- class pyphi.compute.parallel.ExceptionWrapper(exception)
A picklable wrapper suitable for passing exception tracebacks through instances of multiprocessing.Queue.
- Parameters
exception (Exception) – The exception to wrap.
- reraise()
Re-raise the exception.
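The wrapper idea can be sketched with the standard library alone. The class below is a hypothetical stand-in, not PyPhi's ExceptionWrapper: it assumes that keeping a formatted copy of the traceback is enough, since traceback objects themselves cannot be pickled. The names PicklableError and failing_task are illustrative only.

```python
import pickle
import traceback


class PicklableError:
    """Hypothetical stand-in for an exception wrapper (not PyPhi's code)."""

    def __init__(self, exception):
        self.exception = exception
        # Traceback objects cannot be pickled, so keep a formatted copy.
        self.formatted = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )

    def reraise(self):
        # Attach the original traceback text as the cause so it is not lost.
        raise self.exception from RuntimeError(self.formatted)


def failing_task():
    return 1 / 0


try:
    failing_task()
except ZeroDivisionError as error:
    wrapped = PicklableError(error)

# Simulate the trip through a multiprocessing.Queue, which pickles its items.
received = pickle.loads(pickle.dumps(wrapped))

try:
    received.reraise()
except ZeroDivisionError as error:
    recovered = error
```

After the round trip, the re-raised exception keeps its original type, and the text of the worker-side traceback is available on its `__cause__`.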
- class pyphi.compute.parallel.MapReduce(iterable, *context)
An engine for doing heavy computations over an iterable.
This is similar to multiprocessing.Pool, but it allows computations to short-circuit and supports both parallel and sequential computation.
- Parameters
iterable (Iterable) – A collection of objects to perform a computation over.
*context – Any additional data necessary to complete the computation.
Any subclass of MapReduce must implement three methods: empty_result, compute (map), and process_result (reduce).
The engine includes a built-in tqdm progress bar; this can be disabled by setting pyphi.config.PROGRESS_BARS to False.
Parallel operations start a daemon thread which handles log messages sent from worker processes.
Subprocesses spawned by MapReduce cannot spawn more subprocesses; be aware of this when composing nested computations. This is not an issue in practice because it is typically most efficient to parallelize only the top-level computation.
- description = ''
- empty_result(*context)
Return the default result with which to begin the computation.
- static compute(obj, *context)
Map over a single object from self.iterable.
- process_result(new_result, old_result)
Reduce handler.
Every time a new result is generated by compute, this method is called with the result and the previous (accumulated) result. This method compares or collates these two values, returning the new result.
Setting self.done to True in this method will abort the remainder of the computation, returning this final result.
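The three-method contract and the short-circuit flag can be illustrated without PyPhi. The sketch below defines a hypothetical, sequential-only engine (SequentialEngine) that mirrors the interface described here; it is an assumption-laden stand-in, not PyPhi's MapReduce, and the subclass FirstSquareAbove is purely illustrative.

```python
class SequentialEngine:
    """Hypothetical, sequential-only stand-in for a map-reduce engine.

    It mirrors the three-method contract but is NOT PyPhi's implementation.
    """

    def __init__(self, iterable, *context):
        self.iterable = iterable
        self.context = context
        self.done = False

    def empty_result(self, *context):
        raise NotImplementedError

    @staticmethod
    def compute(obj, *context):
        raise NotImplementedError

    def process_result(self, new_result, old_result):
        raise NotImplementedError

    def run(self):
        result = self.empty_result(*self.context)
        for obj in self.iterable:
            new_result = self.compute(obj, *self.context)
            result = self.process_result(new_result, result)
            if self.done:
                break  # short-circuit: skip the remaining objects
        return result


class FirstSquareAbove(SequentialEngine):
    """Find the first square that exceeds a threshold passed as context."""

    def empty_result(self, threshold):
        return None

    @staticmethod
    def compute(obj, threshold):
        return obj * obj

    def process_result(self, new_result, old_result):
        (threshold,) = self.context
        if new_result > threshold:
            self.done = True  # abort the rest of the computation
            return new_result
        return old_result


visited = []


def numbers():
    # Record which inputs the engine actually consumes.
    for n in range(1, 100):
        visited.append(n)
        yield n


answer = FirstSquareAbove(numbers(), 20).run()
```

Because process_result sets `self.done` once a square exceeds the threshold, the engine stops consuming the iterable at that point instead of visiting all 99 inputs.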
- init_progress_bar()
Initialize and return a progress bar.
- static worker(compute, task_queue, result_queue, log_queue, complete, parent_config, *context)
A worker process, run by multiprocessing.Process.
- start_parallel()
Initialize all queues and start the worker processes and the log thread.
- initialize_tasks()
Load the input queue to capacity.
Overfilling the queue would cause a deadlock, because queue.put blocks when the queue is full; further tasks are therefore enqueued as results are returned.
- maybe_put_task()
Enqueue the next task, if there are any waiting.
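The priming pattern behind initialize_tasks and maybe_put_task can be sketched as follows. This is a minimal illustration under stated assumptions, not PyPhi's code: it uses threads and queue.Queue in place of worker processes and multiprocessing.Queue so that it runs in a single process, and NUM_WORKERS, QUEUE_SIZE, STOP, and the doubling task are all hypothetical.

```python
import queue
import threading

NUM_WORKERS = 2
QUEUE_SIZE = 4  # assumed capacity of the task queue
STOP = None  # sentinel telling a worker to exit


def worker(task_queue, result_queue):
    while True:
        task = task_queue.get()
        if task is STOP:
            break
        result_queue.put(task * 2)


def run(items):
    tasks = iter(items)
    task_queue = queue.Queue(maxsize=QUEUE_SIZE)
    result_queue = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(task_queue, result_queue))
        for _ in range(NUM_WORKERS)
    ]
    for thread in threads:
        thread.start()

    def maybe_put_task():
        """Enqueue the next task if any remain; report whether one was added."""
        try:
            task = next(tasks)
        except StopIteration:
            return False
        task_queue.put(task)
        return True

    # Prime the queue up to capacity, never beyond it.
    pending = 0
    for _ in range(QUEUE_SIZE):
        if maybe_put_task():
            pending += 1

    results = []
    while pending:
        results.append(result_queue.get())
        pending -= 1
        # Each returned result means a task left the queue, so one more fits.
        if maybe_put_task():
            pending += 1

    # All tasks are consumed at this point, so the sentinels cannot block.
    for _ in threads:
        task_queue.put(STOP)
    for thread in threads:
        thread.join()
    return results


doubled = run(range(10))
```

The number of outstanding tasks never exceeds the queue capacity, so `put` never blocks while the parent is also responsible for draining results. Results arrive in completion order, which is not guaranteed to match input order.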
- run_parallel()
Perform the computation in parallel, reading results from the output queue and passing them to process_result.
- finish_parallel()
Orderly shutdown of workers.
- run_sequential()
Perform the computation sequentially, only holding two computed objects in memory at a time.
- run(parallel=True)
Perform the computation.
- Keyword Arguments
parallel (boolean) – If True, run the computation in parallel. Otherwise, operate sequentially.
- class pyphi.compute.parallel.LogThread(q)
Thread which handles log records sent from MapReduce processes.
It listens to an instance of multiprocessing.Queue, rewriting log messages to the PyPhi log handler.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
- run()
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- pyphi.compute.parallel.configure_worker_logging(queue)
Configure a worker process to log all messages to queue.
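The interplay between a log thread and queue-based worker logging can be sketched with the standard library's logging.handlers.QueueHandler. The helper and class names below (configure_logging_to_queue, QueueLogThread, ListHandler) are hypothetical and do not reproduce PyPhi's implementation; queue.Queue stands in for multiprocessing.Queue so the sketch runs in one process.

```python
import logging
import logging.handlers
import queue
import threading


def configure_logging_to_queue(logger, log_queue):
    """Send every record from `logger` to `log_queue` (hypothetical helper)."""
    logger.handlers = [logging.handlers.QueueHandler(log_queue)]
    logger.setLevel(logging.DEBUG)
    logger.propagate = False


class QueueLogThread(threading.Thread):
    """Daemon thread that replays queued records through a target logger."""

    def __init__(self, log_queue, target_logger):
        super().__init__(daemon=True)
        self.log_queue = log_queue
        self.target_logger = target_logger

    def run(self):
        while True:
            record = self.log_queue.get()
            if record is None:  # sentinel: stop listening
                break
            self.target_logger.handle(record)


class ListHandler(logging.Handler):
    """Collects messages so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log_queue = queue.Queue()
collector = ListHandler()
main_logger = logging.getLogger("example.main")
main_logger.addHandler(collector)
main_logger.setLevel(logging.DEBUG)
main_logger.propagate = False

# "Worker" side: all records go onto the queue instead of to a handler.
worker_logger = logging.getLogger("example.worker")
configure_logging_to_queue(worker_logger, log_queue)

# "Parent" side: a thread drains the queue into the main logger's handlers.
log_thread = QueueLogThread(log_queue, main_logger)
log_thread.start()

worker_logger.info("task %d finished", 3)
log_queue.put(None)
log_thread.join()
```

Routing records through a single listener keeps handler configuration in the parent, so workers need to know only about the queue.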