Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 16 additions & 92 deletions asyncroscopy/DigitalTwin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@
"""

import json
from datetime import datetime
from pathlib import Path

import numpy as np
import pyTEMlib.image_tools as it
import pyTEMlib.probe_tools as pt
import tango
from ase.io import read
from ase import Atoms
from ase.build import bulk
from PIL import Image, TiffImagePlugin
from tango import AttrWriteType, DevState
from tango.server import Device, attribute, device_property

from asyncroscopy.Microscope import Microscope
from asyncroscopy.software.DataWriter import save_acquisition

DEFAULT_ACQUISITION_DIR = "outputs/tiled_acquisitions"

Expand Down Expand Up @@ -65,8 +62,8 @@ class DigitalTwin(Microscope):
)
acquisition_file_format = device_property(
dtype=str,
default_value="tiff",
doc="Acquisition file format. TIFF stores simulated image data and metadata.",
default_value="h5",
doc="Acquisition file format. HDF5 stores simulated acquisition data and metadata.",
)
data_device_address = device_property(
dtype=str,
Expand Down Expand Up @@ -445,26 +442,6 @@ def write_beam_pos(self, value):
self._beam_pos_x = float(x)
self._beam_pos_y = float(y)

def _make_filename(self, acquisition_type: str, detector: str, data_server, extension: str = "tiff") -> Path:
save_directory = self.acquisition_save_directory
if data_server is not None:
try:
save_directory = data_server.save_path
except tango.DevFailed as exc:
self.warn_stream(f"DATA device not ready: {exc}")

directory = Path(save_directory).expanduser()
directory.mkdir(parents=True, exist_ok=True)
stamp = datetime.now().strftime("%Y%m%dT%H%M%S%f")
name = f"{acquisition_type}_{detector}_{stamp}.{extension.lower().lstrip('.')}"
return directory / name

def _register_path(self, path: Path) -> str:
data_server = self._detector_proxies.get("data")
if data_server is None:
return str(path)
return data_server.register_path(str(path))

def _viewport_metadata(self) -> dict:
fov_ang = self._fov * 1e10
stage_xyz_ang = self._stage_position[:3] * 1e10
Expand All @@ -488,11 +465,6 @@ def _viewport_metadata(self) -> dict:
"particle_count": len(self._particle_records_base),
}

def _save_tiff(self, path: Path, image: np.ndarray, metadata: dict) -> None:
tiff_info = TiffImagePlugin.ImageFileDirectory_v2()
tiff_info[270] = json.dumps(metadata)
Image.fromarray(np.asarray(image)).save(str(path), format="TIFF", tiffinfo=tiff_info)

def _render_stem_image(self, imsize: int, dwell_time: float, detector_list: list) -> np.ndarray:
"""Simulate STEM image acquisition using convolutions of the pseudo-potential and electron probe."""
self._sync_stage_from_proxy()
Expand Down Expand Up @@ -537,53 +509,21 @@ def _render_stem_image(self, imsize: int, dwell_time: float, detector_list: list
noisy_image += self._lowfreq_noise(noisy_image, noise_level=0.1, freq_scale=0.1, rng=rng) * blur_noise_level
return np.clip(noisy_image, 0.0, 1.0).astype(np.float32)

def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: list) -> str:
"""Simulate STEM acquisition, save a TIFF with metadata, and return its DATA/Tiled key."""
detector = detector_list[0].upper() if detector_list else "HAADF"
image = self._render_stem_image(int(imsize), float(dwell_time), detector_list)
data_server = self._detector_proxies.get("data")
extension = str(self.acquisition_file_format or "tiff")
path = self._make_filename("stem_image", detector, data_server, extension)
metadata = {
"acquisition_type": "stem_image",
"detector": detector,
"dwell_time": float(dwell_time),
"shape": list(image.shape),
"dtype": str(image.dtype),
"simulation_backend": self.__class__.__name__,
**self._viewport_metadata(),
}
self._save_tiff(path, image, metadata)
return self._register_path(path)

def _acquire_stem_image_advanced(
def _acquire_scanned_image(
self,
imsize: int,
dwell_time: float,
detector_list: list[str],
scan_region: list[float],
) -> list[str]:
"""Perform advanced simulated STEM acquisition and return DATA/Tiled keys."""
saved_paths = []
detector_list: list[str] = ["haadf"],
scan_region: list[float] = [0.0, 0.0, 1.0, 1.0],
) -> str:
"""Simulate STEM acquisition, save HDF5 data with metadata, and return its DATA/Tiled key."""
detector_list = [detector.upper() for detector in detector_list]
data_server = self._detector_proxies.get("data")
images = []
for detector in detector_list:
image = self._render_stem_image(int(imsize), float(dwell_time), [detector])
detector_name = detector.upper()
data_server = self._detector_proxies.get("data")
extension = str(self.acquisition_file_format or "tiff")
path = self._make_filename("stem_image", detector_name, data_server, extension)
metadata = {
"acquisition_type": "stem_image_advanced",
"detector": detector_name,
"dwell_time": float(dwell_time),
"scan_region": [float(v) for v in scan_region],
"shape": list(image.shape),
"dtype": str(image.dtype),
"simulation_backend": self.__class__.__name__,
**self._viewport_metadata(),
}
self._save_tiff(path, image, metadata)
saved_paths.append(self._register_path(path))
return saved_paths
images.append(image)
return save_acquisition(self, data_server, "stem_image", detector_list, images)

def _simulate_spectrum(self, detector_name: str, exposure_time: float) -> dict[str, float]:
"""Simulate EDS spectrum acquisition at the current beam position weighted by surrounding particles."""
Expand Down Expand Up @@ -639,27 +579,11 @@ def _simulate_spectrum(self, detector_name: str, exposure_time: float) -> dict[s
return {el: val / total for el, val in noisy.items()}

def _acquire_spectrum(self, detector_name: str, exposure_time: float) -> str:
"""Simulate spectrum acquisition, save a NumPy file, and return its DATA/Tiled key."""
"""Simulate spectrum acquisition, save HDF5 data, and return its DATA/Tiled key."""
spectrum = self._simulate_spectrum(detector_name, exposure_time)
data_server = self._detector_proxies.get("data")
path = self._make_filename("spectrum", detector_name, data_server, "npy")
spectrum_array = np.array(
list(spectrum.items()),
dtype=[("element", "U8"), ("intensity", "f8")],
)
# TODO: migrate simulated spectra to .emd once the EDS data model is settled.
np.save(path, spectrum_array)
metadata = {
"acquisition_type": "spectrum",
"detector": detector_name,
"exposure_time": float(exposure_time),
"format_note": "Temporary .npy spectrum; migrate to .emd later.",
"spectrum": spectrum,
"simulation_backend": self.__class__.__name__,
**self._viewport_metadata(),
}
path.with_suffix(".json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
return self._register_path(path)
spectrum_array = np.array(list(spectrum.values()), dtype=np.float64)
return save_acquisition(self, data_server, "spectrum", detector_name, spectrum_array, dataset_name="spectrum")

def _place_beam(self, position) -> None:
"""Place the electron beam at the specified [x, y] coordinates."""
Expand Down
34 changes: 14 additions & 20 deletions asyncroscopy/DigitalTwinBeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ def write_beam_pos(self, value):
# ------------------------------------------------------------------
# Internal acquisition helpers
# ------------------------------------------------------------------
def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: list) -> np.ndarray:
def _acquire_scanned_image(
self,
imsize: int,
dwell_time: float,
detector_list: list[str] = ["haadf"],
scan_region: list[float] = [0.0, 0.0, 1.0, 1.0],
) -> np.ndarray:
"""
Acquire a simulated STEM image using the pre-cooked sample state.
Requires _cook_sample_recipe() to have been called immediately before this.
Expand Down Expand Up @@ -216,7 +222,7 @@ def lowfreq_noise(image, noise_level=0.1, freq_scale=0.1):
particle_lookup = self._particle_lookup # {label -> metadata}

