Parallel API

Cluster Management

Dask cluster lifecycle management.

Supports:

  • Starting a LocalCluster (default)

  • Submitting workers as SLURM batch jobs via dask_jobqueue.SLURMCluster

  • Connecting to an existing distributed.Client via scheduler address

  • Graceful shutdown with image cleanup

class pclean.parallel.cluster.DaskClusterManager(nworkers=None, scheduler_address=None, threads_per_worker=1, memory_limit='0', local_directory=None, cluster_type='local', slurm_queue=None, slurm_account=None, slurm_walltime='24:00:00', slurm_job_mem='20GB', slurm_cores_per_job=1, slurm_job_name=None, slurm_job_extra_directives=None, slurm_python=None, slurm_local_directory=None, slurm_log_directory='logs', slurm_job_script_prologue=None)

Bases: object

Thin wrapper that owns a dask.distributed.Client.

Supports three cluster backends selected by cluster_type:

  • 'local' — spin up a dask.distributed.LocalCluster (default).

  • 'slurm' — submit workers as SLURM batch jobs via dask_jobqueue.SLURMCluster. Requires the optional dask-jobqueue package (pip install dask-jobqueue).

  • 'address' — connect to a pre-existing scheduler at scheduler_address.

For backward compatibility, if scheduler_address is set and cluster_type is left at 'local', the manager silently switches to 'address' mode.
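
The backend selection and the backward-compatibility rule above can be sketched as a small pure function. This is only an illustration: resolve_cluster_type is a hypothetical helper name, and rejecting unknown values with ValueError is an assumption, not documented behaviour of DaskClusterManager.

```python
from typing import Optional

VALID_CLUSTER_TYPES = ("local", "slurm", "address")


def resolve_cluster_type(cluster_type: str = "local",
                         scheduler_address: Optional[str] = None) -> str:
    """Hypothetical helper: return the backend that would actually be used."""
    if cluster_type not in VALID_CLUSTER_TYPES:
        # Assumption: unknown values are rejected; the real manager may differ.
        raise ValueError(f"unknown cluster_type: {cluster_type!r}")
    # Backward-compatibility rule: an address combined with the default
    # 'local' type is treated as a request for 'address' mode.
    if scheduler_address and cluster_type == "local":
        return "address"
    return cluster_type


# Hypothetical scheduler URL, used only for illustration.
print(resolve_cluster_type("local", "tcp://10.0.0.1:8786"))  # address
```

An explicit cluster_type of 'slurm' is left alone in this sketch even if an address is also supplied, since the text only describes the switch from 'local'.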

Parameters:
  • nworkers (int | None) – Number of workers. If None, defaults to os.cpu_count().

  • scheduler_address (str | None) – Scheduler URL for 'address' mode.

  • threads_per_worker (int) – Threads per Dask worker. Defaults to 1 because CASA tools are not thread-safe.

  • memory_limit (str) – Per-worker memory limit. Default '0' disables Dask’s memory management, which is correct for CASA workloads because all heavy allocations happen inside C++ casatools (reported as “unmanaged memory”). Dask cannot free this memory, so its pause/spill heuristics only cause workers to stall. Concurrency is bounded by as_completed instead.

  • local_directory (str | None) – Scratch directory for Dask spill-to-disk.

  • cluster_type (str) – 'local', 'slurm', or 'address'.

  • slurm_queue (str | None) – SLURM partition name (--partition).

  • slurm_account (str | None) – SLURM account string (--account).

  • slurm_walltime (str) – Per-job wall time (--time).

  • slurm_job_mem (str) – Per-job memory (--mem).

  • slurm_cores_per_job (int) – CPUs per SLURM job (--cpus-per-task).

  • slurm_job_name (str | None) – SLURM job name (--job-name). Appears in squeue output under the NAME column, making workers easy to identify. Defaults to None (dask-jobqueue uses 'dask-worker').

  • slurm_job_extra_directives (list[str] | None) – Extra #SBATCH lines.

  • slurm_python (str | None) – Path to the Python executable on compute nodes.

  • slurm_local_directory (str | None) – Worker scratch directory on compute nodes.

  • slurm_log_directory (str) – Directory for SLURM stdout/stderr logs.

  • slurm_job_script_prologue (list[str] | None) – Shell commands injected before the worker process starts (e.g. module load or conda activate).
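
To make the slurm_* parameters more concrete, the sketch below shows one plausible way they could be translated into keyword arguments for dask_jobqueue.SLURMCluster. The mapping is an assumption based on the parameter descriptions above and on the keyword names used by recent dask-jobqueue releases (queue, account, walltime, memory, cores, job_name, job_extra_directives, python, local_directory, log_directory, job_script_prologue); slurm_cluster_kwargs is a hypothetical helper, not part of pclean.

```python
from typing import Any, Dict, List, Optional


def slurm_cluster_kwargs(
    slurm_queue: Optional[str] = None,
    slurm_account: Optional[str] = None,
    slurm_walltime: str = "24:00:00",
    slurm_job_mem: str = "20GB",
    slurm_cores_per_job: int = 1,
    slurm_job_name: Optional[str] = None,
    slurm_job_extra_directives: Optional[List[str]] = None,
    slurm_python: Optional[str] = None,
    slurm_local_directory: Optional[str] = None,
    slurm_log_directory: str = "logs",
    slurm_job_script_prologue: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build SLURMCluster keyword arguments."""
    candidates = {
        "queue": slurm_queue,                    # --partition
        "account": slurm_account,                # --account
        "walltime": slurm_walltime,              # --time
        "memory": slurm_job_mem,                 # --mem
        "cores": slurm_cores_per_job,            # --cpus-per-task
        "job_name": slurm_job_name,              # --job-name
        "job_extra_directives": slurm_job_extra_directives,
        "python": slurm_python,
        "local_directory": slurm_local_directory,
        "log_directory": slurm_log_directory,
        "job_script_prologue": slurm_job_script_prologue,
    }
    # Drop unset options so dask-jobqueue falls back to its own defaults.
    return {key: value for key, value in candidates.items() if value is not None}


# The partition and environment names below are made up for illustration.
kwargs = slurm_cluster_kwargs(
    slurm_queue="batch",
    slurm_job_script_prologue=["conda activate myenv"],
)
# With dask-jobqueue installed, the result could be passed on like this:
#   from dask_jobqueue import SLURMCluster
#   cluster = SLURMCluster(**kwargs)
```

Filtering out None values keeps the behaviour described for slurm_job_name, where leaving the parameter unset lets dask-jobqueue choose its default name.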

start()

Start (or connect to) the Dask cluster and return self.

Return type:

DaskClusterManager

shutdown()

Close client and cluster.

Return type:

None
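
Because start() returns the manager and shutdown() closes both client and cluster, the pair fits naturally into a try/finally pattern. The sketch below wraps that pattern in a context manager. Both running and FakeManager are hypothetical names introduced here; FakeManager only mimics the two documented methods so the example runs without pclean or Dask installed.

```python
from contextlib import contextmanager


@contextmanager
def running(manager):
    """Start `manager`, yield it, and always shut it down afterwards."""
    started = manager.start()  # start() is documented to return the manager
    try:
        yield started
    finally:
        manager.shutdown()  # runs even if the body raises


class FakeManager:
    """Hypothetical stand-in for DaskClusterManager."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")
        return self

    def shutdown(self):
        self.events.append("shutdown")


manager = FakeManager()
with running(manager) as mgr:
    mgr.events.append("work")
print(manager.events)  # ['start', 'work', 'shutdown']
```

With a real DaskClusterManager the same wrapper would ensure that workers are released even when imaging fails partway through.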

property client

Return the dask.distributed.Client.

property worker_count: int

Number of workers currently registered with the scheduler.

Uses client.nthreads(), which is a direct synchronous query to the scheduler, avoiding stale cached snapshots from scheduler_info().

Note that this can be less than the requested nworkers due to resource constraints or startup issues. The cluster manager will log a warning and adjust nworkers accordingly.
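
A minimal sketch of the counting and adjustment logic described above follows. Client.nthreads() returns a mapping from worker address to thread count, so its length is the number of registered workers. The function names and the exact warning text are assumptions; only the use of nthreads() comes from the description.

```python
import logging

log = logging.getLogger(__name__)


def registered_workers(client) -> int:
    """Count workers via client.nthreads(), a {worker_address: nthreads} mapping."""
    return len(client.nthreads())


def effective_nworkers(client, requested: int) -> int:
    """Hypothetical helper: clamp the requested worker count to what is registered."""
    found = registered_workers(client)
    if found < requested:
        log.warning("Requested %d workers but only %d registered", requested, found)
        return found
    return requested
```

Any object with an nthreads() method returning a dict works with this sketch, which makes it easy to exercise without a live scheduler.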

Cube Parallel Imager

Parallel cube imaging engine.

Distributes channels across Dask workers. Each worker runs a fully independent SerialImager on its sub-cube (imaging + deconvolution). After all workers finish, the coordinator concatenates the sub-cubes into the final output cube.

This is embarrassingly parallel – there is no inter-worker communication during imaging.
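
The channel distribution can be illustrated with a simple contiguous split. How pclean actually partitions channels is not stated here, so split_channels is a hypothetical helper that assumes sub-cubes are contiguous and as evenly sized as possible.

```python
from typing import List, Tuple


def split_channels(nchan: int, nparts: int) -> List[Tuple[int, int]]:
    """Return (start_channel, nchan_sub) pairs whose sizes differ by at most one."""
    if nchan <= 0 or nparts <= 0:
        raise ValueError("nchan and nparts must be positive")
    nparts = min(nparts, nchan)  # never create empty sub-cubes
    base, extra = divmod(nchan, nparts)
    ranges = []
    start = 0
    for i in range(nparts):
        size = base + (1 if i < extra else 0)  # first `extra` parts get one more
        ranges.append((start, size))
        start += size
    return ranges


print(split_channels(10, 3))  # [(0, 4), (4, 3), (7, 3)]
```

Each pair could then define the channel range of one sub-cube task. Because the ranges do not overlap, the resulting sub-cubes can be concatenated in order along the spectral axis.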

class pclean.parallel.cube_parallel.ParallelCubeImager(config, cluster)

Bases: object

Channel-parallel cube CLEAN imager.

Parameters:
  • config (PcleanConfig) – Full imaging configuration (specmode must be cube/cubedata/cubesource).

  • cluster (DaskClusterManager) – Running Dask cluster.

run()

Execute the full parallel cube pipeline.

Returns:

Per-subcube summary list and the final concatenated image name.

Return type:

dict
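
How run() schedules sub-cubes is not spelled out here, but the memory_limit note for DaskClusterManager says that concurrency is bounded by as_completed rather than by Dask's memory manager. The sketch below shows that sliding-window idea. It uses concurrent.futures as a stand-in so it runs without a cluster; with Dask, the analogous calls would be Client.submit and dask.distributed.as_completed. run_bounded is a hypothetical name.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def run_bounded(func, items, max_in_flight, executor):
    """Run func over items, keeping at most max_in_flight tasks submitted."""
    remaining = iter(items)
    # Prime the window with the first max_in_flight tasks.
    pending = {executor.submit(func, item) for item in islice(remaining, max_in_flight)}
    results = []
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            results.append(future.result())  # re-raises any task exception
            # Refill the slot that just became free, if work remains.
            for item in islice(remaining, 1):
                pending.add(executor.submit(func, item))
    return results  # in completion order, not submission order


with ThreadPoolExecutor(max_workers=4) as pool:
    squares = run_bounded(lambda x: x * x, range(10), max_in_flight=3, executor=pool)
print(sorted(squares))
```

Bounding the number of in-flight tasks keeps memory use predictable even when the scheduler cannot see or free allocations made inside the worker.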

Continuum Parallel Imager

Parallel continuum (MFS) imaging engine.

Distributes visibility rows across Dask workers. Each worker runs its own synthesisimager on a data chunk to produce a partial image. The coordinator then uses synthesisnormalizer to gather partial images, normalize, run the (serial) minor cycle, and scatter the updated model back to workers for the next major cycle.

Parallelism pattern:

Major cycle (gridding / degridding) -- parallel across row chunks
Minor cycle (deconvolution)         -- serial on the gathered full image

class pclean.parallel.continuum_parallel.ParallelContinuumImager(config, cluster)

Bases: object

Row-parallel continuum (MFS) CLEAN imager.

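Parameters:
  • config (PcleanConfig) – Full imaging configuration.

  • cluster (DaskClusterManager) – Running Dask cluster.

run()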

Execute the full parallel continuum pipeline.

Returns:

Convergence summary.

Return type:

dict
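
The major/minor cycle pattern of the continuum engine can be illustrated with a deliberately simplified toy. Everything here is an assumption made for illustration: images are plain lists, the PSF is treated as a delta function, threads stand in for Dask workers, and none of the functions correspond to real synthesisimager or synthesisnormalizer calls.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence, Tuple

# (sum of weights, weighted partial dirty image) held by one worker
Chunk = Tuple[float, List[float]]


def partial_major_cycle(chunk: Chunk, model: Sequence[float]) -> Chunk:
    """Toy worker step: weighted partial residual for one data chunk."""
    weight, dirty = chunk
    return weight, [d - weight * m for d, m in zip(dirty, model)]


def gather_and_normalize(partials: Sequence[Chunk]) -> List[float]:
    """Toy coordinator step: sum the partial images and divide by total weight."""
    total_weight = sum(weight for weight, _ in partials)
    npix = len(partials[0][1])
    return [sum(image[i] for _, image in partials) / total_weight for i in range(npix)]


def minor_cycle(residual: Sequence[float], model: Sequence[float],
                gain: float = 0.5) -> List[float]:
    """Serial step: move a fraction of the peak residual into the model."""
    peak = max(range(len(residual)), key=lambda i: abs(residual[i]))
    updated = list(model)
    updated[peak] += gain * residual[peak]
    return updated


def toy_clean(chunks: Sequence[Chunk], npix: int,
              threshold: float = 1e-3, max_major: int = 100) -> List[float]:
    model = [0.0] * npix
    with ThreadPoolExecutor() as pool:
        for _ in range(max_major):
            # Major cycle: parallel across chunks.
            partials = list(pool.map(lambda c: partial_major_cycle(c, model), chunks))
            residual = gather_and_normalize(partials)
            if max(abs(r) for r in residual) < threshold:
                break
            # Minor cycle: serial on the gathered image.
            model = minor_cycle(residual, model)
    return model


# Two chunks observing the same toy sky [0, 2, 0, 1] with weights 1 and 3.
chunks = [(1.0, [0.0, 2.0, 0.0, 1.0]), (3.0, [0.0, 6.0, 0.0, 3.0])]
model = toy_clean(chunks, npix=4)
```

The loop structure is the point of the sketch: only the residual computation is spread across workers, while the model update happens once per iteration on the gathered image.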

Worker Tasks

Dask worker task functions.

Each function in this module is a pure top-level function that Dask can serialise and execute on a remote worker. The functions accept plain dicts (a serialised PcleanConfig or a CASA-native bundle) to avoid pickling issues, and they instantiate casatools objects on the worker side.

Design rationale:

  • Workers must import casatools locally – the C++ tool objects cannot be pickled or transferred between processes.

  • All file I/O (images, MSes) uses shared-filesystem paths so the coordinator can later gather partial products.
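
The design rationale can be shown with a toy task. ToyConfig and toy_task are hypothetical stand-ins: a dataclass replaces PcleanConfig, and the standard-library math module replaces casatools, so the sketch runs anywhere.

```python
import pickle
from dataclasses import asdict, dataclass


@dataclass
class ToyConfig:
    """Hypothetical stand-in for PcleanConfig."""
    imagename: str
    nchan: int


def toy_task(config_dict: dict) -> dict:
    """Top-level task that takes a plain dict, like the worker functions above."""
    # Imported inside the function so the module is loaded in the process that
    # runs the task. `math` stands in for casatools here.
    import math

    cfg = ToyConfig(**config_dict)  # rebuild the typed config on the worker side
    return {"imagename": cfg.imagename, "nchan_log2": math.log2(cfg.nchan)}


config_dict = asdict(ToyConfig(imagename="demo", nchan=8))
# A plain dict of built-in types survives pickling unchanged.
assert pickle.loads(pickle.dumps(config_dict)) == config_dict
summary = toy_task(config_dict)
print(summary)  # {'imagename': 'demo', 'nchan_log2': 3.0}
```

Only the dict crosses the process boundary; anything that cannot be pickled is created inside the task and never leaves the worker.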

pclean.parallel.worker_tasks.run_subcube(config_dict)

Run a complete imaging + deconvolution pipeline on a frequency sub-cube.

Invoked as a Dask task in the cube-parallel engine.

Each worker operates in its own temporary directory so that CASA’s deterministic temp files (IMAGING_WEIGHT_*) do not collide across concurrent workers sharing the same filesystem.

Parameters:

config_dict (dict) – Serialised PcleanConfig (from .model_dump()).

Returns:

Summary with convergence info, image name, etc.

Return type:

dict
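
The per-worker temporary directory mentioned above can be sketched with a small context manager. This is one possible approach, not necessarily what run_subcube does: isolated_workdir is a hypothetical name, and the sketch assumes each worker is a separate process running one task at a time, since the working directory is process-wide.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def isolated_workdir(prefix: str = "pclean_worker_"):
    """Run the enclosed block inside a fresh temporary working directory."""
    previous = os.getcwd()
    with tempfile.TemporaryDirectory(prefix=prefix) as tmp:
        os.chdir(tmp)
        try:
            yield tmp
        finally:
            # Leave the directory before TemporaryDirectory deletes it.
            os.chdir(previous)


with isolated_workdir() as workdir:
    # Files written with relative names land in the private directory.
    with open("IMAGING_WEIGHT_demo", "w") as handle:
        handle.write("scratch")
    print(os.path.exists(os.path.join(workdir, "IMAGING_WEIGHT_demo")))  # True
```

Output images would still need absolute paths on the shared filesystem, because everything left in the temporary directory is removed when the block exits.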

pclean.parallel.worker_tasks.make_partial_psf(bundle)

Create a synthesisimager on the worker and compute a partial PSF.

Parameters:

bundle (dict) – CASA-native parameter bundle (from PcleanConfig.to_casa_bundle()).

Returns:

The partial image name.

Return type:

str

pclean.parallel.worker_tasks.run_partial_major_cycle(bundle, controls=None)

Execute one major cycle on the worker’s data partition.

Parameters:
  • bundle (dict) – CASA-native parameter bundle.

  • controls (dict | None) – Iteration control record from iterbotsink.

Returns:

The partial image name.

Return type:

str

pclean.parallel.worker_tasks.make_partial_pb(bundle)

Compute partial primary beam on the worker.

Parameters:

bundle (dict) – CASA-native parameter bundle.

Returns:

The partial image name.

Return type:

str