Source code for ewoksid16b.tasks.utils.read

import os

import h5py
import hdf5plugin  # noqa
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore


[docs] def apply_calibration(xy_ch, calibration=[0, 1]): """ Apply calibration to the x- or y-axis :param xy_ch: x or y -data of a stack :param calibration: list of coefficients for polynomial equation :return: calibrated x- or y-data """ xy = calibration[0] + calibration[1] * xy_ch for j in range(2, len(calibration)): xy += calibration[j] * xy_ch**j return xy
def _read_h5_fscan2d(h5file, scan_number): """ Read scan size and motor axes for the fscan2d scan. :param h5file: open h5py.File :param scan_number: e.g. "/30.1" :return: ``(NDY_, NDX_)`` """ slow_npoints = int( h5file[scan_number + "/instrument/fscan_parameters/slow_npoints"][()] ) fast_npoints = int( h5file[scan_number + "/instrument/fscan_parameters/fast_npoints"][()] ) name0 = h5file[scan_number + "/instrument/fscan_parameters/slow_motor"][()].decode( "utf-8" ) name1 = h5file[scan_number + "/instrument/fscan_parameters/fast_motor"][()].decode( "utf-8" ) axis0 = h5file[scan_number + "/instrument/" + name0 + "/value"][()] axis1 = h5file[scan_number + "/instrument/" + name1 + "/data"][()] # axis0 is slow: constant during each fast sweep → [s0,s0,..,s1,s1,..] if len(axis0) == slow_npoints: NDY_ = axis0 else: NDY_ = axis0[::fast_npoints] # axis1 is fast: repeats full sweep per slow step → [f0,..,fN,f0,..,fN,..] if len(axis1) == fast_npoints: NDX_ = axis1 else: NDX_ = axis1[:fast_npoints] return NDY_, NDX_ def _read_h5_amesh(h5file, scan_number): """ Read scan size and motor axes for the amesh scan. :param h5file: open h5py.File :param scan_number: e.g. "/3.1" :return: ``(NDY_, NDX_)`` """ non_scalar = [] for name, obj in h5file[scan_number + "/instrument/positioners"].items(): if isinstance(obj, h5py.Dataset) and obj.ndim > 0: non_scalar.append(obj[()]) if len(non_scalar) != 2: raise ValueError( f"Expected 2 non-scalar positioners for amesh, got {len(non_scalar)}" ) slow_size, fast_size, fast_arr, slow_arr = find_scan_size( non_scalar[0], non_scalar[1] ) NDY_ = slow_arr.reshape(slow_size, fast_size)[:, 0] NDX_ = fast_arr.reshape(slow_size, fast_size)[0, :] return NDY_, NDX_ def _read_h5_scan_info(h5file, scan_number): """ Read scan type, dimensions, and motor axes from an open HDF5 file. :param h5file: open h5py.File :param scan_number: e.g. "/30.1" :return: ``(NDY_, NDX_)`` """ title = h5file[scan_number]["title"][()] if isinstance(title, bytes): title = title.decode() if "fscan2d" in title: return _read_h5_fscan2d(h5file, scan_number) elif "amesh" in title: return _read_h5_amesh(h5file, scan_number) else: raise ValueError(f"Unsupported scan type: '{title}'")
[docs] def read_axes(stack_path, scan_number, key=None): """ Read unique slow- and fast-motor position arrays for axis labelling. :param stack_path: path to the HDF5 file :param scan_number: scan number :param key: bliss key of the scan (for online/redis) :return: ``(NDY_, NDX_)`` """ if key is not None: redis_url = BeaconData().get_redis_data_db() datastore = DataStore(redis_url) scan = datastore.load_scan(key) axis_keys = [s for s in scan.info["channels"] if "axis" in s] axis_names = [scan.info["channels"][k]["display_name"] for k in axis_keys] NDY_ = scan.streams["axis:" + axis_names[0]][:] NDX_ = scan.streams["axis:" + axis_names[1]][:] elif stack_path.endswith(".h5") and os.path.exists(stack_path): scan_number = "/" + str(scan_number) + ".1" with h5py.File(stack_path, "r") as h5file: NDY_, NDX_ = _read_h5_scan_info(h5file, scan_number) else: raise ValueError(f"Invalid or non-existent stack path: '{stack_path}'") return NDY_, NDX_
[docs] def read_stack(onefile, counter, scan_number, key): """ Create data arrays of a stack from the given file :param onefile: one path to the stack, or the url to the redis server. :counter: main counter :param scan_number: scan number of the map :param key: bliss key of the scan :return: not calibrated x, not calibrated y (multidimensional) as data array, and stack as an object, pixels size of the map (if it was defined it is read from the file) """ # do not use read from file for online - there are problems with unappropriate locking of h5 file, # using dynamic_hdf5 or open_item result in reading of empty array as it is read after creation but before filling. if key is not None: # onefile.startswith("esrf:") redis_url = BeaconData().get_redis_data_db() datastore = DataStore(redis_url) scan = datastore.load_scan(key) spectrum_stream_name = _counter_to_stream_name(counter) try: stack = scan.streams[spectrum_stream_name][:] except KeyError: # # needed for tests in demo_session # try: # counter_stream = next( # stream for stream in scan.streams if counter in stream # ) # stack = scan.streams[counter_stream][:] # except Exception: # # could be deleted for the actual beamline raise KeyError( f"Spectrum stream '{spectrum_stream_name}' that belongs to counter '{counter}' not found in scan streams" ) from None pixels = [] axis = [stream for stream in scan.info["channels"] if "axis" in stream] for k in range(0, len(axis)): pixels.append(scan.info["channels"][axis[k]]["axis_points"]) try: stack = np.reshape(stack, (*pixels, stack.shape[-1])) except ( ValueError ): # for 1D data - required at least for testing in demo session with dmesh stack = np.reshape(stack, (*pixels, 1)) elif onefile.endswith(".h5"): scan_number = "/" + str(scan_number) + ".1" try: with h5py.File(onefile, "r") as h5file: NDY_, NDX_ = _read_h5_scan_info(h5file, scan_number) pixels = [len(NDY_), len(NDX_)] stack = h5file[scan_number + "/measurement/" + counter][()] stack = np.reshape(stack, (pixels[0], pixels[1], stack.shape[-1])) except Exception: raise Exception( "Could not read scan parameters." "Please check that correct scan was selected." ) else: raise Exception("The file should be a h5 file. Or use url for online.") y = stack # to find the length of last dimension array - to identify x-axis y_tmp = y for i in range(0, len(y.shape) - 1): y_tmp = y_tmp[0] x_len = len(y_tmp) x = np.arange(x_len) return x, y, stack, pixels
[docs] def find_scan_size(arr0, arr1): """ For 'amesh' (and 'dmesh') scans. Determine (slow_size, fast_size) from two flat motor-positioners. The fast axis is identified as the motor whose first step is larger. Motor noise should be much smaller than the step. The slow axis is then used as a cross-check. Issues could be if step is comparable to noise, or if it is actually 1D scan (0 intervals). :param arr0: first flat motor-position array (1-D array-like). :param arr1: second flat motor-position array (1-D array-like). :return: ``(slow_size, fast_size, fast_arr, slow_arr)``. """ arr0 = np.asarray(arr0, dtype=float).ravel() arr1 = np.asarray(arr1, dtype=float).ravel() # identify fast / slow axes diff0 = abs(arr0[1] - arr0[0]) diff1 = abs(arr1[1] - arr1[0]) if max(diff0, diff1) == 0: raise ValueError( "Could not identify scan dimensions: both axes appear to be constant." ) if diff0 >= diff1: fast, slow = arr0, arr1 else: fast, slow = arr1, arr0 # find fast-axis period via first direction reversal n = len(fast) step = fast[1] - fast[0] fast_size = n for i in range(1, n - 1): if (fast[i + 1] - fast[i]) * step < 0: fast_size = i + 1 break slow_size = n // fast_size if n % fast_size != 0: raise ValueError("Failed to determine scan dimensions" "or scan is incomplete.") # check dimensions using slow axis if slow_size > 1: slow_2d = slow.reshape(slow_size, fast_size) within = np.abs(np.diff(slow_2d, axis=1)) # noise within each row between = np.abs(np.diff(slow_2d, axis=0)) # steps between rows if within.max() > 0.5 * between.min(): raise ValueError( f"Slow-motor verification failed" f"noise ({within.max()}) is too large" f"relative to the step ({between.min()})." f"Extracted dimensions {slow_size} × {fast_size} are likely wrong." ) return slow_size, fast_size, fast, slow
[docs] def read_y_normalization_counter( filename, stack, normalization_counter, scan_number, pixels, key=None ): """ Normalize y-data using defined counter :param filename: one path to the file or url :param stack: the original data :param normalization_counter: name of normalization counter :param scan_number: the number of scan :param pixels: pixel size of the map. :param key: bliss key of the scan :return: normalized y-data """ if key is not None: # onefile.startswith("esrf:") redis_url = BeaconData().get_redis_data_db() datastore = DataStore(redis_url) scan = datastore.load_scan(key) counter_stream = next( stream for stream in scan.streams if normalization_counter in stream ) norm_data = scan.streams[counter_stream][:] norm_data = np.reshape(norm_data, (*pixels,)) norm_data = norm_data[..., np.newaxis] view = stack / norm_data if filename.endswith(".h5"): with h5py.File(filename, "r") as h5file: scan_number = str("/") + str(scan_number) + str(".1") norm_data = h5file[scan_number + "/measurement/" + normalization_counter][ () ] norm_data = np.reshape(norm_data, (pixels[0], pixels[1])) view = stack / norm_data[:, :, np.newaxis] return view
def _counter_to_stream_name(counter: str) -> str: """ Convert XEOL spectrum counter name from 'hama1_det00' to Blissdata stream name 'hama1:spectrum:det00'. """ if "_" not in counter: raise ValueError(f"Invalid detector name format: {counter}") prefix, suffix = counter.rsplit("_", 1) return f"{prefix}:spectrum:_spec{suffix}"