# ── Rebuild _particle_records from cooked projection ──────────────────────
# This keeps the contract that _acquire_stem_image always refreshes
# This keeps the contract that _acquire_scanned_image always refreshes
# _particle_records to match whatever the current image will show,
# accounting for stage shift/tilt applied in _cook_sample_recipe.
#
Expand Down Expand Up @@ -263,7 +269,7 @@ def lowfreq_noise(image, noise_level=0.1, freq_scale=0.1):
})

self._particle_records = particle_records
print(f"_acquire_stem_image: {len(particle_records)} particles in FOV")
print(f"_acquire_scanned_image: {len(particle_records)} particles in FOV")

# ── Pseudo-potential + PSF convolution (unchanged logic) ──────────────────
edge = 2 * edge_crop * pixel_size
Expand Down Expand Up @@ -294,18 +300,6 @@ def lowfreq_noise(image, noise_level=0.1, freq_scale=0.1):

return np.array(noisy_image, dtype=np.float32)

def _acquire_stem_image_advanced(
self,
imsize: int,
dwell_time: float,
detector_list: list[str],
scan_region: list[float],
) -> list[np.ndarray]:
return [
self._acquire_stem_image(imsize, dwell_time, [detector])
for detector in detector_list
]

def _make_sample_recipe(self):
"""
Build three persistent data structures for the sample:
Expand All @@ -319,13 +313,13 @@ def _make_sample_recipe(self):

# ── World geometry ──────────────────────────────────────────────────────────
fov_ang = self._fov * 1e10 # Angstroms, lateral
world_z_ang = fov_ang * 0.5 # thin slab, same as _acquire_stem_image
world_z_ang = fov_ang * 0.5 # thin slab, same as _acquire_scanned_image
vox_size = fov_ang / self._imsize # Angstroms per voxel (isotropic)

nx = ny = self._imsize
nz = max(1, int(round(world_z_ang / vox_size)))

# ── Particle parameters (mirror _acquire_stem_image) ────────────────────────
# ── Particle parameters (mirror _acquire_scanned_image) ────────────────────────
particle_radius = 16.0
radius_std = 2.0
aspect_ratio = 0.4
Expand Down Expand Up @@ -354,7 +348,7 @@ def rotation_matrix(alpha, beta, gamma):
[0, np.sin(g), np.cos(g)]])
return Rz @ Ry @ Rx

# ── 1. Place particle centres (same exclusion logic as _acquire_stem_image) ─
# ── 1. Place particle centres (same exclusion logic as _acquire_scanned_image) ─
placed_centers = []
placed_particles = []
particle_lookup = {} # label (1-based int) -> metadata dict
Expand Down Expand Up @@ -406,7 +400,7 @@ def rotation_matrix(alpha, beta, gamma):

# ── 2. Build 3-D label map ─────────────────────────────────────────────────
# Each voxel gets the integer label of whichever particle owns it (0 = none).
# We use an ellipsoidal test identical to _acquire_stem_image's r_scaled mask.
# We use an ellipsoidal test identical to _acquire_scanned_image's r_scaled mask.
label_map = np.zeros((nx, ny, nz), dtype=np.uint8)

# Pre-build voxel coordinate arrays once (Angstrom positions of voxel centres)
Expand Down Expand Up @@ -484,7 +478,7 @@ def _cook_sample_recipe(self):
to both the atom positions and the label map, then project both to 2-D.

After this call, self._cooked_atoms and self._cooked_projection are ready
for consumption by _acquire_stem_image.
for consumption by _acquire_scanned_image.

Projection strategy
-------------------
Expand Down
81 changes: 21 additions & 60 deletions asyncroscopy/Microscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
from typing import Optional


from abc import abstractmethod, ABC, ABCMeta
from abc import abstractmethod, ABCMeta

import tango
from tango import AttrWriteType, DevEncoded, DevState, DevVarFloatArray, DevFloat
from tango import AttrWriteType, DevEncoded, DevState, DevVarFloatArray, DevFloat, DevVarStringArray
from tango.server import Device, DeviceMeta, attribute, command, device_property

class CombinedMeta(DeviceMeta, ABCMeta):
Expand Down Expand Up @@ -157,29 +157,17 @@ def acquire_spectrum(self, detector_name: str) -> str:
proxy = self._detector_proxies.get(detector_name)
return self._acquire_spectrum(detector_name, proxy.exposure_time)

@command(dtype_out=str)
def acquire_scanned_image(self) -> str:
"""Acquire a STEM image and return a key pointing to that data. You can get the data with the get_image_from_key command"""
@command(dtype_in=DevVarStringArray, dtype_out=str)
def acquire_scanned_image(self, detector_list: list[str] = ["haadf"]) -> str:
"""Acquire an image with scanning detectors and return a key pointing to that data. You can get the data with the get_image_from_key command"""
scan = self._detector_proxies.get("scan")
return self._acquire_stem_image(scan.imsize, scan.dwell_time, ["haadf"])

@command(dtype_out=str)
def acquire_scanned_image_advanced(self) -> str:
"""Acquire a STEM image using advanced STEM settings from the scan device."""
scan = self._detector_proxies.get("scan")
detector_names = [detector for detector in ("haadf", "bf") if bool(getattr(scan, detector))] or ["haadf"]
unique_ids = self._acquire_stem_image_advanced(scan.imsize, scan.dwell_time, detector_names, list(scan.scan_region))
if isinstance(unique_ids, str):
return unique_ids

unique_ids = list(unique_ids)
return unique_ids[0] if len(unique_ids) == 1 else json.dumps(unique_ids)
return self._acquire_scanned_image(scan.imsize, scan.dwell_time, detector_list, list(scan.scan_region))

@command(dtype_out=str)
def acquire_scanned_data_advanced(self) -> str:
"""Trigger an advanced 4D STEM data acquisition with the Ceta camera."""
"""Trigger an advanced 4D scanned data acquisition with the Ceta camera."""
scan = self._detector_proxies.get("scan")
return self._acquire_stem_data_advanced(scan.imsize, scan.dwell_time, "BM-Ceta", list(scan.scan_region))
return self._acquire_scanned_data_advanced(scan.imsize, scan.dwell_time, "BM-Ceta", list(scan.scan_region))

@command(dtype_out=str)
def acquire_camera_image(self) -> str:
Expand All @@ -193,33 +181,11 @@ def acquire_flucam_image(self) -> str:
flucam = self._detector_proxies.get("flucam")
return self._acquire_camera_image(flucam.imsize, flucam.exposure_time, "Flucam", flucam.readout_area)

@command(dtype_in=('str',), dtype_out=str)
def acquire_images(self, detector_names: list[str]) -> str:
"""
Acquire multiple STEM images simultaneously.

