"""Pure-numpy auto-multithresh masking.
Re-implements CASA's ``SDMaskHandler::autoMaskByMultiThreshold`` algorithm
using numpy/scipy, eliminating:
* Repeated full-cube copy to TempImage
* Per-plane TempImage allocations (~8 per chan per iteration)
* casacore TableCache interactions (mask0 subtable bug)
* Multiple statistics passes
The algorithm faithfully reproduces the six stages of the C++ implementation:
threshold → prune → smooth → grow → combine → negative mask.
Memory optimisation notes:
All intermediate masks use ``np.bool_`` (1 byte/pixel) instead of
``float32`` (4 bytes/pixel), giving a **4× footprint reduction** on
mask arrays. Only the final mask returned to the caller is float32
(CASA image format). ``AutoMaskState.posmask`` and ``.prevmask``
are stored as ``bool_`` between iterations. When ``pb`` is *None*
no PB-mask array is allocated at all (``pb_mask = None`` sentinel).
References:
SDMaskHandler.cc ``autoMaskByMultiThreshold()`` (CASA 6.x)
Kepley et al. 2020, PASP 132, 024505
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TypeAlias
import numpy as np
from numpy.typing import NDArray
from scipy.ndimage import (
binary_dilation,
gaussian_filter,
label,
sum as ndi_sum,
)
log = logging.getLogger(__name__)
# Type alias for a 2-D float32 image plane.
Plane: TypeAlias = NDArray[np.float32]
# ======================================================================
# Configuration
# ======================================================================
[docs]
@dataclass
class AutoMaskConfig:
"""Parameters mirroring CASA's auto-multithresh knobs.
Defaults match CASA's ``tclean`` defaults.
"""
sidelobethreshold: float = 3.0
noisethreshold: float = 5.0
lownoisethreshold: float = 1.5
negativethreshold: float = 0.0
smoothfactor: float = 1.0
minbeamfrac: float = 0.3
cutthreshold: float = 0.01
growiterations: int = 100
dogrowprune: bool = True
minpercentchange: float = 0.0
fastnoise: bool = True
[docs]
@classmethod
def from_pclean_config(cls, dec) -> AutoMaskConfig:
"""Build from a ``DeconvolutionConfig`` object."""
return cls(
sidelobethreshold=dec.sidelobethreshold,
noisethreshold=dec.noisethreshold,
lownoisethreshold=dec.lownoisethreshold,
negativethreshold=dec.negativethreshold,
smoothfactor=dec.smoothfactor,
minbeamfrac=dec.minbeamfrac,
cutthreshold=dec.cutthreshold,
growiterations=dec.growiterations,
dogrowprune=dec.dogrowprune,
minpercentchange=dec.minpercentchange,
fastnoise=dec.fastnoise,
)
# ======================================================================
# Per-channel state
# ======================================================================
[docs]
@dataclass
class AutoMaskState:
"""Mutable state carried across major-cycle iterations.
The C++ implementation keeps ``posmask`` and ``prevmask`` inside
``SIImageStore``. Here we keep lightweight numpy arrays per
channel so the Python caller can stash them between cycles.
"""
posmask: NDArray[np.bool_] | None = None # accumulated positive mask
prevmask: NDArray[np.bool_] | None = None # mask from previous iteration
iteration: int = 0 # how many times automask has been called
skip: bool = False # channel flagged as stable
# ======================================================================
# Statistics helpers
# ======================================================================
def _robust_rms(data: Plane) -> float:
"""MAD-based robust RMS estimate (σ ≈ 1.4826 × MAD)."""
med = np.median(data)
mad = np.median(np.abs(data - med))
return float(mad * 1.4826)
def _plane_stats(
residual: Plane,
*,
fastnoise: bool = True,
prev_mask: NDArray[np.bool_] | None = None,
) -> tuple[float, float, float, float]:
"""Compute (absmax, median, robust_rms, classic_rms) for one plane.
Both ``fastnoise`` paths use the MAD-based robust RMS estimator
(σ ≈ 1.4826 × MAD), matching CASA C++ ``SDMaskHandler`` which
always uses ``medabsdevmed * 1.4826`` from ``ImageStatsCalculator``
with ``robust=true``.
With ``fastnoise=True`` the MAD is computed on **all** pixels
(source emission included); MAD is inherently robust to outliers.
With ``fastnoise=False`` pixels inside *prev_mask* are excluded
before computing the MAD, giving a source-free noise estimate.
"""
absmax = float(np.max(np.abs(residual)))
median = float(np.median(residual))
if fastnoise:
rms = _robust_rms(residual)
else:
if prev_mask is not None and np.any(prev_mask > 0.5):
source_free = residual[prev_mask < 0.5]
if source_free.size > 0:
rms = _robust_rms(source_free)
else:
rms = _robust_rms(residual)
else:
rms = _robust_rms(residual)
return absmax, median, rms, rms
# ======================================================================
# Core algorithm stages
# ======================================================================
def _prune_regions(
mask: NDArray,
min_size: float,
) -> NDArray[np.bool_]:
"""Remove connected regions smaller than *min_size* pixels.
Equivalent to ``SDMaskHandler::YAPruneRegions``.
Returns:
Boolean mask with small regions removed.
"""
if min_size <= 0:
return np.asarray(mask, dtype=np.bool_)
bool_mask = np.asarray(mask > 0 if mask.dtype != np.bool_ else mask,
dtype=np.bool_)
labeled, n_features = label(bool_mask)
if n_features == 0:
return bool_mask
component_sizes = ndi_sum(bool_mask, labeled, range(1, n_features + 1))
# Build a look-up table: label → keep?
keep = np.zeros(n_features + 1, dtype=np.bool_)
n_kept = 0
for i, size in enumerate(component_sizes, start=1):
if size >= min_size:
keep[i] = True
n_kept += 1
log.debug('prune: %d / %d regions kept (min_size=%.1f pix)',
n_kept, n_features, min_size)
return keep[labeled]
def _beam_sigma_to_axis(
sigma_major: float,
sigma_minor: float,
pa_rad: float = 0.0,
) -> tuple[float, float]:
"""Project beam (major, minor, PA) onto per-axis Gaussian sigmas.
PA follows the CASA/IAU convention (measured East from North, i.e.
counter-clockwise from the Dec axis). ``getchunk()`` returns data
with axis 0 = RA and axis 1 = Dec, so:
* PA = 0 → major along Dec (axis 1), minor along RA (axis 0)
* PA = 90° → major along RA (axis 0), minor along Dec (axis 1)
The projection uses the RMS width of the rotated ellipse along
each image axis:
σ_axis0² = σ_major² sin²(PA) + σ_minor² cos²(PA)
σ_axis1² = σ_major² cos²(PA) + σ_minor² sin²(PA)
Returns:
``(sigma_axis0, sigma_axis1)``
"""
c = np.cos(pa_rad)
s = np.sin(pa_rad)
s2_maj = sigma_major ** 2
s2_min = sigma_minor ** 2
sigma_ax0 = np.sqrt(s2_maj * s * s + s2_min * c * c)
sigma_ax1 = np.sqrt(s2_maj * c * c + s2_min * s * s)
return float(sigma_ax0), float(sigma_ax1)
def _make_gaussian_psf(
shape: tuple[int, int],
sigma_major: float,
sigma_minor: float,
pa_rad: float = 0.0,
) -> NDArray[np.float32]:
"""Build a peak-normalised 2-D rotated Gaussian PSF model.
This mirrors the C++ ``MakeGaussianPSF`` used by
``SIImageStore::getPSFSidelobeLevel()`` to compute the delobed
sidelobe level.
The Gaussian is centred at ``(shape[0]//2, shape[1]//2)`` using the
CASA axis convention (axis 0 = RA, axis 1 = Dec, PA from North
toward East).
Memory-efficient: builds coordinate grids in-place and avoids
intermediate full-size arrays.
Returns:
float32 array of *shape* with peak = 1.0.
"""
cy, cx = shape[0] // 2, shape[1] // 2
# Rotation: PA is from Dec (axis 1) toward RA (axis 0), CCW.
cos_pa = np.cos(pa_rad)
sin_pa = np.sin(pa_rad)
# Row (axis-0) and column (axis-1) offsets
rows = np.arange(shape[0], dtype=np.float32) - cy
cols = np.arange(shape[1], dtype=np.float32) - cx
# Rotated coordinates — broadcast without full meshgrid
# u = d0 * sin(PA) + d1 * cos(PA) → along major axis
# v = -d0 * cos(PA) + d1 * sin(PA) → along minor axis
# Compute (u/σ_major)² + (v/σ_minor)² in-place.
inv_s2_maj = 1.0 / (2.0 * sigma_major ** 2) if sigma_major > 0 else 0.0
inv_s2_min = 1.0 / (2.0 * sigma_minor ** 2) if sigma_minor > 0 else 0.0
# Pre-compute row contribution terms (shape[0],)
r_sin = rows * np.float32(sin_pa)
r_cos = rows * np.float32(cos_pa)
out = np.empty(shape, dtype=np.float32)
for j, cj in enumerate(cols):
u = r_sin + np.float32(cj * cos_pa) # (shape[0],)
v = np.float32(cj * sin_pa) - r_cos # (shape[0],)
np.multiply(u, u, out=u)
u *= np.float32(inv_s2_maj)
np.multiply(v, v, out=v)
v *= np.float32(inv_s2_min)
u += v
np.negative(u, out=u)
np.exp(u, out=out[:, j])
peak = float(out[cy, cx])
if peak > 0:
out /= np.float32(peak)
return out
def _smooth_and_cut(
mask: NDArray,
beam_sigma_pix: tuple[float, float],
smooth_factor: float,
cut_threshold: float,
beam_pa_rad: float = 0.0,
) -> NDArray[np.bool_]:
"""Gaussian-smooth the mask and binarise at *cut_threshold × max*.
Equivalent to ``SDMaskHandler::convolveMask`` + ``makeMaskByPerChanThreshold``
on the smoothed result.
When *beam_pa_rad* is non-zero the smoothing kernel sigmas are
projected onto the image axes via :func:`_beam_sigma_to_axis`,
correctly handling the beam position angle.
Returns:
Boolean mask.
"""
# Fast path: entirely empty mask
if not np.any(mask):
return np.zeros(mask.shape, dtype=np.bool_)
# Project beam sigmas onto image axes (handles PA)
s0, s1 = _beam_sigma_to_axis(
beam_sigma_pix[0], beam_sigma_pix[1], beam_pa_rad,
)
sigma = (s0 * smooth_factor, s1 * smooth_factor)
smoothed = gaussian_filter(
mask.astype(np.float32) if mask.dtype != np.float32 else mask,
sigma=sigma,
)
peak = float(smoothed.max())
if peak <= 0:
return np.zeros(mask.shape, dtype=np.bool_)
cut = cut_threshold * peak
result = smoothed >= cut
del smoothed
return result
def _grow_mask(
prev_mask: NDArray,
constraint: NDArray,
iterations: int,
) -> NDArray[np.bool_]:
"""Binary-dilate *prev_mask* within *constraint* region.
Uses a 3×3 cross structuring element (4-connected), matching
``SDMaskHandler::binaryDilation``.
Returns:
Boolean mask.
"""
if iterations <= 0 or not np.any(prev_mask):
return np.asarray(prev_mask > 0 if prev_mask.dtype != np.bool_
else prev_mask, dtype=np.bool_)
cross = np.array([[0, 1, 0],
[1, 1, 1],
[0, 1, 0]], dtype=bool)
prev_bool = prev_mask if prev_mask.dtype == np.bool_ else prev_mask > 0
con_bool = constraint if constraint.dtype == np.bool_ else constraint > 0
return binary_dilation(
prev_bool,
structure=cross,
iterations=iterations,
mask=con_bool,
)
# ======================================================================
# Main entry point
# ======================================================================
[docs]
def automask_plane(
residual: Plane,
sidelobe_level: float,
beam_area_pix: float,
beam_sigma_pix: tuple[float, float],
cfg: AutoMaskConfig,
state: AutoMaskState,
pb: Plane | None = None,
pblimit: float = 0.2,
beam_pa_rad: float = 0.0,
) -> Plane:
"""Compute the auto-multithresh mask for a single 2-D plane.
This is the numpy equivalent of
``SDMaskHandler::autoMaskByMultiThreshold`` operating on one
(pol, chan) slice.
Memory strategy:
* All intermediate masks are ``np.bool_`` (1 byte/pixel).
* ``pb_mask`` is ``None`` when no PB is provided (zero alloc).
* ``state.posmask`` / ``state.prevmask`` stored as ``bool_``.
* In-place ``|=`` replaces ``np.maximum`` chains.
* Only the final return value is cast to ``float32`` for CASA I/O.
Args:
residual: 2-D residual image (float32).
sidelobe_level: PSF sidelobe level (scalar, from ``getPSFSidelobeLevel``).
beam_area_pix: Restoring beam area in pixels.
beam_sigma_pix: ``(sigma_major, sigma_minor)`` of the restoring beam
in pixels (for Gaussian smoothing).
cfg: Automasking parameters.
state: Mutable per-channel state (updated in-place).
pb: Primary beam image (same shape as *residual*). If given,
regions with ``pb < pblimit`` are excluded.
pblimit: PB cutoff for the mask region.
beam_pa_rad: Beam position angle in radians (East from North).
Returns:
The updated binary mask (float32, 0/1).
"""
# ---- PB-mask --------------------------------------------------------
# When no PB is supplied, pb_mask stays None — zero allocation.
pb_mask: NDArray[np.bool_] | None = None
if pb is not None:
pb_mask = pb >= pblimit
# ---- Stage 0: statistics & thresholds --------------------------------
if pb_mask is not None:
stats_data = residual[pb_mask]
stats_prev = (state.prevmask[pb_mask]
if state.prevmask is not None else None)
else:
stats_data = residual
stats_prev = state.prevmask
absmax, median, rms, _ = _plane_stats(
stats_data,
fastnoise=cfg.fastnoise,
prev_mask=stats_prev,
)
del stats_data, stats_prev
# Nothing to mask if the residual is effectively zero
if absmax == 0:
log.info('automask iter %d: residual is zero — returning empty mask',
state.iteration)
state.prevmask = np.zeros(residual.shape, dtype=np.bool_)
state.iteration += 1
return np.zeros(residual.shape, dtype=np.float32)
sidelobe_thresh = sidelobe_level * cfg.sidelobethreshold * absmax + median
noise_thresh = cfg.noisethreshold * rms + median
low_noise_thresh = cfg.lownoisethreshold * rms + median
mask_threshold = max(sidelobe_thresh, noise_thresh)
threshold_source = ('sidelobe' if sidelobe_thresh >= noise_thresh
else 'noise')
log.info(
'automask iter %d: absmax=%.4g median=%.4g rms=%.4g '
'sidelobe_thresh=%.4g noise_thresh=%.4g '
'mask_thresh=%.4g (%s-limited)',
state.iteration, absmax, median, rms,
sidelobe_thresh, noise_thresh, mask_threshold, threshold_source,
)
# ---- Stage 1: threshold mask + prune ---------------------------------
# Apply PB mask inline: only threshold where PB is valid
threshold_mask: NDArray[np.bool_]
if pb_mask is not None:
threshold_mask = pb_mask & (residual >= mask_threshold)
else:
threshold_mask = residual >= mask_threshold
n_thresh_raw = int(np.count_nonzero(threshold_mask))
prune_size = cfg.minbeamfrac * beam_area_pix
if prune_size > 0:
threshold_mask = _prune_regions(threshold_mask, prune_size)
n_thresh_pruned = int(np.count_nonzero(threshold_mask))
log.info('automask iter %d: threshold → %d pix, after prune → %d pix '
'(prune_size=%.1f)',
state.iteration, n_thresh_raw, n_thresh_pruned, prune_size)
# ---- Stage 2: smooth + binarise (returns bool) -----------------------
smoothed_mask = _smooth_and_cut(
threshold_mask, beam_sigma_pix, cfg.smoothfactor, cfg.cutthreshold,
beam_pa_rad=beam_pa_rad,
)
del threshold_mask
log.info('automask iter %d: smooth+cut → %d pix',
state.iteration, int(np.count_nonzero(smoothed_mask)))
# ---- Stage 3: grow (binary dilation) — skip on first iteration -------
grown_mask: NDArray[np.bool_] | None = None
if state.iteration > 0 and state.prevmask is not None and cfg.growiterations > 0:
# Constraint: residual above low-noise threshold (bool)
low_mask_thresh = max(sidelobe_thresh, low_noise_thresh)
if pb_mask is not None:
constraint = pb_mask & (residual >= low_mask_thresh)
else:
constraint = residual >= low_mask_thresh
n_constraint = int(np.count_nonzero(constraint))
grown_mask = _grow_mask(state.prevmask, constraint, cfg.growiterations)
del constraint
n_grown_raw = int(np.count_nonzero(grown_mask))
# Prune the grown mask
if cfg.dogrowprune and prune_size > 0:
grown_mask = _prune_regions(grown_mask, prune_size)
# Smooth the grown mask
grown_mask = _smooth_and_cut(
grown_mask, beam_sigma_pix, cfg.smoothfactor, cfg.cutthreshold,
beam_pa_rad=beam_pa_rad,
)
log.info('automask iter %d: grow (%d iters, %d constraint pix) → '
'%d pix raw, %d pix after prune+smooth',
state.iteration, cfg.growiterations, n_constraint,
n_grown_raw, int(np.count_nonzero(grown_mask)))
else:
log.info('automask iter %d: grow skipped (first iteration)',
state.iteration)
# ---- Stage 4: combine (in-place bool OR) -----------------------------
if state.posmask is None:
posmask = smoothed_mask.copy()
else:
posmask = state.posmask | smoothed_mask
del smoothed_mask
if grown_mask is not None:
posmask |= grown_mask
del grown_mask
# Apply PB constraint
if pb_mask is not None:
posmask &= pb_mask
state.posmask = posmask # stored as bool
# ---- Stage 5: negative mask (optional) --------------------------------
neg_mask: NDArray[np.bool_] | None = None
if cfg.negativethreshold > 0:
neg_thresh_val = cfg.negativethreshold * rms
sidelobe_thresh_no_offset = sidelobe_level * cfg.sidelobethreshold * absmax
neg_mask_threshold = -(max(sidelobe_thresh_no_offset, neg_thresh_val)) + median
if pb_mask is not None:
neg_threshold_mask = pb_mask & (residual <= neg_mask_threshold)
else:
neg_threshold_mask = residual <= neg_mask_threshold
neg_mask = _smooth_and_cut(
neg_threshold_mask, beam_sigma_pix, cfg.smoothfactor,
cfg.cutthreshold, beam_pa_rad=beam_pa_rad,
)
del neg_threshold_mask
log.info('automask iter %d: negative mask → %d pix '
'(neg_thresh=%.4g)',
state.iteration, int(np.count_nonzero(neg_mask)),
neg_mask_threshold)
# ---- Stage 6: final combination (in-place bool OR) -------------------
if state.prevmask is not None:
final_mask = state.prevmask | posmask
else:
final_mask = posmask.copy()
if neg_mask is not None:
final_mask |= neg_mask
del neg_mask
# Enforce PB region
if pb_mask is not None:
final_mask &= pb_mask
# ---- Update state ----------------------------------------------------
# Check percent change for early-skip optimisation
change_pct = 0.0
if state.prevmask is not None:
n_changed = int(np.count_nonzero(final_mask != state.prevmask))
total_pix = (int(np.count_nonzero(pb_mask))
if pb_mask is not None else residual.size)
if total_pix > 0:
change_pct = 100.0 * n_changed / total_pix
if cfg.minpercentchange > 0 and change_pct < cfg.minpercentchange:
state.skip = True
log.info('automask iter %d: mask change %.2f%% < '
'minpercentchange %.2f%% — flagging skip',
state.iteration, change_pct, cfg.minpercentchange)
state.prevmask = final_mask # stored as bool (no copy needed — we own it)
state.iteration += 1
npix = int(np.count_nonzero(final_mask))
total = (int(np.count_nonzero(pb_mask))
if pb_mask is not None else residual.size)
pct = 100.0 * npix / total if total > 0 else 0.0
log.info('automask iter %d: final mask %d / %d pix (%.2f%%), '
'change from prev %.2f%%',
state.iteration - 1, npix, total, pct, change_pct)
return final_mask.astype(np.float32)
# ======================================================================
# Helper: extract beam info from CASA image
# ======================================================================
[docs]
def beam_info_from_image(
imagename: str,
) -> tuple[float, float, tuple[float, float], float]:
"""Read beam area, sidelobe level, sigma, and PA from a CASA image.
Returns:
``(beam_area_pix, sidelobe_level, (sigma_major_pix, sigma_minor_pix), pa_rad)``
"""
import casatools as ct
ia = ct.image()
try:
ia.open(imagename + '.psf')
beam = ia.restoringbeam()
cs = ia.coordsys()
incr = cs.increment(type='direction', format='n')['numeric']
cell_rad = abs(incr[0]) # radians per pixel
cs.done()
# Beam FWHM in pixels
major_pix = beam['major']['value']
minor_pix = beam['minor']['value']
# Convert from beam units to pixel units
if beam['major']['unit'] == 'arcsec':
major_pix /= (cell_rad * 206264.80625)
minor_pix /= (cell_rad * 206264.80625)
elif beam['major']['unit'] == 'rad':
major_pix /= cell_rad
minor_pix /= cell_rad
# Position angle
pa_deg = beam.get('positionangle', {}).get('value', 0.0)
pa_unit = beam.get('positionangle', {}).get('unit', 'deg')
pa_rad = float(np.deg2rad(pa_deg) if pa_unit == 'deg' else pa_deg)
# FWHM → sigma: sigma = FWHM / (2 * sqrt(2 * ln(2)))
fwhm_to_sigma = 1.0 / (2.0 * np.sqrt(2.0 * np.log(2.0)))
sigma_major = major_pix * fwhm_to_sigma
sigma_minor = minor_pix * fwhm_to_sigma
# Beam area in pixels = pi * major * minor / (4 * ln(2))
beam_area_pix = float(
np.pi * major_pix * minor_pix / (4.0 * np.log(2.0))
)
ia.close()
# Sidelobe level — matches SIImageStore::getPSFSidelobeLevel().
#
# Build an analytic rotated-Gaussian PSF model (with PA), then
# delobed = PSF − Gaussian_model
# sidelobe = max(|min(PSF)|, max(delobed))
ia.open(imagename + '.psf')
psf_data = ia.getchunk()
ia.close()
psf_plane = psf_data[:, :, 0, 0].astype(np.float32)
del psf_data
peak = float(psf_plane.max())
if peak > 0:
psf_plane /= np.float32(peak)
# Record |min(PSF)| before subtracting the model in-place.
psf_min_abs = abs(float(np.min(psf_plane)))
gaussian_model = _make_gaussian_psf(
psf_plane.shape, sigma_major, sigma_minor, pa_rad,
)
psf_plane -= gaussian_model # delobed in-place
del gaussian_model
sidelobe_level = float(max(
psf_min_abs, float(np.max(psf_plane)),
))
del psf_plane
return beam_area_pix, sidelobe_level, (sigma_major, sigma_minor), pa_rad
finally:
ia.done()
[docs]
def read_plane(imagename: str) -> Plane:
"""Read a single-channel CASA image into a 2-D numpy array."""
import casatools as ct
ia = ct.image()
try:
ia.open(imagename)
data = ia.getchunk()
ia.close()
# Shape is (nx, ny, npol, nchan) — squeeze to 2D
return data[:, :, 0, 0].astype(np.float32)
finally:
ia.done()
[docs]
def write_plane(imagename: str, data: Plane) -> None:
"""Write a 2-D numpy array to an existing single-channel CASA image."""
import casatools as ct
ia = ct.image()
try:
ia.open(imagename)
shape = ia.shape()
# Reshape back to CASA's (nx, ny, npol, nchan) convention
out = data.reshape(shape[0], shape[1], 1, 1).astype(np.float32)
ia.putchunk(out)
ia.close()
finally:
ia.done()