Source code for pclean.utils.image_concat

"""Image concatenation utilities.

After parallel cube imaging each worker produces a sub-cube.  This
module concatenates them into the final output cube, mirroring the
``ia.imageconcat()`` call used in CASA's parallel cube imager.

Three concatenation modes are supported (via ``concat_mode`` in
:class:`~pclean.config.ClusterConfig`):

* **paged** (default): Pixel data are physically copied into a new
  self-contained CASA image.  Slower but fully independent of the
  subcubes after completion.
* **virtual** (``mode='nomovevirtual'``): The output image is a lightweight
  reference catalog that points at the original subcube files.  Near-instant
  but requires the subcubes to stay on disk (``keep_subcubes=True``).
* **movevirtual** (``mode='movevirtual'``): The subcube directories are
  renamed (moved) into the output image.  Near-instant on the same
  filesystem; the subcubes are consumed in the process.

When ``concat_mode='auto'`` (the default), the mode is derived from
``keep_subcubes``: ``True`` → virtual, ``False`` → paged.

When multiple extensions need concatenating (e.g. ``.image``, ``.residual``,
``.psf``, …), a ``ProcessPoolExecutor`` (``spawn`` start method) is used for
paged mode so that each subprocess gets its own casacore ``TableCache`` and
there is no shared C++ state between workers.  Virtual modes are run
sequentially because they are near-instant and write shared catalog metadata.
"""

from __future__ import annotations

import logging
import multiprocessing
import os
import threading
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

log = logging.getLogger(__name__)

# casacore maintained a process-global `TableCache` (std::map) that is
# registered/looked-up on every table open.  A mutex was present from
# 2011 (commit 68d3f2d) until April 2021, when PR #1095 removed it as
# part of stripping all casacore-internal threading infrastructure
# (issue #896).  casacore >= 3.4.0 (CASA 6.x) therefore has NO mutex
# protecting the TableCache, making concurrent imageconcat() calls from
# threads a data race that causes a segfault (observed in CI with four
# threads in imageconcat() simultaneously on paged mode).
#
# ia.imageconcat() is a monolithic SWIG-wrapped C++ call with no
# internal thread guards: the table-cache registration and all pixel I/O
# are interleaved and cannot be split at the Python level.  We therefore
# serialise ALL modes through _tablelock, not just virtual modes.
_tablelock = threading.Lock()

_casatools = None


def _ct():
    global _casatools
    if _casatools is None:
        import casatools as ct

        _casatools = ct
    return _casatools


# ======================================================================
# Public
# ======================================================================


