import os
import h5py
import hdf5plugin # noqa
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
[docs]
def apply_calibration(xy_ch, calibration=(0, 1)):
    """
    Apply a polynomial calibration to the x- or y-axis.

    The result is ``calibration[0] + calibration[1] * xy_ch +
    calibration[2] * xy_ch**2 + ...``.

    :param xy_ch: x or y -data of a stack
    :param calibration: sequence of polynomial coefficients, lowest order
        first; at least two coefficients (offset and slope) are required
    :return: calibrated x- or y-data
    """
    xy = calibration[0] + calibration[1] * xy_ch
    for power in range(2, len(calibration)):
        # Out-of-place addition on purpose: ``xy += ...`` raises a casting
        # error when ``xy`` is an integer array and the added term is float.
        xy = xy + calibration[power] * xy_ch**power
    return xy
def _read_h5_fscan2d(h5file, scan_number):
"""
Read scan size and motor axes for the fscan2d scan.
:param h5file: open h5py.File
:param scan_number: e.g. "/30.1"
:return: ``(NDY_, NDX_)``
"""
slow_npoints = int(
h5file[scan_number + "/instrument/fscan_parameters/slow_npoints"][()]
)
fast_npoints = int(
h5file[scan_number + "/instrument/fscan_parameters/fast_npoints"][()]
)
name0 = h5file[scan_number + "/instrument/fscan_parameters/slow_motor"][()].decode(
"utf-8"
)
name1 = h5file[scan_number + "/instrument/fscan_parameters/fast_motor"][()].decode(
"utf-8"
)
axis0 = h5file[scan_number + "/instrument/" + name0 + "/value"][()]
axis1 = h5file[scan_number + "/instrument/" + name1 + "/data"][()]
# axis0 is slow: constant during each fast sweep → [s0,s0,..,s1,s1,..]
if len(axis0) == slow_npoints:
NDY_ = axis0
else:
NDY_ = axis0[::fast_npoints]
# axis1 is fast: repeats full sweep per slow step → [f0,..,fN,f0,..,fN,..]
if len(axis1) == fast_npoints:
NDX_ = axis1
else:
NDX_ = axis1[:fast_npoints]
return NDY_, NDX_
def _read_h5_amesh(h5file, scan_number):
"""
Read scan size and motor axes for the amesh scan.
:param h5file: open h5py.File
:param scan_number: e.g. "/3.1"
:return: ``(NDY_, NDX_)``
"""
non_scalar = []
for name, obj in h5file[scan_number + "/instrument/positioners"].items():
if isinstance(obj, h5py.Dataset) and obj.ndim > 0:
non_scalar.append(obj[()])
if len(non_scalar) != 2:
raise ValueError(
f"Expected 2 non-scalar positioners for amesh, got {len(non_scalar)}"
)
slow_size, fast_size, fast_arr, slow_arr = find_scan_size(
non_scalar[0], non_scalar[1]
)
NDY_ = slow_arr.reshape(slow_size, fast_size)[:, 0]
NDX_ = fast_arr.reshape(slow_size, fast_size)[0, :]
return NDY_, NDX_
def _read_h5_scan_info(h5file, scan_number):
"""
Read scan type, dimensions, and motor axes from an open HDF5 file.
:param h5file: open h5py.File
:param scan_number: e.g. "/30.1"
:return: ``(NDY_, NDX_)``
"""
title = h5file[scan_number]["title"][()]
if isinstance(title, bytes):
title = title.decode()
if "fscan2d" in title:
return _read_h5_fscan2d(h5file, scan_number)
elif "amesh" in title:
return _read_h5_amesh(h5file, scan_number)
else:
raise ValueError(f"Unsupported scan type: '{title}'")
[docs]
def read_axes(stack_path, scan_number, key=None):
    """
    Return the slow- and fast-motor position arrays used for axis labelling.

    When ``key`` is given the positions are read from the redis data store;
    otherwise they are read from the HDF5 file at ``stack_path``.

    :param stack_path: path to the HDF5 file
    :param scan_number: scan number
    :param key: bliss key of the scan (for online/redis)
    :return: ``(NDY_, NDX_)``
    :raises ValueError: if no key is given and ``stack_path`` is not an
        existing ``.h5`` file
    """
    if key is not None:
        datastore = DataStore(BeaconData().get_redis_data_db())
        scan = datastore.load_scan(key)
        channels = scan.info["channels"]
        display_names = [
            channels[name]["display_name"] for name in channels if "axis" in name
        ]
        # The first axis channel is used as Y and the second as X.
        NDY_ = scan.streams["axis:" + display_names[0]][:]
        NDX_ = scan.streams["axis:" + display_names[1]][:]
        return NDY_, NDX_

    if not (stack_path.endswith(".h5") and os.path.exists(stack_path)):
        raise ValueError(f"Invalid or non-existent stack path: '{stack_path}'")

    scan_group = "/" + str(scan_number) + ".1"
    with h5py.File(stack_path, "r") as h5file:
        NDY_, NDX_ = _read_h5_scan_info(h5file, scan_group)
    return NDY_, NDX_
