compute.parallel
Utilities for parallel computation.
-
pyphi.compute.parallel.get_num_processes()
Return the number of processes to use in parallel.
-
class pyphi.compute.parallel.ExceptionWrapper(exception)
A picklable wrapper suitable for passing exception tracebacks through instances of multiprocessing.Queue.
Parameters: exception (Exception) – The exception to wrap.
-
reraise()
Re-raise the exception.
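Traceback objects cannot be pickled, so a wrapper of this kind typically stores a formatted copy of the traceback alongside the exception. The following is a minimal sketch of that idea, not the PyPhi implementation; the class name PicklableExceptionSketch and its attributes are hypothetical.

```python
import traceback


class PicklableExceptionSketch:
    """Hypothetical stand-in for a picklable exception wrapper."""

    def __init__(self, exception):
        self.exception = exception
        # Traceback objects are not picklable, so keep a formatted string.
        self.traceback_text = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )

    def reraise(self):
        # Re-raise the original exception, attaching the formatted
        # traceback from the worker as the cause (assumed design choice).
        raise self.exception from RuntimeError(self.traceback_text)
```

A worker would wrap a caught exception and put the wrapper on a result queue; the parent process would then call reraise() on whatever wrapper it receives.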
-
-
class pyphi.compute.parallel.MapReduce(iterable, *context)
An engine for doing heavy computations over an iterable.
This is similar to multiprocessing.Pool, but allows computations to short-circuit, and supports both parallel and sequential computations.
Parameters:
- iterable (Iterable) – A collection of objects to perform a computation over.
- *context – Any additional data necessary to complete the computation.
Any subclass of MapReduce must implement three methods:
- ``empty_result``,
- ``compute`` (map), and
- ``process_result`` (reduce).
The engine includes a built-in tqdm progress bar; this can be disabled by setting pyphi.config.PROGRESS_BARS to False.
Parallel operations start a daemon thread which handles log messages sent from worker processes.
Subprocesses spawned by MapReduce cannot spawn more subprocesses; be aware of this when composing nested computations. This is not an issue in practice because it is typically most efficient to parallelize only the top-level computation.
-
description = ''
-
empty_result(*context)
Return the default result with which to begin the computation.
-
static compute(obj, *context)
Map over a single object from self.iterable.
-
process_result(new_result, old_result)
Reduce handler.
Every time a new result is generated by compute, this method is called with the result and the previous (accumulated) result. This method compares or collates these two values, returning the new result.
Setting self.done to True in this method will abort the remainder of the computation, returning this final result.
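The contract between empty_result, compute, and process_result, including the short-circuit via self.done, can be illustrated with a small sequential stand-in. This is only a sketch under stated assumptions: SequentialMapReduceSketch and FirstSquareAbove are hypothetical names, and the real engine additionally handles worker processes, logging, and progress bars.

```python
import itertools


class SequentialMapReduceSketch:
    """Hypothetical, sequential-only stand-in for the engine described above."""

    def __init__(self, iterable, *context):
        self.iterable = iterable
        self.context = context
        self.done = False

    def empty_result(self, *context):
        raise NotImplementedError

    @staticmethod
    def compute(obj, *context):
        raise NotImplementedError

    def process_result(self, new_result, old_result):
        raise NotImplementedError

    def run(self):
        result = self.empty_result(*self.context)
        for obj in self.iterable:
            new_result = self.compute(obj, *self.context)
            result = self.process_result(new_result, result)
            if self.done:
                break  # short-circuit: skip the rest of the iterable
        return result


class FirstSquareAbove(SequentialMapReduceSketch):
    """Find the first square that exceeds a threshold passed as context."""

    def empty_result(self, threshold):
        return None

    @staticmethod
    def compute(obj, threshold):
        return obj * obj

    def process_result(self, new_result, old_result):
        if new_result > self.context[0]:
            self.done = True  # abort the remainder of the computation
            return new_result
        return old_result


# The iterable is infinite, so this only terminates because of self.done.
result = FirstSquareAbove(itertools.count(), 20).run()
print(result)  # 25
```

Because the reduce step sets self.done as soon as a square exceeds the threshold, the loop stops after the sixth element even though the input never ends.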
-
init_progress_bar()
Initialize and return a progress bar.
-
static worker(compute, task_queue, result_queue, log_queue, complete, *context)
A worker process, run by multiprocessing.Process.
-
start_parallel()
Initialize all queues and start the worker processes and the log thread.
-
initialize_tasks()
Load the input queue to capacity.
Overfilling causes a deadlock because queue.put blocks when the queue is full, so further tasks are enqueued as results are returned.
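The load-to-capacity-then-refill pattern can be sketched as follows. This is an illustration under assumptions, not the PyPhi code: it uses threads and queue.Queue in place of processes and multiprocessing.Queue to keep the example small, and run_bounded, num_workers, and capacity are hypothetical names.

```python
import queue
import threading


def run_bounded(tasks, func, num_workers=2, capacity=4):
    """Apply func to each task through a bounded task queue (sketch only)."""
    task_queue = queue.Queue(maxsize=capacity)
    result_queue = queue.Queue()
    pending = iter(tasks)
    in_flight = 0

    def worker():
        while True:
            item = task_queue.get()
            if item is None:  # sentinel: shut down (tasks must not be None)
                break
            result_queue.put(func(item))

    def maybe_put_task():
        """Enqueue the next task, if there are any waiting."""
        nonlocal in_flight
        try:
            item = next(pending)
        except StopIteration:
            return False
        task_queue.put(item)
        in_flight += 1
        return True

    threads = [
        threading.Thread(target=worker, daemon=True) for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()

    # Load the task queue up to capacity, never beyond it.
    for _ in range(capacity):
        if not maybe_put_task():
            break

    results = []
    while in_flight:
        results.append(result_queue.get())
        in_flight -= 1
        maybe_put_task()  # one result came back, so one slot is free

    # Orderly shutdown: one sentinel per worker.
    for _ in threads:
        task_queue.put(None)
    for thread in threads:
        thread.join()
    return results


squares = sorted(run_bounded(range(10), lambda x: x * x))
```

Since at most capacity tasks are ever outstanding, the put call in maybe_put_task never blocks on a full queue; results arrive in completion order, which is why the usage line sorts them.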
-
maybe_put_task()
Enqueue the next task, if there are any waiting.
-
run_parallel()
Perform the computation in parallel, reading results from the output queue and passing them to process_result.
-
finish_parallel()
Orderly shutdown of workers.
-
run_sequential()
Perform the computation sequentially, only holding two computed objects in memory at a time.
-
run(parallel=True)
Perform the computation.
Keyword Arguments: parallel (boolean) – If True, run the computation in parallel. Otherwise, operate sequentially.
-
class pyphi.compute.parallel.LogThread(q)
Thread which handles log records sent from MapReduce processes.
It listens to an instance of multiprocessing.Queue, rewriting log messages to the PyPhi log handler.
-
run()
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with positional and keyword arguments taken from the args and kwargs arguments, respectively.
-
-
pyphi.compute.parallel.configure_worker_logging(queue)
Configure a worker process to log all messages to queue.
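Queue-based logging of this kind can be sketched with the standard library's logging.handlers.QueueHandler on the worker side and a listener thread on the receiving side. This sketch is an assumption about the general technique rather than the PyPhi implementation: the names configure_worker_logging_sketch, LogThreadSketch, and ListHandler are hypothetical, and queue.Queue stands in for multiprocessing.Queue so the example runs in a single process.

```python
import logging
import logging.handlers
import queue
import threading


def configure_worker_logging_sketch(log_queue, logger_name="worker_sketch"):
    """Route every record from the named logger into log_queue."""
    logger = logging.getLogger(logger_name)
    logger.handlers = [logging.handlers.QueueHandler(log_queue)]
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    return logger


class LogThreadSketch(threading.Thread):
    """Drain log_queue and hand each record to a logger on the receiving side."""

    def __init__(self, log_queue, target_logger):
        super().__init__(daemon=True)
        self.log_queue = log_queue
        self.target_logger = target_logger

    def run(self):
        while True:
            record = self.log_queue.get()
            if record is None:  # sentinel: stop listening
                break
            self.target_logger.handle(record)


class ListHandler(logging.Handler):
    """Collect formatted messages in a list so the result can be inspected."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())


captured = []
main_logger = logging.getLogger("main_sketch")
main_logger.addHandler(ListHandler(captured))
main_logger.propagate = False

log_queue = queue.Queue()  # stand-in for multiprocessing.Queue
listener = LogThreadSketch(log_queue, main_logger)
listener.start()

worker_logger = configure_worker_logging_sketch(log_queue)
worker_logger.info("hello from worker %d", 1)

log_queue.put(None)  # tell the listener to stop
listener.join()
```

The worker never touches the real handlers; it only enqueues records, and the listener thread replays them through whatever handlers the receiving logger has configured.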