# This file contains a few useful functions for converting SIMIND output to
# STIR format.
### It should probably be cleaned up and object-orientedified at some point.
### Author: Sam Porter, Efstathios Varzakis
import contextlib
import os
import re
import tempfile
from typing import Optional
import numpy as np
from simind_python_connector.utils.backend_access import BACKENDS
from . import get_array
from .import_helpers import get_sirf_types
from .interfile_parser import parse_interfile_header, parse_interfile_line
# Conditional import for SIRF to avoid CI dependencies
ImageData, AcquisitionData, SIRF_AVAILABLE = get_sirf_types()
# Unpack interfaces needed by stir_utils
create_acquisition_data = BACKENDS.factories.create_acquisition_data
create_image_data = BACKENDS.factories.create_image_data
def _parse_interfile_text(text: str):
"""Parse interfile-formatted text and return a dictionary."""
values = {}
for line in text.splitlines():
key, value = parse_interfile_line(line)
if key is not None:
values[key] = value
return values
[docs]
def parse_sinogram(template_sinogram):
"""Parse sinogram metadata without colliding temp files.
Accepts either a path to an interfile header or an acquisition object that
exposes ``get_info`` or ``write``. The parser reuses the shared
``parse_interfile_header`` helper for consistent behaviour.
"""
if isinstance(template_sinogram, (str, os.PathLike)):
return parse_interfile_header(str(template_sinogram))
if hasattr(template_sinogram, "get_info") and callable(
template_sinogram.get_info # type: ignore[attr-defined]
):
return _parse_interfile_text(template_sinogram.get_info()) # type: ignore[attr-defined]
if hasattr(template_sinogram, "filename"):
return parse_interfile_header(template_sinogram.filename)
if hasattr(template_sinogram, "write") and callable(template_sinogram.write):
with tempfile.NamedTemporaryFile(suffix=".hs", delete=False) as tmp:
tmp_path = tmp.name
try:
template_sinogram.write(tmp_path)
return parse_interfile_header(tmp_path)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp_path)
raise TypeError(
"template_sinogram must be a path or acquisition object with "
"get_info()/write() methods"
)
[docs]
def parse_interfile(filename):
"""Parse STIR interfile and return dictionary of key-value pairs.
This function is kept for backward compatibility.
New code should use parse_interfile_header() from interfile_parser module.
Args:
filename: Path to interfile file
Returns:
Dictionary of key-value pairs
"""
return parse_interfile_header(filename)
[docs]
def get_sirf_attenuation_from_simind(
attn_filename, photopeak_energy=0.12, attn_type="mu"
):
"""Reads attenuation data from simind attenuation file and returns SIRF
ImageData object
Args:
attn_filename (string): file name of simind attenuation file header
new_filename (string): file name of new attenuation file header
attn_type (str, optional): unit of attenuation binary file. Defaults to 'mu'.
Returns:
image: SIRF ImageData object containing attenuation data
"""
if attn_type == "mu":
data_type = np.float32
elif attn_type == "rho*1000":
data_type = np.uint16
# remove suffix from filename if present
if attn_filename[-3:] in ["ict", "hct"]:
attn_filename = attn_filename[:-4]
attn = np.fromfile(f"{attn_filename}.ict", dtype=data_type)
image = ImageData()
header_dict = parse_interfile(f"{attn_filename}.hct")
dim = [int(header_dict["!matrix size [%d]" % i]) for i in range(1, 4)][::-1]
vsize = [
float(header_dict["scaling factor (mm/pixel) [%d]" % i]) for i in range(1, 3)
]
vsize.append(header_dict["# scaling factor (mm/pixel) [3]"])
# origin looks like '-128.0000 -128.0000 128.000000'
origin_string = header_dict["# Image Position First image"]
origin = [float(i) for i in origin_string.split(" ")][::-1]
image.initialise(dim=tuple(dim), vsize=tuple(vsize), origin=tuple(origin))
attn = attn.reshape(dim)
if attn_type == "mu":
image.fill(attn)
elif attn_type == "rho*100":
image.fill(attn / 1000 * photopeak_energy)
return image
[docs]
def convert_value(val: str):
"""
Attempt to convert a string to int or float.
Also converts a list of numbers if enclosed in { }.
Otherwise, returns the stripped string.
"""
val = val.strip()
if val.startswith("{") and val.endswith("}"):
try:
return [float(x.strip()) for x in val[1:-1].split(",")]
except Exception:
return val
try:
if re.fullmatch(r"[-+]?\d+", val):
return int(val)
# Try float conversion if the value contains a decimal point or exponent.
return float(val) if re.search(r"[.\deE+-]", val) else val
except ValueError:
return val
STIR_ATTRIBUTE_MAPPING = {
"number_of_views": "number_of_projections",
"azimuthal_angle_extent": "extent_of_rotation",
"view_offset": "start_angle",
"radionuclide": "isotope_name",
"energy_window_lower": "energy_window_lower",
"energy_window_upper": "energy_window_upper",
"scanner_type": "scanner_type",
"number_of_rings": "number_of_rings",
"number_of_detectors_per_ring": "number_of_detectors_per_ring",
"inner_ring_diameter": "height_to_detector_surface",
"tangential_sampling": "default_bin_size",
}
[docs]
def harmonize_stir_attributes(attributes: dict) -> dict:
"""Apply canonical naming and derived values to raw STIR attributes."""
harmonized_attributes = {}
for key, value in attributes.items():
standard_key = STIR_ATTRIBUTE_MAPPING.get(key, key)
harmonized_attributes[standard_key] = value
if "inner_ring_diameter" in harmonized_attributes:
harmonized_attributes["height_to_detector_surface"] = (
harmonized_attributes["inner_ring_diameter"] / 2
)
return harmonized_attributes
def _normalize_backend_name(backend: Optional[str]) -> Optional[str]:
"""Normalize explicit backend names for builder helpers."""
if backend is None:
return None
normalized = backend.lower()
if normalized not in {"sirf", "stir"}:
raise ValueError(
f"Invalid backend {backend!r}. Expected one of: 'sirf', 'stir', or None."
)
return normalized
[docs]
def create_stir_image(
matrix_dim: list, voxel_size: list, backend: Optional[str] = None
):
"""Create a uniform (zeros) STIR image object.
Args:
matrix_dim: Three-element matrix size in ``(z, y, x)`` order.
voxel_size: Three-element voxel size in mm for ``(z, y, x)``.
backend: Optional explicit backend (``"sirf"`` or ``"stir"``).
Returns:
Backend image object created by ``STIRSPECTImageDataBuilder``.
"""
try:
from simind_python_connector.builders import STIRSPECTImageDataBuilder
except ImportError as exc:
raise ImportError(
"STIRSPECTImageDataBuilder requires SIRF/STIR to be installed"
) from exc
builder = STIRSPECTImageDataBuilder(backend=_normalize_backend_name(backend))
builder.update_header(
{
"!matrix size [1]": str(matrix_dim[2]),
"!matrix size [2]": str(matrix_dim[1]),
"!matrix size [3]": str(matrix_dim[0]),
"scaling factor (mm/pixel) [1]": str(voxel_size[2]),
"scaling factor (mm/pixel) [2]": str(voxel_size[1]),
"scaling factor (mm/pixel) [3]": str(voxel_size[0]),
}
)
builder.set_pixel_array(np.zeros(matrix_dim, dtype=np.float32))
return builder.build()
[docs]
def create_stir_acqdata(
proj_matrix: list,
num_projections: int,
pixel_size: list,
backend: Optional[str] = None,
):
"""Create a uniform (zeros) STIR acquisition object.
Args:
proj_matrix: Two-element projection matrix size in ``(radial, tangential)``.
num_projections: Number of projection views.
pixel_size: Two-element pixel size in mm for ``(radial, tangential)``.
backend: Optional explicit backend (``"sirf"`` or ``"stir"``).
Returns:
Backend acquisition object created by ``STIRSPECTAcquisitionDataBuilder``.
"""
try:
from simind_python_connector.builders import STIRSPECTAcquisitionDataBuilder
except ImportError as exc:
raise ImportError(
"STIRSPECTAcquisitionDataBuilder requires SIRF/STIR to be installed"
) from exc
builder = STIRSPECTAcquisitionDataBuilder(backend=_normalize_backend_name(backend))
builder.update_header(
{
"!matrix size [1]": str(proj_matrix[0]),
"!matrix size [2]": str(proj_matrix[1]),
"!number of projections": str(num_projections),
"scaling factor (mm/pixel) [1]": str(pixel_size[0]),
"scaling factor (mm/pixel) [2]": str(pixel_size[1]),
}
)
builder.pixel_array = np.zeros(
(1, proj_matrix[0], num_projections, proj_matrix[1]), dtype=np.float32
)
return builder.build()
[docs]
def create_simple_phantom():
"""Create a simple cylindrical phantom with a hot sphere."""
# Create a 64x64x64 image with 4.42mm voxels
matrix_dim = [64, 64, 64]
voxel_size = [4.42, 4.42, 4.42] # mm
# Create empty image
phantom = create_stir_image(matrix_dim, voxel_size)
phantom_array = get_array(phantom)
# Add cylindrical background (body)
center = [32, 32, 32]
radius = 10 # pixels
height = 20 # pixels
for z in range(center[0] - height // 2, center[0] + height // 2):
for y in range(matrix_dim[1]):
for x in range(matrix_dim[2]):
if (x - center[1]) ** 2 + (y - center[2]) ** 2 <= radius**2:
phantom_array[z, y, x] = 10 # Background activity
# Add hot sphere (tumor)
sphere_center = [32, 32, 36]
sphere_radius = 3 # pixels
for z in range(matrix_dim[0]):
for y in range(matrix_dim[1]):
for x in range(matrix_dim[2]):
if (x - sphere_center[2]) ** 2 + (y - sphere_center[1]) ** 2 + (
z - sphere_center[0]
) ** 2 <= sphere_radius**2:
phantom_array[z, y, x] = 40 # Hot spot activity
phantom.fill(phantom_array)
return phantom
[docs]
def create_attenuation_map(phantom):
"""Create a simple attenuation map from the phantom."""
# For simplicity, use uniform attenuation where phantom > 0
mu_water_140keV = 0.15 # cm^-1 approximate for 140 keV
attn_array = get_array(phantom).copy()
attn_array[attn_array > 0] = mu_water_140keV
mu_map = phantom.clone()
mu_map.fill(attn_array)
return mu_map