[docs]
def read_stack(onefile, counter, scan_number, key):
    """
    Create data arrays of a stack from the given file or from redis.

    :param onefile: one path to the stack, or the url to the redis server.
    :param counter: main counter
    :param scan_number: scan number of the map
    :param key: bliss key of the scan; when not ``None`` the data is read
        from the redis data store instead of the file
    :return: ``(x, y, stack, pixels)`` - not calibrated x (channel indices
        along the last dimension), not calibrated y (the multidimensional
        stack), the stack itself (same array as ``y``), and the pixel size
        of the map as a list of points per axis
    :raises KeyError: if the spectrum stream of ``counter`` is not found
        in the online scan
    :raises Exception: if the HDF5 scan cannot be read, or if ``onefile``
        is not a ``.h5`` file and no key is given
    """
    # Do not read from the file for online scans: there are problems with
    # inappropriate locking of the h5 file, and dynamic_hdf5 / open_item can
    # return an empty array (read after creation but before filling).
    if key is not None:
        redis_url = BeaconData().get_redis_data_db()
        datastore = DataStore(redis_url)
        scan = datastore.load_scan(key)
        spectrum_stream_name = _counter_to_stream_name(counter)
        try:
            stack = scan.streams[spectrum_stream_name][:]
        except KeyError:
            raise KeyError(
                f"Spectrum stream '{spectrum_stream_name}' that belongs to counter '{counter}' not found in scan streams"
            ) from None
        channels = scan.info["channels"]
        pixels = [
            channels[name]["axis_points"] for name in channels if "axis" in name
        ]
        try:
            stack = np.reshape(stack, (*pixels, stack.shape[-1]))
        except ValueError:
            # 1D data - required at least for testing in demo session
            # with dmesh.
            stack = np.reshape(stack, (*pixels, 1))
    elif onefile.endswith(".h5"):
        scan_number = "/" + str(scan_number) + ".1"
        try:
            with h5py.File(onefile, "r") as h5file:
                NDY_, NDX_ = _read_h5_scan_info(h5file, scan_number)
                pixels = [len(NDY_), len(NDX_)]
                stack = h5file[scan_number + "/measurement/" + counter][()]
            stack = np.reshape(stack, (pixels[0], pixels[1], stack.shape[-1]))
        except Exception as err:
            # Keep the original error attached so the real cause stays visible.
            raise Exception(
                "Could not read scan parameters. "
                "Please check that correct scan was selected."
            ) from err
    else:
        raise Exception("The file should be a h5 file. Or use url for online.")

    y = stack
    # The x-axis is the channel index along the last dimension of the stack.
    x = np.arange(y.shape[-1])
    return x, y, stack, pixels
