Source code for simind_python_connector.utils.interfile_numpy

"""
Utilities for loading Interfile projection data directly into NumPy.
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping, Optional, Sequence, Union

import numpy as np

from .interfile_parser import parse_interfile_header


MatrixShape = Sequence[int]
HeaderInput = Union[str, Path]

_MATRIX_SIZE_PATTERN = re.compile(r"!?matrix size\s*\[(\d+)\]", re.IGNORECASE)


[docs] @dataclass(frozen=True) class InterfileArray: """NumPy payload and file references parsed from an Interfile header.""" array: np.ndarray header_path: Path data_path: Path metadata: dict[str, str]
def _lookup_header_value(header: Mapping[str, str], *keys: str) -> Optional[str]: """Return the first matching value for a header key (case-insensitive).""" lower_map = {key.lower(): value for key, value in header.items()} for key in keys: value = lower_map.get(key.lower()) if value is not None: return value return None def _extract_matrix_shape(header: Mapping[str, str]) -> tuple[int, ...]: """Extract matrix sizes and return NumPy shape in memory order.""" sizes_by_index: dict[int, int] = {} for key, raw_value in header.items(): match = _MATRIX_SIZE_PATTERN.fullmatch(key.strip()) if not match: continue axis = int(match.group(1)) sizes_by_index[axis] = int(raw_value) if not sizes_by_index: raise ValueError("No 'matrix size [i]' entries found in Interfile header") ordered_axes = [sizes_by_index[idx] for idx in sorted(sizes_by_index)] if any(size <= 0 for size in ordered_axes): raise ValueError(f"Invalid matrix sizes in header: {ordered_axes}") # Interfile stores axis [1] as the fastest changing index. return tuple(reversed(ordered_axes)) def _extract_numpy_dtype(header: Mapping[str, str]) -> np.dtype: """Infer the binary payload dtype from Interfile metadata.""" number_format = ( _lookup_header_value(header, "!number format", "number format") or "float" ).strip() bytes_per_pixel_raw = _lookup_header_value( header, "!number of bytes per pixel", "number of bytes per pixel" ) bytes_per_pixel = int(float(bytes_per_pixel_raw)) if bytes_per_pixel_raw else 4 number_format_lc = number_format.lower() if "float" in number_format_lc: kind = "f" elif "unsigned" in number_format_lc: kind = "u" elif "signed" in number_format_lc or "integer" in number_format_lc: kind = "i" else: raise ValueError(f"Unsupported Interfile number format: {number_format!r}") dtype = np.dtype(f"{kind}{bytes_per_pixel}") byte_order = ( _lookup_header_value(header, "imagedata byte order", "!imagedata byte order") or "" ).lower() if "big" in byte_order: return dtype.newbyteorder(">") if "little" in byte_order: return dtype.newbyteorder("<") return dtype.newbyteorder("=") def _extract_projection_count(header: Mapping[str, str]) -> Optional[int]: """Extract projection/image count used for 3D SPECT sinograms.""" raw = _lookup_header_value( header, "!number of projections", "number of projections", "!total number of images", "total number of images", "!number of images/energy window", "number of images/energy window", ) if raw is None: return None try: value = int(float(raw)) except (TypeError, ValueError): return None return value if value > 0 else None def _infer_leading_axis_count( metadata: Mapping[str, str], plane_elements: int, payload_elements: int ) -> Optional[int]: """Infer leading stack axis for headers that only declare 2D planes.""" if plane_elements <= 0: return None # Prefer explicit projection/image-count fields from the header. projection_count = _extract_projection_count(metadata) if projection_count is not None: return projection_count # Otherwise infer directly from payload length when possible. if payload_elements % plane_elements == 0: inferred = payload_elements // plane_elements if inferred > 0: return inferred # Fall back to a single-frame stack rather than treating projections as 2D. return 1
[docs] def load_interfile_array(header_path: HeaderInput) -> InterfileArray: """Load projection data referenced by an Interfile header into NumPy.""" header_path = Path(header_path).expanduser().resolve() metadata = parse_interfile_header(str(header_path)) data_filename = _lookup_header_value( metadata, "!name of data file", "name of data file" ) if not data_filename: raise ValueError( f"Interfile header {header_path} does not define 'name of data file'" ) data_path = (header_path.parent / data_filename).resolve() if not data_path.exists(): raise FileNotFoundError( f"Interfile data file referenced by header does not exist: {data_path}" ) dtype = _extract_numpy_dtype(metadata) shape = _extract_matrix_shape(metadata) expected_elements = int(np.prod(shape)) flat = np.fromfile(data_path, dtype=dtype) # Projection and tomographic payloads are treated as at least 3D: # [leading_axis, axis2, axis1]. If headers only declare matrix sizes [1],[2], # recover the leading axis from projection count fields or payload length. if len(shape) == 2: leading_count = _infer_leading_axis_count( metadata, expected_elements, flat.size ) if leading_count is not None: shape = (leading_count, *shape) expected_elements = int(np.prod(shape)) if flat.size < expected_elements: raise ValueError( f"Data size mismatch for {data_path}: expected {expected_elements} " f"elements for shape {shape}, found {flat.size}" ) # SIMIND files can contain trailing payload not represented by Interfile keys. # Use the declared geometry and discard trailing elements. if flat.size > expected_elements: flat = flat[:expected_elements] return InterfileArray( array=flat.reshape(shape), header_path=header_path, data_path=data_path, metadata=dict(metadata), )
__all__ = ["InterfileArray", "load_interfile_array"]