Source code for pclean.config

"""Hierarchical configuration with pydantic v2 models.

Provides a single-source-of-truth :class:`PcleanConfig` that can be
built from:

* Direct Python construction (``PcleanConfig(image=ImageConfig(...))``).
* A YAML file (``PcleanConfig.from_yaml('config.yaml')``).
* Layered composition (``PcleanConfig.merge(base, overlay)``).
* The flat ``pclean()`` kwargs for backward compatibility
  (``PcleanConfig.from_flat_kwargs(...)``).

The resulting config can be converted to CASA-native dicts via the
``to_casa_*()`` bridge methods, or to the legacy ``PcleanParams`` via
:meth:`PcleanConfig.to_params` (deprecated).
"""

from __future__ import annotations

import copy
import logging
from pathlib import Path
from typing import Any, Literal

from pydantic import BaseModel, Field, field_validator

log = logging.getLogger(__name__)

# ======================================================================
# Sub-config models
# ======================================================================


[docs] class SelectionConfig(BaseModel): """Data selection parameters.""" vis: str | list[str] = '' @field_validator('vis', mode='before') @classmethod def _coerce_vis(cls, v: Any) -> str | list[str]: """Accept ``Path`` objects and coerce to ``str``.""" if isinstance(v, Path): return str(v) if isinstance(v, (list, tuple)): return [str(item) for item in v] return v field: str | list[str] = '' spw: str | list[str] = '' timerange: str | list[str] = '' uvrange: str | list[str] = '' antenna: str | list[str] = '' scan: str | list[str] = '' observation: str = '' intent: str = '' datacolumn: str = 'corrected'
[docs] class ImageConfig(BaseModel): """Image definition parameters.""" imagename: str = '' imsize: list[int] = Field(default_factory=lambda: [100]) cell: list[str] | str = '1arcsec' phasecenter: str = '' stokes: str = 'I' projection: str = 'SIN' startmodel: str = '' specmode: str = 'mfs' reffreq: str = '' nchan: int = -1 start: str | int = '' width: str | int = '' outframe: str = 'LSRK' veltype: str = 'radio' restfreq: list[str] = Field(default_factory=list) interpolation: str = 'linear' perchanweightdensity: bool = True nterms: int = 2
[docs] class GridConfig(BaseModel): """Gridding parameters.""" gridder: str = 'standard' facets: int = 1 wprojplanes: int = 1 vptable: str = '' mosweight: bool = True aterm: bool = True psterm: bool = False wbawp: bool = True conjbeams: bool = False cfcache: str = '' usepointing: bool = False computepastep: float = 360.0 rotatepastep: float = 360.0 pointingoffsetsigdev: list[float] = Field(default_factory=list) pblimit: float = 0.2 normtype: str = 'flatnoise' psfphasecenter: str = ''
[docs] class WeightConfig(BaseModel): """Weighting parameters.""" weighting: str = 'natural' robust: float = 0.5 noise: str = '1.0Jy' npixels: int = 0 uvtaper: list[str] = Field(default_factory=list) fracbw: float | None = None # pre-computed fractional bandwidth for briggsbwtaper
[docs] class DeconvolutionConfig(BaseModel): """Deconvolution and masking parameters.""" deconvolver: str = 'hogbom' scales: list[int] = Field(default_factory=list) nterms: int = 2 smallscalebias: float = 0.0 fusedthreshold: float = 0.0 largestscale: int = -1 restoration: bool = True restoringbeam: list[str] | str = Field(default_factory=list) pbcor: bool = False @field_validator('restoringbeam', mode='before') @classmethod def _coerce_restoringbeam(cls, v: Any) -> list[str]: if v is None: return [] if isinstance(v, str): return [v] return list(v) # Masking usemask: str = 'user' mask: str = '' pbmask: float = 0.0 sidelobethreshold: float = 3.0 noisethreshold: float = 5.0 lownoisethreshold: float = 1.5 negativethreshold: float = 0.0 smoothfactor: float = 1.0 minbeamfrac: float = 0.3 cutthreshold: float = 0.01 growiterations: int = 100 dogrowprune: bool = True minpercentchange: float = 0.0 verbose: bool = False fastnoise: bool = True # When True and usemask='auto-multithresh', run the automasking # algorithm in pure Python (numpy/scipy) instead of delegating to # the C++ SDMaskHandler. This avoids repeated full-cube I/O, # per-plane TempImage allocations, and casacore table-cache bugs. python_automask: bool = True
[docs] class IterationConfig(BaseModel): """Iteration control parameters.""" niter: int = 0 gain: float = 0.1 threshold: str = '0.0mJy' nsigma: float = 0.0 cycleniter: int = -1 cyclefactor: float = 1.0 minpsffraction: float = 0.05 maxpsffraction: float = 0.8 interactive: bool = False nmajor: int = -1 fullsummary: bool = False
[docs] class MiscConfig(BaseModel): """Miscellaneous parameters.""" restart: bool = True savemodel: str = 'none' calcres: bool = True calcpsf: bool = True psfcutoff: float = 0.35
[docs] class NormConfig(BaseModel): """Normalization parameters.""" pblimit: float = 0.2 normtype: str = 'flatnoise' psfcutoff: float = 0.35
[docs] class SlurmConfig(BaseModel): """SLURM batch-job parameters (used when ``cluster.type == 'slurm'``).""" queue: str | None = None account: str | None = None walltime: str = '24:00:00' job_mem: str = '20GB' cores_per_job: int = 1 job_name: str | None = None job_extra_directives: list[str] = Field(default_factory=list) python: str | None = None local_directory: str | None = None log_directory: str = 'logs' job_script_prologue: list[str] = Field(default_factory=list) @field_validator('job_extra_directives', 'job_script_prologue', mode='before') @classmethod def _coerce_none_to_list(cls, v: Any) -> list[str]: if v is None: return [] return list(v)
[docs] class SubmitConfig(BaseModel): """SLURM coordinator (submit) job parameters. These control the *coordinator* SLURM job that runs ``python -m pclean --pconfig <yaml>`` and lets dask-jobqueue spawn per-channel worker jobs. """ workdir: str | None = None pixi_project_dir: str | None = None pixi_env: str = 'forge' coordinator_mem: str = '8G' coordinator_cpus: int = 2 coordinator_walltime: str = '24:00:00' coordinator_job_name: str = 'pclean-coordinator' extra_sbatch: list[str] = Field(default_factory=list) log_dir: str | None = None psrecord: bool = True @field_validator('extra_sbatch', mode='before') @classmethod def _coerce_none_to_list(cls, v: Any) -> list[str]: if v is None: return [] return list(v)
[docs] class ClusterConfig(BaseModel): """Dask cluster configuration.""" type: Literal['local', 'slurm', 'address'] = 'local' nworkers: int | None = None scheduler_address: str | None = None threads_per_worker: int = 1 memory_limit: str = '0' local_directory: str | None = None parallel: bool = False cube_chunksize: int = -1 keep_subcubes: bool = False keep_partimages: bool = False concat_mode: Literal['auto', 'paged', 'virtual', 'movevirtual'] = 'auto' slurm: SlurmConfig = Field(default_factory=SlurmConfig) submit: SubmitConfig = Field(default_factory=SubmitConfig)
# ====================================================================== # Top-level config # ======================================================================
[docs] class PcleanConfig(BaseModel): """Top-level hierarchical configuration for pclean. All parameters are grouped into logical sub-configs. This is the single source of truth for the application. """ selection: SelectionConfig = Field(default_factory=SelectionConfig) image: ImageConfig = Field(default_factory=ImageConfig) grid: GridConfig = Field(default_factory=GridConfig) weight: WeightConfig = Field(default_factory=WeightConfig) deconvolution: DeconvolutionConfig = Field(default_factory=DeconvolutionConfig) iteration: IterationConfig = Field(default_factory=IterationConfig) normalization: NormConfig = Field(default_factory=NormConfig) misc: MiscConfig = Field(default_factory=MiscConfig) cluster: ClusterConfig = Field(default_factory=ClusterConfig) # ------------------------------------------------------------------ # YAML I/O # ------------------------------------------------------------------
[docs] @classmethod def from_yaml(cls, path: str | Path) -> PcleanConfig: """Load a config from a YAML file. Args: path: Path to the YAML file. """ import yaml p = Path(path) log.info('Loading config from %s', p) with p.open() as fh: data = yaml.safe_load(fh) or {} return cls.model_validate(data)
[docs] def to_yaml(self, path: str | Path) -> None: """Dump the config to a YAML file. Args: path: Destination file path. """ import yaml p = Path(path) p.parent.mkdir(parents=True, exist_ok=True) data = self.model_dump(mode='python') with p.open('w') as fh: yaml.dump(data, fh, default_flow_style=False, sort_keys=False) log.info('Config written to %s', p)
# ------------------------------------------------------------------ # Merge / composition # ------------------------------------------------------------------
[docs] @classmethod def merge(cls, *configs: PcleanConfig) -> PcleanConfig: """Deep-merge multiple configs, later values override earlier ones. Args: configs: Two or more configs to merge left-to-right. """ if not configs: return cls() merged = configs[0].model_dump(mode='python') for cfg in configs[1:]: overlay = cfg.model_dump(mode='python', exclude_defaults=True) _deep_update(merged, overlay) return cls.model_validate(merged)
# ------------------------------------------------------------------ # Build from flat kwargs (backward compat with pclean() signature) # ------------------------------------------------------------------
[docs] @classmethod def from_flat_kwargs( cls, vis: str | list[str] = '', **kwargs: Any, ) -> PcleanConfig: """Build a ``PcleanConfig`` from the flat ``pclean()`` keyword arguments. This is the backward-compatibility shim that maps the 80+ flat keyword arguments into the hierarchical structure. Args: vis: Measurement set path(s). **kwargs: Flat keyword arguments matching the ``pclean()`` signature. """ sel = dict(vis=vis) img: dict[str, Any] = {} grd: dict[str, Any] = {} wgt: dict[str, Any] = {} dec: dict[str, Any] = {} itr: dict[str, Any] = {} msc: dict[str, Any] = {} nrm: dict[str, Any] = {} clu: dict[str, Any] = {} slm: dict[str, Any] = {} # Selection fields _sel_keys = { 'field', 'spw', 'timerange', 'uvrange', 'antenna', 'scan', 'observation', 'intent', 'datacolumn', } # Image fields _img_keys = { 'imagename', 'imsize', 'cell', 'phasecenter', 'stokes', 'projection', 'startmodel', 'specmode', 'reffreq', 'nchan', 'start', 'width', 'outframe', 'veltype', 'restfreq', 'interpolation', 'perchanweightdensity', 'nterms', } # Grid fields _grd_keys = { 'gridder', 'facets', 'wprojplanes', 'vptable', 'mosweight', 'aterm', 'psterm', 'wbawp', 'conjbeams', 'cfcache', 'usepointing', 'computepastep', 'rotatepastep', 'pointingoffsetsigdev', 'pblimit', 'normtype', 'psfphasecenter', } # Weight fields _wgt_keys = {'weighting', 'robust', 'noise', 'npixels', 'uvtaper'} # Deconvolution fields _dec_keys = { 'deconvolver', 'scales', 'smallscalebias', 'fusedthreshold', 'largestscale', 'restoration', 'restoringbeam', 'pbcor', 'usemask', 'mask', 'pbmask', 'sidelobethreshold', 'noisethreshold', 'lownoisethreshold', 'negativethreshold', 'smoothfactor', 'minbeamfrac', 'cutthreshold', 'growiterations', 'dogrowprune', 'minpercentchange', 'verbose', 'fastnoise', 'python_automask', } # Note: nterms appears in both image and deconvolution _dec_keys.add('nterms') # Iteration fields _itr_keys = { 'niter', 'gain', 'threshold', 'nsigma', 'cycleniter', 'cyclefactor', 'minpsffraction', 'maxpsffraction', 'interactive', 'nmajor', 'fullsummary', } # Misc fields _msc_keys = {'restart', 'savemodel', 'calcres', 'calcpsf', 'psfcutoff'} # Norm fields _nrm_keys = {'pblimit', 'normtype', 'psfcutoff'} # Cluster flat keys -> structured _clu_keys = { 'parallel', 'nworkers', 'scheduler_address', 'threads_per_worker', 'memory_limit', 'local_directory', 'cube_chunksize', 'keep_subcubes', 'keep_partimages', 'concat_mode', } # Cluster type _clu_type_key = 'cluster_type' # SLURM flat keys (slurm_* prefix -> nested slurm.*) _slurm_prefix = 'slurm_' # Submit/coordinator flat keys (coordinator_* prefix -> nested submit.*) _coordinator_prefix = 'coordinator_' # Submit flat keys that don't use the coordinator_ prefix _submit_keys = {'pixi_project_dir', 'pixi_env', 'psrecord', 'workdir', 'log_dir'} # Submit extra_sbatch _submit_extra_sbatch_key = 'extra_sbatch' sub: dict[str, Any] = {} for k, v in kwargs.items(): if k in _sel_keys: sel[k] = v if k in _img_keys: img[k] = v if k in _grd_keys: grd[k] = v if k in _wgt_keys: wgt[k] = v if k in _dec_keys: dec[k] = v if k in _itr_keys: itr[k] = v if k in _msc_keys: msc[k] = v if k in _nrm_keys: nrm[k] = v if k in _clu_keys: clu[k] = v if k == _clu_type_key: clu['type'] = v if k.startswith(_slurm_prefix): slurm_field = k[len(_slurm_prefix):] slm[slurm_field] = v if k.startswith(_coordinator_prefix): sub[k] = v if k in _submit_keys or k in {'workdir', 'log_dir'}: sub[k] = v if k == _submit_extra_sbatch_key: sub[k] = v if slm: clu['slurm'] = slm if sub: clu['submit'] = sub data: dict[str, Any] = {} if sel: data['selection'] = sel if img: data['image'] = img if grd: data['grid'] = grd if wgt: data['weight'] = wgt if dec: data['deconvolution'] = dec if itr: data['iteration'] = itr if msc: data['misc'] = msc if nrm: data['normalization'] = nrm if clu: data['cluster'] = clu return cls.model_validate(data)
# ------------------------------------------------------------------ # Convert to flat kwargs (inverse of from_flat_kwargs) # ------------------------------------------------------------------
[docs] def to_flat_kwargs(self) -> dict[str, Any]: """Convert back to the flat keyword dict accepted by ``pclean()``. Returns: Flat dictionary with all parameters. """ kw: dict[str, Any] = {} # Selection sel = self.selection.model_dump() vis = sel.pop('vis', '') kw['vis'] = vis kw.update(sel) # Image kw.update(self.image.model_dump()) # Grid kw.update(self.grid.model_dump()) # Weight kw.update(self.weight.model_dump()) # Deconvolution kw.update(self.deconvolution.model_dump()) # Iteration kw.update(self.iteration.model_dump()) # Misc kw.update(self.misc.model_dump()) # Cluster -> flat clu = self.cluster.model_dump() slurm = clu.pop('slurm', {}) submit = clu.pop('submit', {}) clu_type = clu.pop('type', 'local') kw['cluster_type'] = clu_type kw.update(clu) for sk, sv in slurm.items(): kw[f'slurm_{sk}'] = sv for sk, sv in submit.items(): kw[sk] = sv return kw
# ------------------------------------------------------------------ # CASA-native dict builders # ------------------------------------------------------------------
[docs] def to_casa_selpars(self) -> dict[str, dict]: """Build CASA ``selectdata``-compatible multi-MS selection dicts. Returns: Dict keyed ``'ms0'``, ``'ms1'``, etc. with CASA-internal field names. """ sel = self.selection vis = sel.vis vis_list = [vis] if isinstance(vis, str) else list(vis) if not vis_list: vis_list = [''] def _expand(name: str, value: str | list[str]) -> list[str]: """Broadcast a scalar or validate a per-MS list.""" if isinstance(value, str): return [value] * len(vis_list) lst = list(value) if len(lst) != len(vis_list): raise ValueError( f'{name} list length ({len(lst)}) must match ' f'vis list length ({len(vis_list)})' ) return lst field_list = _expand('field', sel.field) spw_list = _expand('spw', sel.spw) timerange_list = _expand('timerange', sel.timerange) uvrange_list = _expand('uvrange', sel.uvrange) antenna_list = _expand('antenna', sel.antenna) scan_list = _expand('scan', sel.scan) result: dict[str, dict] = {} for idx, msname in enumerate(vis_list): result[f'ms{idx}'] = { 'msname': msname, 'field': field_list[idx], 'spw': spw_list[idx], 'timestr': timerange_list[idx], 'uvdist': uvrange_list[idx], 'antenna': antenna_list[idx], 'scan': scan_list[idx], 'obs': sel.observation, 'state': sel.intent, 'taql': '', 'datacolumn': sel.datacolumn, } return result
[docs] def to_casa_impars(self) -> dict[str, dict]: """Build CASA ``defineimage``-compatible image parameter dicts. Returns: Dict keyed ``'0'`` (single field) with image parameters. """ img = self.image imsize = list(img.imsize) if len(imsize) == 1: imsize = imsize * 2 cell = [img.cell] if isinstance(img.cell, str) else list(img.cell) if len(cell) == 1: cell = cell * 2 restfreq = list(img.restfreq) if img.restfreq else [] return { '0': { 'imagename': img.imagename, 'imsize': imsize, 'cell': cell, 'phasecenter': img.phasecenter, 'stokes': img.stokes, 'projection': img.projection, 'specmode': img.specmode, 'reffreq': img.reffreq, 'nchan': img.nchan, 'start': img.start, 'width': img.width, 'outframe': img.outframe, 'veltype': img.veltype, 'restfreq': restfreq, 'interpolation': img.interpolation, 'perchanweightdensity': img.perchanweightdensity, 'startmodel': img.startmodel, 'nterms': img.nterms, 'deconvolver': self.deconvolution.deconvolver, 'restart': self.misc.restart, }, }
[docs] def to_casa_gridpars(self) -> dict[str, dict]: """Build CASA ``defineimage`` grid parameter dicts.""" grd = self.grid return { '0': { 'gridder': grd.gridder, 'facets': grd.facets, 'wprojplanes': grd.wprojplanes, 'vptable': grd.vptable, 'mosweight': grd.mosweight, 'aterm': grd.aterm, 'psterm': grd.psterm, 'wbawp': grd.wbawp, 'conjbeams': grd.conjbeams, 'cfcache': grd.cfcache, 'usepointing': grd.usepointing, 'computepastep': grd.computepastep, 'rotatepastep': grd.rotatepastep, 'pointingoffsetsigdev': list(grd.pointingoffsetsigdev), 'pblimit': grd.pblimit, 'normtype': grd.normtype, 'psfphasecenter': grd.psfphasecenter, 'imagename': self.image.imagename, 'deconvolver': self.deconvolution.deconvolver, 'interpolation': self.image.interpolation, }, }
[docs] def to_casa_weightpars(self) -> dict: """Build CASA ``setweighting``-compatible weight parameter dict.""" wgt = self.weight weighting = wgt.weighting wp: dict[str, Any] = { 'robust': wgt.robust, 'noise': wgt.noise, 'npixels': wgt.npixels, 'fieldofview': '', 'uvtaper': list(wgt.uvtaper), } if weighting == 'briggsbwtaper': wp['type'] = 'briggs' wp['rmode'] = 'bwtaper' # Use pre-computed fracbw if available (e.g. inherited from # the parent config when this is a sub-cube). Otherwise # compute it from the current image start/width/nchan. if wgt.fracbw is not None and wgt.fracbw > 0: wp['fracbw'] = wgt.fracbw else: from pclean.utils.partition import _parse_freq_hz start_hz = _parse_freq_hz(self.image.start) width_hz = _parse_freq_hz(self.image.width) nchan_full = self.image.nchan if start_hz is not None and width_hz is not None and nchan_full > 1: min_freq = start_hz max_freq = start_hz + (nchan_full - 1) * abs(width_hz) if min_freq > max_freq: min_freq, max_freq = max_freq, min_freq wp['fracbw'] = 2.0 * (max_freq - min_freq) / (max_freq + min_freq) elif weighting == 'briggsabs': wp['type'] = 'briggs' wp['rmode'] = 'abs' elif weighting == 'briggs': wp['type'] = 'briggs' wp['rmode'] = 'norm' else: wp['type'] = weighting wp['rmode'] = 'none' wp['multifield'] = self.grid.mosweight wp['usecubebriggs'] = self.image.perchanweightdensity return wp
[docs] def to_casa_decpars(self) -> dict[str, dict]: """Build CASA ``setupdeconvolution``-compatible deconvolution dicts.""" dec = self.deconvolution # When Python automasking is active, tell C++ the mask mode is # 'user' so setupmask() won't run its own auto-multithresh. # The Python layer writes the mask to <imagename>.mask and C++ # picks it up as a user-supplied mask. cpp_usemask = dec.usemask if dec.python_automask and dec.usemask == 'auto-multithresh': cpp_usemask = 'user' return { '0': { 'deconvolver': dec.deconvolver, 'scales': list(dec.scales), 'nterms': dec.nterms, 'smallscalebias': dec.smallscalebias, 'fusedthreshold': dec.fusedthreshold, 'largestscale': dec.largestscale, 'restoration': dec.restoration, 'restoringbeam': list(dec.restoringbeam), 'pbcor': dec.pbcor, 'usemask': cpp_usemask, 'mask': dec.mask, 'pbmask': dec.pbmask, 'sidelobethreshold': dec.sidelobethreshold, 'noisethreshold': dec.noisethreshold, 'lownoisethreshold': dec.lownoisethreshold, 'negativethreshold': dec.negativethreshold, 'smoothfactor': dec.smoothfactor, 'minbeamfrac': dec.minbeamfrac, 'cutthreshold': dec.cutthreshold, 'growiterations': dec.growiterations, 'dogrowprune': dec.dogrowprune, 'minpercentchange': dec.minpercentchange, 'verbose': dec.verbose, 'fastnoise': dec.fastnoise, 'fullsummary': self.iteration.fullsummary, }, }
[docs] def to_casa_normpars(self) -> dict[str, dict]: """Build CASA ``setupnormalizer``-compatible normalization dicts.""" nrm = self.normalization dec = self.deconvolution nterms = dec.nterms if dec.deconvolver == 'mtmfs' else 1 return { '0': { 'pblimit': nrm.pblimit, 'normtype': nrm.normtype, 'psfcutoff': nrm.psfcutoff, 'imagename': self.image.imagename, 'nterms': nterms, 'deconvolver': dec.deconvolver, 'specmode': self.image.specmode, }, }
[docs] def to_casa_iterpars(self) -> dict: """Build CASA ``setupiteration``-compatible iteration parameter dict.""" itr = self.iteration return { 'niter': itr.niter, 'loopgain': itr.gain, 'threshold': itr.threshold, 'nsigma': itr.nsigma, 'cycleniter': itr.cycleniter, 'cyclefactor': itr.cyclefactor, 'minpsffraction': itr.minpsffraction, 'maxpsffraction': itr.maxpsffraction, 'interactive': itr.interactive, 'nmajor': itr.nmajor, 'fullsummary': itr.fullsummary, 'savemodel': self.misc.savemodel, 'allimages': { '0': { 'imagename': self.image.imagename, 'multiterm': (self.deconvolution.deconvolver == 'mtmfs'), }, }, }
[docs] def to_casa_miscpars(self) -> dict: """Build miscellaneous parameter dict.""" return { 'restart': self.misc.restart, 'calcres': self.misc.calcres, 'calcpsf': self.misc.calcpsf, }
[docs] def to_casa_bundle(self) -> dict: """Produce a serializable dict of all CASA-native parameter dicts. This is the worker-facing payload for continuum-parallel actors. Cube-parallel workers receive a serialized ``PcleanConfig`` instead. Returns: Dict with keys matching the former ``PcleanParams.to_dict()`` format. """ return { 'allselpars': self.to_casa_selpars(), 'allimpars': self.to_casa_impars(), 'allgridpars': self.to_casa_gridpars(), 'weightpars': self.to_casa_weightpars(), 'alldecpars': self.to_casa_decpars(), 'allnormpars': self.to_casa_normpars(), 'iterpars': self.to_casa_iterpars(), 'miscpars': self.to_casa_miscpars(), }
# ------------------------------------------------------------------ # Convenience properties # ------------------------------------------------------------------ @property def specmode(self) -> str: """Spectral mode (``'mfs'``, ``'cube'``, etc.).""" return self.image.specmode @property def imagename(self) -> str: """Output image name prefix.""" return self.image.imagename @property def parallel(self) -> bool: """Whether Dask parallelism is enabled.""" return self.cluster.parallel @property def niter(self) -> int: """Maximum number of CLEAN iterations.""" return self.iteration.niter @property def is_cube(self) -> bool: """True if specmode indicates cube imaging.""" return self.image.specmode in ('cube', 'cubedata', 'cubesource') @property def is_mfs(self) -> bool: """True if specmode is multi-frequency synthesis.""" return self.image.specmode == 'mfs' @property def nfields(self) -> int: """Number of image fields (currently always 1).""" return 1 @property def nms(self) -> int: """Number of measurement sets.""" vis = self.selection.vis if isinstance(vis, str): return 1 if vis else 0 return len(vis) # ------------------------------------------------------------------ # Partition helpers # ------------------------------------------------------------------
[docs] def make_subcube_config( self, start: int | str, nchan: int, image_suffix: str, width: int | str | None = None, ) -> PcleanConfig: """Return a copy tuned for a channel sub-range (cube parallelism). Args: start: Start channel (int) or frequency/velocity string. nchan: Number of channels in this subcube. image_suffix: Suffix appended to the base imagename. width: Channel width override. When *start* is converted to a frequency string the caller must also supply a matching frequency *width* so that CASA does not reject mixed unit types. """ data = self.model_dump(mode='python') data['image']['nchan'] = nchan data['image']['start'] = start if isinstance(start, str) else str(start) if width is not None: data['image']['width'] = width if isinstance(width, str) else str(width) data['image']['imagename'] = f'{self.imagename}.subcube.{image_suffix}' # Pre-compute fracbw from the *parent* (full-cube) config so that # subcubes with nchan=1 still get the correct fractional bandwidth # for briggsbwtaper weighting. if self.weight.weighting == 'briggsbwtaper' and self.weight.fracbw is None: parent_wp = self.to_casa_weightpars() if 'fracbw' in parent_wp: data['weight']['fracbw'] = parent_wp['fracbw'] return PcleanConfig.model_validate(data)
# ------------------------------------------------------------------ # Deprecated bridge to PcleanParams # ------------------------------------------------------------------
[docs] def to_params(self) -> Any: """Convert to the legacy ``PcleanParams`` used by engines. .. deprecated:: Use the ``to_casa_*()`` methods or pass ``PcleanConfig`` directly to engines instead. Returns: A fully constructed ``PcleanParams`` instance. """ import warnings from pclean.params import PcleanParams warnings.warn( 'PcleanConfig.to_params() is deprecated; use to_casa_*() methods ' 'or pass PcleanConfig directly to engines.', DeprecationWarning, stacklevel=2, ) kw = self.to_flat_kwargs() vis = kw.pop('vis', '') if isinstance(vis, str): vis = [vis] if vis else [''] return PcleanParams(vis=vis, **kw)
# ====================================================================== # Helpers # ====================================================================== def _deep_update(base: dict, overlay: dict) -> dict: """Recursively update *base* with values from *overlay* (in-place). Args: base: Base dictionary to update. overlay: Dictionary whose values override *base*. Returns: The mutated *base* dictionary. """ for k, v in overlay.items(): if isinstance(v, dict) and isinstance(base.get(k), dict): _deep_update(base[k], v) else: base[k] = copy.deepcopy(v) return base def _read_package_text(rel_path: str) -> str | None: """Read text content of a file bundled inside the ``pclean`` package. Uses :meth:`importlib.resources.Traversable.read_text`, which works correctly in both editable installs and zip-packaged distributions without requiring a temporary filesystem extraction. Args: rel_path: Path relative to the ``pclean`` package root (e.g. ``'configs/presets/vlass.yaml'``). Returns: File contents as a :class:`str`, or ``None`` if not found. """ try: from importlib.resources import files return files('pclean').joinpath(rel_path).read_text(encoding='utf-8') except (FileNotFoundError, TypeError): return None
[docs] def load_defaults() -> PcleanConfig: """Load the bundled ``defaults.yaml`` reference snapshot. This is equivalent to ``PcleanConfig()`` (all pydantic defaults) but read from the packaged YAML file for verification purposes. Returns: A ``PcleanConfig`` with the reference default values. """ import yaml text = _read_package_text('configs/defaults.yaml') if text is not None: log.info('Loading bundled defaults from package resource') return PcleanConfig.model_validate(yaml.safe_load(text) or {}) # Fallback: CWD cwd_path = Path('configs') / 'defaults.yaml' if cwd_path.exists(): return PcleanConfig.from_yaml(cwd_path) log.warning('defaults.yaml not found; returning pydantic defaults') return PcleanConfig()
[docs] def load_preset(name: str) -> PcleanConfig: """Load a named preset from the bundled ``configs/presets/`` directory. Searches first inside the installed package, then falls back to CWD-relative paths. Args: name: Preset name (without ``.yaml`` extension). """ import yaml # 1. Packaged preset — read text directly; works in editable + zip installs text = _read_package_text(f'configs/presets/{name}.yaml') if text is not None: log.info('Loading preset %r from package resource', name) return PcleanConfig.model_validate(yaml.safe_load(text) or {}) # 2. CWD fallback paths candidates = [ Path('configs') / 'presets' / f'{name}.yaml', Path(f'{name}.yaml'), ] for p in candidates: if p.exists(): log.info('Loading preset %s from %s', name, p) return PcleanConfig.from_yaml(p) searched = ['<package>/configs/presets/'] + [str(c) for c in candidates] raise FileNotFoundError(f'Preset {name!r} not found; searched: {", ".join(searched)}')
[docs] def get_adios2_config_path() -> Path | None: """Return the filesystem path of the bundled ADIOS2 XML config. The file enables lossless blosc2/zstd compression and sets sensible buffer sizes for CASA Measurement Sets stored in ADIOS2 format. Point ADIOS2 at it via the environment variable before opening any MS:: import os from pclean.config import get_adios2_config_path path = get_adios2_config_path() if path is not None: os.environ.setdefault('ADIOS2_INIT_FILE', str(path)) Returns: Resolved filesystem :class:`Path`, or ``None`` if not found. """ try: from importlib.resources import files candidate = Path(str(files('pclean').joinpath('configs/adios2_config.xml'))) if candidate.exists(): return candidate except Exception: # pragma: no cover pass return None