Parallel API
Cluster Management
Dask cluster lifecycle management.
Supports:
- Starting a LocalCluster (default)
- Submitting workers as SLURM batch jobs via dask_jobqueue.SLURMCluster
- Connecting to an existing Dask scheduler by address through distributed.Client
- Graceful shutdown with image cleanup
- class pclean.parallel.cluster.DaskClusterManager(nworkers=None, scheduler_address=None, threads_per_worker=1, memory_limit='0', local_directory=None, cluster_type='local', slurm_queue=None, slurm_account=None, slurm_walltime='24:00:00', slurm_job_mem='20GB', slurm_cores_per_job=1, slurm_job_name=None, slurm_job_extra_directives=None, slurm_python=None, slurm_local_directory=None, slurm_log_directory='logs', slurm_job_script_prologue=None)
Bases: object
Thin wrapper that owns a dask.distributed.Client.
Supports three cluster backends selected by cluster_type:
- 'local' – spin up a dask.distributed.LocalCluster (default).
- 'slurm' – submit workers as SLURM batch jobs via dask_jobqueue.SLURMCluster. Requires the optional dask-jobqueue package (pip install dask-jobqueue).
- 'address' – connect to a pre-existing scheduler at scheduler_address.
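The backend selection, including the fallback to 'address' mode when only scheduler_address is given, can be sketched as a small resolution function. This is a minimal sketch, not pclean's code: the helper resolve_cluster_type is hypothetical, and rejecting cluster_type='address' without a scheduler_address is an assumption rather than documented behavior.

```python
from typing import Optional

VALID_CLUSTER_TYPES = ("local", "slurm", "address")


def resolve_cluster_type(cluster_type: str = "local",
                         scheduler_address: Optional[str] = None) -> str:
    """Return the effective backend for the given arguments (hypothetical helper)."""
    if cluster_type not in VALID_CLUSTER_TYPES:
        raise ValueError(
            f"cluster_type must be one of {VALID_CLUSTER_TYPES}, got {cluster_type!r}"
        )
    # Backward compatibility: an explicit scheduler address combined with the
    # default 'local' type is treated as "connect to that scheduler".
    if scheduler_address is not None and cluster_type == "local":
        return "address"
    # Assumption: 'address' mode is meaningless without an address to connect to.
    if cluster_type == "address" and scheduler_address is None:
        raise ValueError("cluster_type='address' requires scheduler_address")
    return cluster_type
```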
For backward compatibility, if scheduler_address is set and cluster_type is left at 'local', the manager silently switches to 'address' mode.
- Parameters:
nworkers (int | None) – Number of workers. None -> os.cpu_count().
scheduler_address (str | None) – Scheduler URL for 'address' mode.
threads_per_worker (int) – Threads per Dask worker (default 1, because CASA tools are not thread-safe).
memory_limit (str) – Per-worker memory limit. The default '0' disables Dask's memory management, which is correct for CASA workloads because all heavy allocations happen inside the C++ casatools (reported by Dask as "unmanaged memory"). Dask cannot free this memory, so its pause/spill heuristics only cause workers to stall. Concurrency is bounded by as_completed instead.
local_directory (str | None) – Scratch directory for Dask spill-to-disk.
cluster_type (str) – 'local', 'slurm', or 'address'.
slurm_queue (str | None) – SLURM partition name (--partition).
slurm_account (str | None) – SLURM account string (--account).
slurm_walltime (str) – Per-job wall time (--time).
slurm_job_mem (str) – Per-job memory (--mem).
slurm_cores_per_job (int) – CPUs per SLURM job (--cpus-per-task).
slurm_job_name (str | None) – SLURM job name (--job-name). Appears in squeue output under the NAME column, making workers easy to identify. Defaults to None (dask-jobqueue then uses 'dask-worker').
slurm_job_extra_directives (list[str] | None) – Extra #SBATCH lines.
slurm_python (str | None) – Path to the Python executable on compute nodes.
slurm_local_directory (str | None) – Worker scratch directory on compute nodes.
slurm_log_directory (str) – Directory for SLURM stdout/stderr logs.
slurm_job_script_prologue (list[str] | None) – Shell commands injected before the worker process starts (e.g. module load or conda activate).
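The slurm_* parameters correspond to standard sbatch options. The sketch below shows roughly what kind of #SBATCH header they describe; it is illustrative only, since the real job script is generated by dask_jobqueue.SLURMCluster and its exact contents (ordering, memory units, additional lines) differ. The function sbatch_header is hypothetical, and it assumes each extra directive is given without the leading "#SBATCH", e.g. "--qos=normal".

```python
from typing import Iterable, List, Optional


def sbatch_header(queue: Optional[str] = None,
                  account: Optional[str] = None,
                  walltime: str = "24:00:00",
                  job_mem: str = "20GB",
                  cores_per_job: int = 1,
                  job_name: Optional[str] = None,
                  extra_directives: Optional[Iterable[str]] = None) -> List[str]:
    """Illustrative mapping of slurm_* parameters to #SBATCH lines (hypothetical)."""
    lines: List[str] = []
    # Optional directives are emitted only when a value is supplied.
    if job_name is not None:
        lines.append(f"#SBATCH --job-name={job_name}")
    if queue is not None:
        lines.append(f"#SBATCH --partition={queue}")
    if account is not None:
        lines.append(f"#SBATCH --account={account}")
    # These always have defaults, mirroring the constructor signature.
    lines.append(f"#SBATCH --time={walltime}")
    lines.append(f"#SBATCH --mem={job_mem}")
    lines.append(f"#SBATCH --cpus-per-task={cores_per_job}")
    # Assumption: extra directives arrive without the "#SBATCH" prefix.
    for directive in extra_directives or ():
        lines.append(f"#SBATCH {directive}")
    return lines
```

For example, sbatch_header(queue="gpu", job_name="pclean-worker") yields the job-name, partition, time, mem and cpus-per-task lines, which is the information squeue and sacct later report for each worker job.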
- property client
Return the dask.distributed.Client.
- property worker_count: int
Number of workers currently registered with the scheduler.
Uses client.nthreads(), which is a direct synchronous query to the scheduler, avoiding stale cached snapshots from scheduler_info().
Note that this can be less than the requested nworkers due to resource constraints or startup issues. The cluster manager will log a warning and adjust nworkers accordingly.
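The count-and-warn logic can be sketched as a pure function over the dictionary that Client.nthreads() returns (each registered worker's address mapped to its thread count). The function effective_nworkers and the logger name are hypothetical; this only illustrates the counting and the warning, not how DaskClusterManager actually adjusts nworkers.

```python
import logging
from typing import Mapping

log = logging.getLogger("pclean.sketch")  # hypothetical logger name


def effective_nworkers(nthreads: Mapping[str, int], requested: int) -> int:
    """Count registered workers and warn if fewer than requested (hypothetical)."""
    # One key per registered worker, so the worker count is the number of keys.
    registered = len(nthreads)
    if registered < requested:
        log.warning("Only %d of %d requested workers registered; using %d",
                    registered, requested, registered)
    return registered
```

On a live cluster the input would come from the documented client property, e.g. effective_nworkers(manager.client.nthreads(), requested).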
Cube Parallel Imager
Parallel cube imaging engine.
Distributes channels across Dask workers. Each worker runs a fully
independent SerialImager on its sub-cube (imaging + deconvolution).
After all workers finish, the coordinator concatenates the sub-cubes
into the final output cube.
This is embarrassingly parallel – there is no inter-worker communication during imaging.
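One simple way to distribute channels is to give each worker a contiguous, nearly equal block, so that concatenating the sub-cubes in order restores the full frequency axis. This is a minimal sketch under that assumption; split_channels is a hypothetical helper, and the partitioning ParallelCubeImager actually uses may differ.

```python
from typing import List, Tuple


def split_channels(nchan: int, nworkers: int) -> List[Tuple[int, int]]:
    """Split channels 0..nchan-1 into contiguous half-open [start, stop) ranges."""
    if nchan < 1 or nworkers < 1:
        raise ValueError("nchan and nworkers must be positive")
    nchunks = min(nchan, nworkers)        # never create empty sub-cubes
    base, extra = divmod(nchan, nchunks)  # the first `extra` chunks get one more channel
    ranges: List[Tuple[int, int]] = []
    start = 0
    for i in range(nchunks):
        stop = start + base + (1 if i < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges
```

Because the ranges are contiguous and ordered, the coordinator's concatenation step reduces to stacking the sub-cubes in list order along the frequency axis.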
- class pclean.parallel.cube_parallel.ParallelCubeImager(config, cluster)
Bases: object
Channel-parallel cube CLEAN imager.
- Parameters:
config (PcleanConfig) – Full imaging configuration (specmode must be cube/cubedata/cubesource).
cluster (DaskClusterManager) – Running Dask cluster.
Continuum Parallel Imager
Parallel continuum (MFS) imaging engine.
Distributes visibility rows across Dask workers. Each worker runs
its own synthesisimager on a data chunk to produce a partial image.
The coordinator then uses synthesisnormalizer to gather partial
images, normalize, run the (serial) minor cycle, and scatter the
updated model back to workers for the next major cycle.
Parallelism pattern:
Major cycle (gridding / degridding) -- parallel across row chunks
Minor cycle (deconvolution) -- serial on the gathered full image
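The major/minor split can be illustrated with a scalar toy model: each row chunk returns an un-normalized partial residual and its weight, the coordinator sums and normalizes them, runs a serial "minor cycle" step with a loop gain, and hands the updated model to the next round. Everything here is hypothetical and uses a thread pool in place of Dask; plain floats stand in for images, whereas the real engine does the gridding, gathering and normalization with synthesisimager and synthesisnormalizer.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence, Tuple


def partial_residual(rows: Sequence[float], model: float) -> Tuple[float, int]:
    """Toy 'major cycle' on one row chunk: un-normalized residual sum and weight."""
    return sum(r - model for r in rows), len(rows)


def toy_mfs_clean(chunks: List[List[float]], niter: int = 50,
                  gain: float = 0.5) -> float:
    """Parallel partial products, serial update; returns the final scalar model."""
    model = 0.0
    with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
        for _ in range(niter):
            # Parallel: every chunk computes its partial product against the current model.
            parts = list(pool.map(partial_residual, chunks, [model] * len(chunks)))
            # Gather + normalize: combine partial sums, divide by the total weight.
            residual = sum(p for p, _ in parts) / sum(w for _, w in parts)
            # Serial minor cycle on the gathered residual; the updated model is
            # "scattered" by passing it to the next round of tasks.
            model += gain * residual
    return model
```

With a loop gain of 0.5 the model error halves every cycle, so toy_mfs_clean([[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]]) converges to the data mean of 3.5. The point of the structure is that only the small normalized product crosses back to the coordinator, while the expensive per-row work stays distributed.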
- class pclean.parallel.continuum_parallel.ParallelContinuumImager(config, cluster)
Bases: object
Row-parallel continuum (MFS) CLEAN imager.
- Parameters:
config (PcleanConfig) – Full imaging configuration (specmode should be 'mfs').
cluster (DaskClusterManager) – Running Dask cluster.
Worker Tasks
Dask worker task functions.
Each function in this module is a pure top-level function that can be
serialised by Dask and executed on a remote worker. They accept plain
dicts (serialised PcleanConfig or CASA-native bundles) to avoid
pickling issues, and they instantiate casatools objects on the worker
side.
Design rationale:
- Workers must import casatools locally, because the C++ tool objects cannot be pickled or transferred between processes.
- All file I/O (images, MSes) uses shared-filesystem paths so the coordinator can later gather partial products.
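The plain-dict convention can be sketched with a dataclass: the coordinator converts its config to a dict of built-in types, the dict is pickled for transport, and a top-level task rebuilds the typed object on the worker. MiniConfig and worker_task are hypothetical stand-ins for PcleanConfig and the real task functions, and the returned string is only a placeholder for real work.

```python
import dataclasses
import pickle
from typing import Any, Dict


@dataclasses.dataclass
class MiniConfig:
    """Hypothetical stand-in for PcleanConfig with a few plain fields."""
    vis: str
    imagename: str
    nchan: int
    start_chan: int = 0


def worker_task(config_dict: Dict[str, Any]) -> str:
    """Top-level (hence picklable by reference) task that accepts a plain dict."""
    # A real task would `import casatools` here, on the worker, because the
    # C++ tool objects cannot be pickled and shipped from the coordinator.
    cfg = MiniConfig(**config_dict)  # rebuild the typed config worker-side
    last = cfg.start_chan + cfg.nchan - 1
    # Placeholder result: a shared-filesystem product name for this sub-cube.
    return f"{cfg.imagename}.chan{cfg.start_chan}-{last}"


cfg = MiniConfig(vis="/shared/data.ms", imagename="/shared/out/img", nchan=4, start_chan=8)
payload = pickle.dumps(dataclasses.asdict(cfg))  # what would travel to the worker
```

Only built-in types cross the process boundary, so the worker never has to unpickle a coordinator-side object graph.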
- pclean.parallel.worker_tasks.run_subcube(config_dict)
Run a complete imaging + deconvolution pipeline on a frequency sub-cube.
Invoked as a Dask task in the cube-parallel engine.
Each worker operates in its own temporary directory so that CASA's deterministic temp files (IMAGING_WEIGHT_*) do not collide across concurrent workers sharing the same filesystem.
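A per-task scratch directory can be sketched as a context manager around tempfile.mkdtemp and os.chdir. This is an assumption about the approach, not run_subcube's code, and isolated_workdir is hypothetical. Because os.chdir changes the working directory of the whole process, this only isolates tasks when each worker process runs one task at a time, which matches the threads_per_worker=1 default. Products that must survive should be written to absolute shared-filesystem paths, since the directory is deleted on exit unless keep=True.

```python
import contextlib
import os
import shutil
import tempfile
from typing import Iterator, Optional


@contextlib.contextmanager
def isolated_workdir(parent: Optional[str] = None,
                     keep: bool = False) -> Iterator[str]:
    """chdir into a fresh temporary directory and restore the old cwd afterwards."""
    old_cwd = os.getcwd()
    # A unique directory per task, so relative temp files cannot collide.
    workdir = tempfile.mkdtemp(prefix="pclean-subcube-", dir=parent)
    os.chdir(workdir)
    try:
        yield workdir
    finally:
        # Always restore the cwd, even if imaging raised.
        os.chdir(old_cwd)
        if not keep:
            shutil.rmtree(workdir, ignore_errors=True)
```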
- pclean.parallel.worker_tasks.make_partial_psf(bundle)
Create a synthesisimager on the worker and compute a partial PSF.