Parameters
----------
detector_names: list of detector names, e.g. ["HAADF", "BF"]

Returns
-------
JSON string containing unique ids returned by the vendor-specific
implementation.
"""
detector_names = [name.strip() for name in detector_names]
scan = self._detector_proxies.get("scan")
unique_ids = self._acquire_stem_image_advanced(scan.imsize, scan.dwell_time, detector_names, [0.0, 0.0, 1.0, 1.0])
if isinstance(unique_ids, str):
return unique_ids
unique_ids = list(unique_ids)
return unique_ids[0] if len(unique_ids) == 1 else json.dumps(unique_ids)

@command(dtype_in=int, dtype_out=DevEncoded)
def get_image_data_cached(self, index: int) -> tuple[str, bytes]:
"""Retrieve cached image by index."""
if not hasattr(self, '_cached_images'):
tango.Except.throw_exception("NoCache", "Call acquire_images() first", "get_image_data()")
tango.Except.throw_exception("NoCache", "Call acquire_scanned_image() first", "get_image_data()")
if index >= len(self._cached_images):
tango.Except.throw_exception("InvalidIndex", f"Index {index} out of range", "get_image_data()")

Expand Down Expand Up @@ -344,8 +310,14 @@ def set_image_shift(self, shift):
# Internal acquisition helpers
# ------------------------------------------------------------------
@abstractmethod
def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: list[str]):
"""Vendor-specific STEM acquisition implementation."""
def _acquire_scanned_image(
self,
imsize: int,
dwell_time: float,
detector_list: list[str] = ["haadf"],
scan_region: list[float] = [0.0, 0.0, 1.0, 1.0],
) -> str:
"""Vendor-specific scanned image acquisition implementation."""
pass

def _acquire_camera_image(self, imsize: int, exposure_time: float, detector: str, readout_area: str) -> str:
Expand All @@ -356,31 +328,20 @@ def _acquire_camera_image(self, imsize: int, exposure_time: float, detector: str
"_acquire_camera_image()",
)

def _acquire_stem_data_advanced(
def _acquire_scanned_data_advanced(
self,
imsize: int,
dwell_time: float,
detector: str,
scan_region: list[float],
) -> str:
"""Vendor-specific advanced 4D STEM data acquisition trigger."""
"""Vendor-specific advanced 4D scanned data acquisition trigger."""
tango.Except.throw_exception(
"UnsupportedCommand",
"This microscope does not support advanced STEM data acquisition.",
"_acquire_stem_data_advanced()",
"This microscope does not support advanced scanned data acquisition.",
"_acquire_scanned_data_advanced()",
)

@abstractmethod
def _acquire_stem_image_advanced(
self,
imsize: int,
dwell_time: float,
detector_list: list[str],
scan_region: list[float],
) -> list:
"""Vendor-specific multi-image acquisition implementation."""
pass

def _place_beam(self, position):
# define in the inherit class
pass
Expand Down
Loading
Loading