"""Parameter container and validation for pclean.
.. deprecated::
``PcleanParams`` is deprecated. Use :class:`pclean.config.PcleanConfig`
with its ``to_casa_*()`` bridge methods instead. This module is retained
only for backward compatibility and will be removed in a future release.
"""
from __future__ import annotations
import copy
import warnings
from collections.abc import Sequence
# ---------------------------------------------------------------------------
# Defaults (matching tclean where possible)
# ---------------------------------------------------------------------------
_DEFAULT_SEL = dict(
msname='',
field='',
spw='',
timestr='',
uvdist='',
antenna='',
scan='',
obs='',
state='',
taql='',
datacolumn='corrected',
)
# Mapping from tclean user-facing parameter names to CASA internal names
_SEL_KEY_ALIASES: dict[str, str] = {
'timerange': 'timestr',
'uvrange': 'uvdist',
'observation': 'obs',
'intent': 'state',
}
_DEFAULT_IMG = dict(
imagename='',
imsize=[100],
cell=['1arcsec'],
phasecenter='',
stokes='I',
projection='SIN',
specmode='mfs',
reffreq='',
nchan=-1,
start='',
width='',
outframe='LSRK',
veltype='radio',
restfreq=[],
interpolation='linear',
perchanweightdensity=True,
startmodel='',
# These must be in impars so defineimage() creates the correct
# image products (e.g. .tt0/.tt1 for mtmfs). Matches CASA's
# ImagerParameters.allimpars.
nterms=2,
deconvolver='hogbom',
restart=True,
)
_DEFAULT_GRID = dict(
gridder='standard',
facets=1,
wprojplanes=1,
vptable='',
mosweight=True,
aterm=True,
psterm=False,
wbawp=True,
conjbeams=False,
cfcache='',
usepointing=False,
computepastep=360.0,
rotatepastep=360.0,
pointingoffsetsigdev=[],
pblimit=0.2,
normtype='flatnoise',
psfphasecenter='',
)
_DEFAULT_WEIGHT = dict(
type='natural',
rmode='none',
robust=0.5,
noise='1.0Jy',
npixels=0,
fieldofview='',
uvtaper=[],
multifield=False,
usecubebriggs=True,
)
_DEFAULT_DEC = dict(
deconvolver='hogbom',
scales=[],
nterms=2,
smallscalebias=0.0,
fusedthreshold=0.0,
largestscale=-1,
restoration=True,
restoringbeam=[],
pbcor=False,
usemask='user',
mask='',
pbmask=0.0,
sidelobethreshold=3.0,
noisethreshold=5.0,
lownoisethreshold=1.5,
negativethreshold=0.0,
smoothfactor=1.0,
minbeamfrac=0.3,
cutthreshold=0.01,
growiterations=100,
dogrowprune=True,
minpercentchange=0.0,
verbose=False,
fastnoise=True,
python_automask=True,
fullsummary=False,
)
_DEFAULT_NORM = dict(
pblimit=0.2,
normtype='flatnoise',
psfcutoff=0.35,
)
_DEFAULT_ITER = dict(
niter=0,
loopgain=0.1,
threshold='0.0mJy',
nsigma=0.0,
cycleniter=-1,
cyclefactor=1.0,
minpsffraction=0.05,
maxpsffraction=0.8,
interactive=False,
nmajor=-1,
fullsummary=False,
savemodel='none',
)
_DEFAULT_MISC = dict(
restart=True,
calcres=True,
calcpsf=True,
)
# ---------------------------------------------------------------------------
# Dask-specific defaults
# ---------------------------------------------------------------------------
_DEFAULT_PARALLEL = dict(
parallel=False,
nworkers=None, # auto-detect from available CPU cores
scheduler_address=None, # if set, connect to an existing Dask cluster
threads_per_worker=1,
memory_limit='0', # '0' disables Dask's per-worker memory limit
local_directory=None, # scratch directory for spilling; defaults to system tmpdir
cube_chunksize=-1, # -1: nparts=nworkers, 1: one channel per task, N: N channels per task
keep_subcubes=False, # retain per-worker subcube images after concatenation
keep_partimages=False, # retain partial images after continuum gather
# Cluster backend selection
cluster_type='local', # 'local' | 'slurm' | 'address'
# SLURM-specific (only used when cluster_type='slurm')
slurm_queue=None, # SLURM partition name (--partition)
slurm_account=None, # SLURM account (--account)
slurm_walltime='24:00:00', # per-job wall time (--time)
slurm_job_mem='20GB', # per-job memory (--mem)
slurm_cores_per_job=1, # CPUs per SLURM job (--cpus-per-task)
slurm_job_extra_directives=None, # list[str] of extra #SBATCH lines
slurm_python=None, # Python executable path on compute nodes
slurm_local_directory=None, # worker scratch dir on compute nodes
slurm_log_directory='logs', # SLURM stdout/stderr log directory
slurm_job_script_prologue=None, # list[str] of shell commands before worker start
)
_ALLOW_BRIGGS_BW_TAPER = True
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def _merge(defaults: dict, overrides: dict) -> dict:
"""Return *defaults* updated with non-None entries from *overrides*."""
out = dict(defaults)
for k, v in overrides.items():
if k in out:
out[k] = v
return out
def _ensure_list(val: object) -> list:
if isinstance(val, str):
return [val]
try:
return list(val)
except TypeError:
return [val]
# ---------------------------------------------------------------------------
# PcleanParams
# ---------------------------------------------------------------------------
[docs]
class PcleanParams:
"""Validated, serialisable parameter container for *pclean*.
Organises all tclean-compatible parameters into sub-dicts that map
directly to the ``casatools`` synthesis-tool APIs, plus Dask-specific
settings.
Args:
vis: Measurement set path(s).
**kwargs: Any parameter accepted by CASA ``tclean`` plus the extra
``nworkers``, ``scheduler_address``, ``threads_per_worker``,
``memory_limit``, and ``local_directory`` keys.
"""
def __init__(self, vis: str | Sequence[str] = '', **kwargs):
warnings.warn(
'PcleanParams is deprecated; use pclean.config.PcleanConfig '
'with its to_casa_*() bridge methods instead.',
DeprecationWarning,
stacklevel=2,
)
vis = _ensure_list(vis) if vis else ['']
# ---- selection (one dict per MS, keyed as 'ms0', 'ms1', ...) -----
self.allselpars: dict[str, dict] = {}
for idx, msname in enumerate(vis):
key = f'ms{idx}'
sel = dict(_DEFAULT_SEL)
sel['msname'] = msname
for k in _DEFAULT_SEL:
if k in kwargs:
sel[k] = kwargs[k]
# Also translate tclean user-facing aliases (e.g. timerange -> timestr)
for alias, internal in _SEL_KEY_ALIASES.items():
if alias in kwargs:
sel[internal] = kwargs[alias]
self.allselpars[key] = sel
# ---- image definition (field 0) ---------------------------------
self.allimpars: dict[str, dict] = {}
imp = _merge(_DEFAULT_IMG, kwargs)
imp['imsize'] = _ensure_list(imp['imsize'])
if len(imp['imsize']) == 1:
imp['imsize'] = imp['imsize'] * 2
imp['cell'] = _ensure_list(imp['cell'])
if len(imp['cell']) == 1:
imp['cell'] = imp['cell'] * 2
imp['restfreq'] = _ensure_list(imp.get('restfreq', []))
self.allimpars['0'] = imp
# ---- gridding ---------------------------------------------------
self.allgridpars: dict[str, dict] = {}
gp = _merge(_DEFAULT_GRID, kwargs)
# C++ SynthesisParamsGrid expects these keys in gridpars too
gp['imagename'] = imp['imagename']
gp['deconvolver'] = kwargs.get('deconvolver', 'hogbom')
gp['interpolation'] = imp.get('interpolation', 'linear')
self.allgridpars['0'] = gp
# ---- weighting ---------------------------------------------------
wp = _merge(_DEFAULT_WEIGHT, kwargs)
# tclean uses 'weighting' but casatools uses 'type'
weighting = kwargs.get('weighting', wp.get('type', 'natural'))
# Translate composite weighting names to C++ type + rmode
if weighting == 'briggsbwtaper':
wp['type'] = 'briggs'
wp['rmode'] = 'bwtaper'
if _ALLOW_BRIGGS_BW_TAPER:
# Pre-compute fractional bandwidth from full cube so each
# parallel sub-cube worker can apply the correct taper even
# when it images only a single channel (fracBW=0 fallback).
from pclean.utils.partition import _parse_freq_hz
start_hz = _parse_freq_hz(kwargs.get('start', ''))
width_hz = _parse_freq_hz(kwargs.get('width', ''))
nchan_full = kwargs.get('nchan', -1)
if start_hz is not None and width_hz is not None and nchan_full > 1:
min_freq = start_hz
max_freq = start_hz + (nchan_full - 1) * abs(width_hz)
if min_freq > max_freq:
min_freq, max_freq = max_freq, min_freq
wp['fracbw'] = 2.0 * (max_freq - min_freq) / (max_freq + min_freq)
else:
wp.pop('fracbw', None) # ensure fracbw is not set if bwtaper is disabled
elif weighting == 'briggsabs':
wp['type'] = 'briggs'
wp['rmode'] = 'abs'
elif weighting == 'briggs':
wp['type'] = 'briggs'
wp['rmode'] = 'norm'
else:
wp['type'] = weighting
wp.setdefault('rmode', 'none')
# mosweight → multifield flag expected by setweighting()
wp['multifield'] = kwargs.get('mosweight', False)
wp['usecubebriggs'] = kwargs.get('perchanweightdensity', True)
self.weightpars: dict = wp
# ---- deconvolution -----------------------------------------------
self.alldecpars: dict[str, dict] = {}
dp = _merge(_DEFAULT_DEC, kwargs)
dp['scales'] = _ensure_list(dp.get('scales', []))
dp['restoringbeam'] = _ensure_list(dp.get('restoringbeam', []))
# fullsummary must be consistent between iterbotsink and deconvolver
dp['fullsummary'] = kwargs.get('fullsummary', False)
self.alldecpars['0'] = dp
# ---- normalizer --------------------------------------------------
self.allnormpars: dict[str, dict] = {}
np_ = _merge(_DEFAULT_NORM, kwargs)
np_['imagename'] = imp['imagename']
np_['nterms'] = dp['nterms'] if dp['deconvolver'] == 'mtmfs' else 1
np_['deconvolver'] = dp['deconvolver']
np_['specmode'] = imp['specmode']
self.allnormpars['0'] = np_
# ---- iteration control -------------------------------------------
ip = _merge(_DEFAULT_ITER, kwargs)
# tclean uses 'gain' but iterbotsink expects 'loopgain'
if 'gain' in kwargs and 'loopgain' not in kwargs:
ip['loopgain'] = kwargs['gain']
# The C++ iterbotsink requires an 'allimages' sub-record
ip['allimages'] = {}
for fld in self.allimpars:
ip['allimages'][fld] = {
'imagename': self.allimpars[fld]['imagename'],
'multiterm': (self.alldecpars[fld]['deconvolver'] == 'mtmfs'),
}
self.iterpars: dict = ip
# ---- misc --------------------------------------------------------
self.miscpars: dict = _merge(_DEFAULT_MISC, kwargs)
# ---- dask parallel -----------------------------------------------
self.parallelpars: dict = _merge(_DEFAULT_PARALLEL, kwargs)
# ------------------------------------------------------------------
# Convenience accessors
# ------------------------------------------------------------------
@property
def specmode(self) -> str:
return self.allimpars['0']['specmode']
@property
def imagename(self) -> str:
return self.allimpars['0']['imagename']
@property
def parallel(self) -> bool:
return self.parallelpars.get('parallel', False)
@property
def niter(self) -> int:
return self.iterpars.get('niter', 0)
@property
def nfields(self) -> int:
return len(self.allimpars)
@property
def nms(self) -> int:
return len(self.allselpars)
@property
def deconvolver(self) -> str:
return self.alldecpars['0']['deconvolver']
@property
def is_cube(self) -> bool:
return self.specmode in ('cube', 'cubedata', 'cubesource')
@property
def is_mfs(self) -> bool:
return self.specmode == 'mfs'
# ------------------------------------------------------------------
# Serialization helpers (for Dask)
# ------------------------------------------------------------------
[docs]
def to_dict(self) -> dict:
"""Return a plain-dict snapshot that can be sent to a Dask worker."""
return dict(
allselpars=copy.deepcopy(self.allselpars),
allimpars=copy.deepcopy(self.allimpars),
allgridpars=copy.deepcopy(self.allgridpars),
weightpars=copy.deepcopy(self.weightpars),
alldecpars=copy.deepcopy(self.alldecpars),
allnormpars=copy.deepcopy(self.allnormpars),
iterpars=copy.deepcopy(self.iterpars),
miscpars=copy.deepcopy(self.miscpars),
parallelpars=copy.deepcopy(self.parallelpars),
)
[docs]
@classmethod
def from_dict(cls, d: dict) -> PcleanParams:
"""Re-hydrate from a plain dict (inverse of ``to_dict``)."""
obj = cls.__new__(cls)
obj.allselpars = d['allselpars']
obj.allimpars = d['allimpars']
obj.allgridpars = d['allgridpars']
obj.weightpars = d['weightpars']
obj.alldecpars = d['alldecpars']
obj.allnormpars = d['allnormpars']
obj.iterpars = d['iterpars']
obj.miscpars = d['miscpars']
obj.parallelpars = d['parallelpars']
return obj
[docs]
def clone(self) -> PcleanParams:
return PcleanParams.from_dict(self.to_dict())
# ------------------------------------------------------------------
# Partition helpers (used by parallel engines)
# ------------------------------------------------------------------
[docs]
def make_subcube_params(
self,
start: int | str,
nchan: int,
image_suffix: str,
) -> PcleanParams:
"""Return a copy tuned for a channel sub-range (cube parallelism).
Args:
start: Start channel (int) or frequency/velocity string for the subcube.
nchan: Number of channels in this subcube.
image_suffix: Suffix appended to the base imagename.
"""
p = self.clone()
imp = p.allimpars['0']
imp['nchan'] = nchan
imp['start'] = start if isinstance(start, str) else str(start)
imp['imagename'] = f'{self.imagename}.subcube.{image_suffix}'
# All param groups must track the new image name
p.allnormpars['0']['imagename'] = imp['imagename']
p.allgridpars['0']['imagename'] = imp['imagename']
if 'allimages' in p.iterpars:
p.iterpars['allimages']['0']['imagename'] = imp['imagename']
return p
[docs]
def make_rowchunk_params(
self,
partition_selpars: dict,
image_suffix: str,
) -> PcleanParams:
"""Return a copy with selection pars limited to a row chunk.
Used for continuum parallelism.
"""
p = self.clone()
p.allselpars = copy.deepcopy(partition_selpars)
imp = p.allimpars['0']
imp['imagename'] = f'{self.imagename}.part.{image_suffix}'
p.allnormpars['0']['imagename'] = imp['imagename']
p.allgridpars['0']['imagename'] = imp['imagename']
if 'allimages' in p.iterpars:
p.iterpars['allimages']['0']['imagename'] = imp['imagename']
return p