[docs]
def find_scan_size(arr0, arr1):
    """
    Determine the grid dimensions of an 'amesh' (or 'dmesh') scan.

    Works on the two flat motor-position arrays of the scan. The fast axis
    is identified as the motor whose first step is larger (ties go to
    ``arr0``); its period is the index of the first direction reversal.
    The slow axis is then used as a cross-check: its variation within a row
    must stay below half of its smallest step between rows.

    Motor noise should be much smaller than the step. Issues could arise if
    the step is comparable to the noise, or if it is actually a 1D scan
    (0 intervals).

    :param arr0: first flat motor-position array (1-D array-like).
    :param arr1: second flat motor-position array (1-D array-like).
    :return: ``(slow_size, fast_size, fast_arr, slow_arr)``.
    :raises ValueError: if the arrays differ in length, hold fewer than two
        points, are both constant at the start, do not divide into complete
        rows, or fail the slow-axis cross-check.
    """
    arr0 = np.asarray(arr0, dtype=float).ravel()
    arr1 = np.asarray(arr1, dtype=float).ravel()
    if arr0.size != arr1.size:
        raise ValueError(
            "Could not identify scan dimensions: motor arrays have different "
            f"lengths ({arr0.size} and {arr1.size})."
        )
    if arr0.size < 2:
        raise ValueError(
            "Could not identify scan dimensions: at least 2 points are required."
        )

    # identify fast / slow axes
    diff0 = abs(arr0[1] - arr0[0])
    diff1 = abs(arr1[1] - arr1[0])
    if max(diff0, diff1) == 0:
        raise ValueError(
            "Could not identify scan dimensions: both axes appear to be constant."
        )
    if diff0 >= diff1:
        fast, slow = arr0, arr1
    else:
        fast, slow = arr1, arr0

    # find fast-axis period via first direction reversal
    n = len(fast)
    step = fast[1] - fast[0]
    # np.diff(fast)[i] is fast[i + 1] - fast[i]; the first interval defines
    # ``step`` itself, so the search starts at the second one.
    reversals = np.flatnonzero(np.diff(fast)[1:] * step < 0)
    fast_size = int(reversals[0]) + 2 if reversals.size else n

    slow_size = n // fast_size
    if n % fast_size != 0:
        raise ValueError(
            "Failed to determine scan dimensions or scan is incomplete."
        )

    # check dimensions using slow axis
    if slow_size > 1:
        slow_2d = slow.reshape(slow_size, fast_size)
        within = np.abs(np.diff(slow_2d, axis=1))  # noise within each row
        between = np.abs(np.diff(slow_2d, axis=0))  # steps between rows
        if within.max() > 0.5 * between.min():
            raise ValueError(
                f"Slow-motor verification failed: "
                f"noise ({within.max()}) is too large "
                f"relative to the step ({between.min()}). "
                f"Extracted dimensions {slow_size} × {fast_size} are likely wrong."
            )
    return slow_size, fast_size, fast, slow
[docs]
def read_y_normalization_counter(
    filename, stack, normalization_counter, scan_number, pixels, key=None
):
    """
    Normalize y-data using the given counter.

    The counter values are reshaped to the map size and every spectrum of
    ``stack`` is divided by the counter value of its pixel. When ``key`` is
    given the counter is read from the redis data store only; the HDF5 file
    is used only when no key is given.

    :param filename: one path to the file or url
    :param stack: the original data
    :param normalization_counter: name of normalization counter
    :param scan_number: the number of scan
    :param pixels: pixel size of the map.
    :param key: bliss key of the scan
    :return: normalized y-data
    :raises KeyError: if no online stream contains ``normalization_counter``
    :raises ValueError: if no key is given and ``filename`` is not a
        ``.h5`` file
    """
    if key is not None:
        redis_url = BeaconData().get_redis_data_db()
        datastore = DataStore(redis_url)
        scan = datastore.load_scan(key)
        # First stream whose name contains the counter name.
        counter_stream = next(
            (stream for stream in scan.streams if normalization_counter in stream),
            None,
        )
        if counter_stream is None:
            raise KeyError(
                f"Normalization counter '{normalization_counter}' not found in scan streams"
            )
        norm_data = scan.streams[counter_stream][:]
        norm_data = np.reshape(norm_data, (*pixels,))
    elif filename.endswith(".h5"):
        scan_number = "/" + str(scan_number) + ".1"
        with h5py.File(filename, "r") as h5file:
            norm_data = h5file[scan_number + "/measurement/" + normalization_counter][
                ()
            ]
        norm_data = np.reshape(norm_data, (pixels[0], pixels[1]))
    else:
        raise ValueError(
            f"Cannot read normalization counter: '{filename}' is not a h5 file "
            "and no scan key was given."
        )
    # Add a trailing axis so each pixel's counter value divides its spectrum.
    return stack / norm_data[..., np.newaxis]
def _counter_to_stream_name(counter: str) -> str:
"""
Convert XEOL spectrum counter name from 'hama1_det00' to Blissdata stream name 'hama1:spectrum:det00'.
"""
if "_" not in counter:
raise ValueError(f"Invalid detector name format: {counter}")
prefix, suffix = counter.rsplit("_", 1)
return f"{prefix}:spectrum:_spec{suffix}"