[docs] def concat_images( outimage: str, inimages: list[str], axis: int = -1, relax: bool = True, overwrite: bool = True, mode: str = 'paged', ) -> None: """Concatenate a list of CASA images along *axis*. Args: outimage: Path for the output concatenated image. inimages: Ordered list of input sub-images. axis: Axis to concatenate along (default -1 -> spectral). relax: Relax axis checks. overwrite: Overwrite *outimage* if it exists. mode: CASA imageconcat mode. ``'paged'`` (default) physically copies data. ``'nomovevirtual'`` creates a reference catalog (near-instant, but requires input images to stay on disk). ``'movevirtual'`` creates a virtual concatenation by moving subcube directories into the output image. """ ct = _ct() ia = ct.image() t0 = time.monotonic() try: # ia.imageconcat() is a monolithic SWIG-wrapped C++ call: the # casacore TableCache registration and all pixel I/O happen inside # a single function with no thread-safety guarantees. Concurrent # calls from the ThreadPoolExecutor cause a segfault (observed in # CI: four threads all in imageconcat simultaneously). Serialise # with _tablelock for all modes. with _tablelock: ia.imageconcat( outfile=outimage, infiles=inimages, axis=axis, relax=relax, overwrite=overwrite, tempclose=False, reorder=False, mode=mode, ) finally: ia.done() elapsed = time.monotonic() - t0 log.info( 'Concatenated %d images → %s (mode=%s, %.1fs)', len(inimages), outimage, mode, elapsed, )
# Module-level worker function: must live at module scope to be picklable # by ProcessPoolExecutor when using the 'spawn' start method. def _concat_images_worker(args: tuple) -> str: """Subprocess entry-point for parallel extension concatenation.""" outimage, inimages, mode = args concat_images(outimage, inimages, mode=mode) return outimage
[docs] def concat_subcubes( base_imagename: str, nparts: int, extensions: list[str] | None = None, mode: str = 'paged', max_workers: int = 4, # Deprecated — use *mode* instead. virtual: bool | None = None, # Private seam for tests: inject ThreadPoolExecutor to avoid subprocess # spawning while keeping mock-based assertions intact. _pool_cls=None, ) -> None: """Concatenate all standard image products from numbered sub-cubes. Products include ``.image``, ``.residual``, ``.psf``, etc. The *mode* parameter is forwarded directly to ``ia.imageconcat()``: * ``'paged'`` — pixel data are physically copied (default, always safe). Extensions are concatenated **in parallel** via ``ProcessPoolExecutor`` (``spawn`` context) so each subprocess owns an independent casacore ``TableCache`` — true I/O parallelism, no shared C++ state. * ``'nomovevirtual'`` — lightweight reference catalog, near-instant but subcube files **must remain on disk**. Run **sequentially** because virtual-catalog metadata is shared across calls. * ``'movevirtual'`` — renames subcubes into the output directory (near-instant, subcubes are consumed). Also run sequentially. .. deprecated:: The *virtual* parameter is deprecated. Pass *mode* explicitly. Args: base_imagename: The original ``imagename`` (without ``.subcube.N``). nparts: Number of sub-cubes. extensions: Image extensions to concatenate. Defaults to a standard set. mode: CASA ``imageconcat`` mode string. max_workers: Maximum parallel concatenation workers (paged mode only). virtual: **Deprecated.** ``True`` maps to ``mode='nomovevirtual'``. """ # Backward compat: honour deprecated *virtual* flag if *mode* not overridden. if virtual is not None: import warnings warnings.warn( "concat_subcubes(virtual=...) is deprecated; use mode='nomovevirtual' " "or mode='paged' instead.", DeprecationWarning, stacklevel=2, ) if mode == 'paged': # only override when caller did not set mode mode = 'nomovevirtual' if virtual else 'paged' if extensions is None: extensions = [ '.image', '.residual', '.psf', '.model', '.pb', '.image.pbcor', '.mask', '.weight', '.sumwt', ] log.info( 'Concatenating %d extensions (nparts=%d, mode=%s, max_workers=%d)', len(extensions), nparts, mode, max_workers, ) t0 = time.monotonic() # Collect work items: (extension, list-of-input-files, output-file) work: list[tuple[str, list[str], str]] = [] for ext in extensions: infiles = [] for i in range(nparts): subname = f'{base_imagename}.subcube.{i}{ext}' if os.path.isdir(subname) or os.path.isfile(subname): infiles.append(subname) if not infiles: continue outfile = f'{base_imagename}{ext}' work.append((ext, infiles, outfile)) if not work: log.warning('No subcube files found for concatenation') return failed: list[tuple[str, Exception]] = [] worker_count = len(work) effective_workers = worker_count if max_workers <= 0 else min(max_workers, worker_count) _virtual = mode in ('nomovevirtual', 'movevirtual') if _virtual: # Virtual modes write shared casacore catalog metadata and are # near-instant — run sequentially to avoid catalog corruption. for ext, infiles, outfile in work: try: concat_images(outfile, infiles, mode=mode) except Exception as exc: # pragma: no cover log.warning('Failed to concatenate %s: %s', ext, exc) failed.append((ext, exc)) else: # Paged mode: spawn independent subprocesses so each gets its own # casacore TableCache — true parallelism with no shared C++ state. # ProcessPoolExecutor reuses workers across tasks so casatools is # imported once per worker, not once per extension. # # _pool_cls is a private seam for tests: pass ThreadPoolExecutor to # keep mock-based tests working without spawning real subprocesses. if _pool_cls is None: pool_factory = ProcessPoolExecutor pool_kwargs: dict = { 'max_workers': effective_workers, 'mp_context': multiprocessing.get_context('spawn'), } else: pool_factory = _pool_cls pool_kwargs = {'max_workers': effective_workers} with pool_factory(**pool_kwargs) as pool: future_to_ext = { pool.submit(_concat_images_worker, (outfile, infiles, mode)): ext for ext, infiles, outfile in work } for future in as_completed(future_to_ext): ext = future_to_ext[future] try: future.result() except Exception as exc: log.warning('Failed to concatenate %s: %s', ext, exc) failed.append((ext, exc)) elapsed = time.monotonic() - t0 ok_count = len(work) - len(failed) log.info( 'Concatenation complete: %d/%d extensions in %.1fs', ok_count, len(work), elapsed, ) if failed: log.warning( 'Failed extensions: %s', ', '.join(ext for ext, _ in failed), )