Source code for simind_python_connector.utils.stir_utils

# This file contains a few useful functions for converting SIMIND output to
# STIR format.
### It should probably be cleaned up and object-orientedified at some point.

### Author: Sam Porter, Efstathios Varzakis

import contextlib
import os
import re
import tempfile
from typing import Optional

import numpy as np

from simind_python_connector.utils.backend_access import BACKENDS

from . import get_array
from .import_helpers import get_sirf_types
from .interfile_parser import parse_interfile_header, parse_interfile_line


# Conditional import for SIRF to avoid CI dependencies
ImageData, AcquisitionData, SIRF_AVAILABLE = get_sirf_types()

# Unpack interfaces needed by stir_utils
create_acquisition_data = BACKENDS.factories.create_acquisition_data
create_image_data = BACKENDS.factories.create_image_data


def _parse_interfile_text(text: str):
    """Parse interfile-formatted text and return a dictionary."""
    values = {}
    for line in text.splitlines():
        key, value = parse_interfile_line(line)
        if key is not None:
            values[key] = value
    return values


[docs] def parse_sinogram(template_sinogram): """Parse sinogram metadata without colliding temp files. Accepts either a path to an interfile header or an acquisition object that exposes ``get_info`` or ``write``. The parser reuses the shared ``parse_interfile_header`` helper for consistent behaviour. """ if isinstance(template_sinogram, (str, os.PathLike)): return parse_interfile_header(str(template_sinogram)) if hasattr(template_sinogram, "get_info") and callable( template_sinogram.get_info # type: ignore[attr-defined] ): return _parse_interfile_text(template_sinogram.get_info()) # type: ignore[attr-defined] if hasattr(template_sinogram, "filename"): return parse_interfile_header(template_sinogram.filename) if hasattr(template_sinogram, "write") and callable(template_sinogram.write): with tempfile.NamedTemporaryFile(suffix=".hs", delete=False) as tmp: tmp_path = tmp.name try: template_sinogram.write(tmp_path) return parse_interfile_header(tmp_path) finally: with contextlib.suppress(FileNotFoundError): os.remove(tmp_path) raise TypeError( "template_sinogram must be a path or acquisition object with " "get_info()/write() methods" )
[docs] def parse_interfile(filename): """Parse STIR interfile and return dictionary of key-value pairs. This function is kept for backward compatibility. New code should use parse_interfile_header() from interfile_parser module. Args: filename: Path to interfile file Returns: Dictionary of key-value pairs """ return parse_interfile_header(filename)
[docs] def get_sirf_attenuation_from_simind( attn_filename, photopeak_energy=0.12, attn_type="mu" ): """Reads attenuation data from simind attenuation file and returns SIRF ImageData object Args: attn_filename (string): file name of simind attenuation file header new_filename (string): file name of new attenuation file header attn_type (str, optional): unit of attenuation binary file. Defaults to 'mu'. Returns: image: SIRF ImageData object containing attenuation data """ if attn_type == "mu": data_type = np.float32 elif attn_type == "rho*1000": data_type = np.uint16 # remove suffix from filename if present if attn_filename[-3:] in ["ict", "hct"]: attn_filename = attn_filename[:-4] attn = np.fromfile(f"{attn_filename}.ict", dtype=data_type) image = ImageData() header_dict = parse_interfile(f"{attn_filename}.hct") dim = [int(header_dict["!matrix size [%d]" % i]) for i in range(1, 4)][::-1] vsize = [ float(header_dict["scaling factor (mm/pixel) [%d]" % i]) for i in range(1, 3) ] vsize.append(header_dict["# scaling factor (mm/pixel) [3]"]) # origin looks like '-128.0000 -128.0000 128.000000' origin_string = header_dict["# Image Position First image"] origin = [float(i) for i in origin_string.split(" ")][::-1] image.initialise(dim=tuple(dim), vsize=tuple(vsize), origin=tuple(origin)) attn = attn.reshape(dim) if attn_type == "mu": image.fill(attn) elif attn_type == "rho*100": image.fill(attn / 1000 * photopeak_energy) return image
[docs] def convert_value(val: str): """ Attempt to convert a string to int or float. Also converts a list of numbers if enclosed in { }. Otherwise, returns the stripped string. """ val = val.strip() if val.startswith("{") and val.endswith("}"): try: return [float(x.strip()) for x in val[1:-1].split(",")] except Exception: return val try: if re.fullmatch(r"[-+]?\d+", val): return int(val) # Try float conversion if the value contains a decimal point or exponent. return float(val) if re.search(r"[.\deE+-]", val) else val except ValueError: return val
STIR_ATTRIBUTE_MAPPING = { "number_of_views": "number_of_projections", "azimuthal_angle_extent": "extent_of_rotation", "view_offset": "start_angle", "radionuclide": "isotope_name", "energy_window_lower": "energy_window_lower", "energy_window_upper": "energy_window_upper", "scanner_type": "scanner_type", "number_of_rings": "number_of_rings", "number_of_detectors_per_ring": "number_of_detectors_per_ring", "inner_ring_diameter": "height_to_detector_surface", "tangential_sampling": "default_bin_size", }
[docs] def harmonize_stir_attributes(attributes: dict) -> dict: """Apply canonical naming and derived values to raw STIR attributes.""" harmonized_attributes = {} for key, value in attributes.items(): standard_key = STIR_ATTRIBUTE_MAPPING.get(key, key) harmonized_attributes[standard_key] = value if "inner_ring_diameter" in harmonized_attributes: harmonized_attributes["height_to_detector_surface"] = ( harmonized_attributes["inner_ring_diameter"] / 2 ) return harmonized_attributes
[docs] def extract_attributes_from_stir(header_filepath: str) -> dict: """Extract attributes from a STIR header file (.hs).""" if not isinstance(header_filepath, str): raise ValueError( f"extract_attributes_from_stir() only accepts string filepaths. " f"Got {type(header_filepath)}. If you have an acquisition object, " f"write it to a file first using .write() method." ) return extract_attributes_from_stir_headerfile(header_filepath)
[docs] def extract_attributes_from_stir_headerfile(filename: str) -> dict: """Parse a STIR header file and extract relevant attributes.""" attributes = { "matrix_sizes": {}, "scaling_factors": {}, } patterns = [ ( re.compile(r"!imaging modality\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "modality", ), ( re.compile(r"!type of data\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "data_type", ), ( re.compile(r"imagedata byte order\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "byte_order", ), ( re.compile(r"!number format\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "number_format", ), ( re.compile(r"!number of bytes per pixel\s*:=\s*(\d+)", re.IGNORECASE), lambda m: int(m.group(1)), "bytes_per_pixel", ), ( re.compile(r"calibration factor\s*:=\s*(.+)", re.IGNORECASE), lambda m: float(m.group(1)), "calibration_factor", ), ( re.compile(r"isotope name\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "isotope_name", ), ( re.compile(r"number of dimensions\s*:=\s*(\d+)", re.IGNORECASE), lambda m: int(m.group(1)), "number_of_dimensions", ), ( re.compile(r"!number of projections\s*:=\s*(\d+)", re.IGNORECASE), lambda m: int(m.group(1)), "number_of_projections", ), ( re.compile(r"number of time frames\s*:=\s*(\d+)", re.IGNORECASE), lambda m: int(m.group(1)), "number_of_time_frames", ), ( re.compile( r"!image duration\s*\(sec\)[\[\w\s]*\]\s*:=\s*(\d+)", re.IGNORECASE ), lambda m: int(m.group(1)), "image_duration", ), ( re.compile(r"!extent of rotation\s*:=\s*(.+)", re.IGNORECASE), lambda m: float(m.group(1)), "extent_of_rotation", ), ( re.compile(r"!direction of rotation\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "direction_of_rotation", ), ( re.compile(r"start angle\s*:=\s*(.+)", re.IGNORECASE), lambda m: float(m.group(1)), "start_angle", ), ( re.compile(r"!name of data file\s*:=\s*(.+)", re.IGNORECASE), lambda m: m.group(1).strip(), "data_file", ), ( re.compile(r"energy window lower level\[\d+\]\s*:=\s*(.+)", re.IGNORECASE), lambda m: float(m.group(1)), "energy_window_lower", ), ( re.compile(r"energy window upper level\[\d+\]\s*:=\s*(.+)", re.IGNORECASE), lambda m: float(m.group(1)), "energy_window_upper", ), ] with open(filename, "r") as file: for line in file: if ms_match := re.search( r"!matrix size\s*\[(.+?)\]\s*:=\s*(\d+)", line, re.IGNORECASE ): axis = ms_match[1].strip() attributes["matrix_sizes"][axis] = int(ms_match[2]) continue if sf_match := re.search( r"!scaling factor\s*\(mm/pixel\)\s*\[(.+?)\]\s*:=\s*(.+)", line, re.IGNORECASE, ): axis = sf_match[1].strip() attributes["scaling_factors"][axis] = float(sf_match[2].strip()) continue if re.search(r"(radius|radii)\s*:=\s*(.+)", line, re.IGNORECASE): r_match = re.search(r"(radius|radii)\s*:=\s*(.+)", line, re.IGNORECASE) tmp = r_match[2].strip() if tmp.startswith("{") and tmp.endswith("}") or "," in tmp: tmp = tmp.strip("{}") values = [float(v.strip()) for v in tmp.split(",")] mean_value = float(np.mean(values)) std_of_mean_value = np.std(values) / mean_value if std_of_mean_value > 1e-6: attributes["orbit"] = "non-circular" attributes["radii"] = values attributes["height_to_detector_surface"] = mean_value else: attributes["orbit"] = "Circular" attributes["height_to_detector_surface"] = mean_value else: attributes["orbit"] = "Circular" attributes["height_to_detector_surface"] = float(tmp) continue if orbit_match := re.search(r"orbit\s*:=\s*(.+)", line, re.IGNORECASE): attributes["orbit"] = orbit_match[1].strip() continue for pattern, converter, attr_key in patterns: if match := pattern.search(line): attributes[attr_key] = converter(match) break return harmonize_stir_attributes(attributes)
def _normalize_backend_name(backend: Optional[str]) -> Optional[str]: """Normalize explicit backend names for builder helpers.""" if backend is None: return None normalized = backend.lower() if normalized not in {"sirf", "stir"}: raise ValueError( f"Invalid backend {backend!r}. Expected one of: 'sirf', 'stir', or None." ) return normalized
[docs] def create_stir_image( matrix_dim: list, voxel_size: list, backend: Optional[str] = None ): """Create a uniform (zeros) STIR image object. Args: matrix_dim: Three-element matrix size in ``(z, y, x)`` order. voxel_size: Three-element voxel size in mm for ``(z, y, x)``. backend: Optional explicit backend (``"sirf"`` or ``"stir"``). Returns: Backend image object created by ``STIRSPECTImageDataBuilder``. """ try: from simind_python_connector.builders import STIRSPECTImageDataBuilder except ImportError as exc: raise ImportError( "STIRSPECTImageDataBuilder requires SIRF/STIR to be installed" ) from exc builder = STIRSPECTImageDataBuilder(backend=_normalize_backend_name(backend)) builder.update_header( { "!matrix size [1]": str(matrix_dim[2]), "!matrix size [2]": str(matrix_dim[1]), "!matrix size [3]": str(matrix_dim[0]), "scaling factor (mm/pixel) [1]": str(voxel_size[2]), "scaling factor (mm/pixel) [2]": str(voxel_size[1]), "scaling factor (mm/pixel) [3]": str(voxel_size[0]), } ) builder.set_pixel_array(np.zeros(matrix_dim, dtype=np.float32)) return builder.build()
[docs] def create_stir_acqdata( proj_matrix: list, num_projections: int, pixel_size: list, backend: Optional[str] = None, ): """Create a uniform (zeros) STIR acquisition object. Args: proj_matrix: Two-element projection matrix size in ``(radial, tangential)``. num_projections: Number of projection views. pixel_size: Two-element pixel size in mm for ``(radial, tangential)``. backend: Optional explicit backend (``"sirf"`` or ``"stir"``). Returns: Backend acquisition object created by ``STIRSPECTAcquisitionDataBuilder``. """ try: from simind_python_connector.builders import STIRSPECTAcquisitionDataBuilder except ImportError as exc: raise ImportError( "STIRSPECTAcquisitionDataBuilder requires SIRF/STIR to be installed" ) from exc builder = STIRSPECTAcquisitionDataBuilder(backend=_normalize_backend_name(backend)) builder.update_header( { "!matrix size [1]": str(proj_matrix[0]), "!matrix size [2]": str(proj_matrix[1]), "!number of projections": str(num_projections), "scaling factor (mm/pixel) [1]": str(pixel_size[0]), "scaling factor (mm/pixel) [2]": str(pixel_size[1]), } ) builder.pixel_array = np.zeros( (1, proj_matrix[0], num_projections, proj_matrix[1]), dtype=np.float32 ) return builder.build()
[docs] def create_simple_phantom(): """Create a simple cylindrical phantom with a hot sphere.""" # Create a 64x64x64 image with 4.42mm voxels matrix_dim = [64, 64, 64] voxel_size = [4.42, 4.42, 4.42] # mm # Create empty image phantom = create_stir_image(matrix_dim, voxel_size) phantom_array = get_array(phantom) # Add cylindrical background (body) center = [32, 32, 32] radius = 10 # pixels height = 20 # pixels for z in range(center[0] - height // 2, center[0] + height // 2): for y in range(matrix_dim[1]): for x in range(matrix_dim[2]): if (x - center[1]) ** 2 + (y - center[2]) ** 2 <= radius**2: phantom_array[z, y, x] = 10 # Background activity # Add hot sphere (tumor) sphere_center = [32, 32, 36] sphere_radius = 3 # pixels for z in range(matrix_dim[0]): for y in range(matrix_dim[1]): for x in range(matrix_dim[2]): if (x - sphere_center[2]) ** 2 + (y - sphere_center[1]) ** 2 + ( z - sphere_center[0] ) ** 2 <= sphere_radius**2: phantom_array[z, y, x] = 40 # Hot spot activity phantom.fill(phantom_array) return phantom
[docs] def create_attenuation_map(phantom): """Create a simple attenuation map from the phantom.""" # For simplicity, use uniform attenuation where phantom > 0 mu_water_140keV = 0.15 # cm^-1 approximate for 140 keV attn_array = get_array(phantom).copy() attn_array[attn_array > 0] = mu_water_140keV mu_map = phantom.clone() mu_map.fill(attn_array) return mu_map