Source code for ewoksid16b.tasks.xeoltasks

import os
import shutil
import tempfile

import h5py
import hdf5plugin  # noqa
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from ewoks import convert_graph
from ewokscore import Task
from PyMca5.PyMcaIO import ConfigDict
from PyMca5.PyMcaMath.fitting.StackSimpleFit import StackSimpleFit as TmpClass
from silx.io.dictdump import dicttonx

from .utils import calibration
from .utils import read


# Override the end-of-processing hook so that no EDF files are saved
class StackSimpleFit(TmpClass):
    """PyMca ``StackSimpleFit`` variant that does not save EDF files."""

    def onProcessStackFinished(self):
        """Do nothing once the stack is processed, so no EDF file is written."""
        return None
class ReadStack(
    Task,
    input_names=["stack_path", "scan_number"],
    optional_input_names=[
        "threshold",
        "calibration_x",
        "calibration_y",
        "norm_counter",
        "counter",
        "key",
    ],
    output_names=["x", "stack", "y", "mask"],
):
    """Read a stack, then normalize, calibrate and mask it for the next steps."""

    def run(self):
        """Collect the task inputs, build the data set and publish the outputs."""
        x, stack, y, mask = self.define_x_y_stack(
            file=self.get_input_value("stack_path"),
            scale_normalizer=self.get_input_value("norm_counter", None),
            calibration_x=self.get_input_value("calibration_x", None),
            calibration_y=self.get_input_value("calibration_y", None),
            counter=self.get_input_value("counter", None),
            scan_number=self.get_input_value("scan_number"),
            roi=self.get_input_value("threshold", 0),
            key=self.get_input_value("key", None),
        )
        self.outputs.x = x
        self.outputs.stack = stack
        self.outputs.y = y
        self.outputs.mask = mask

    def define_x_y_stack(
        self,
        file,
        scale_normalizer,
        calibration_x,
        calibration_y,
        counter,
        scan_number,
        roi=0,
        key=None,
    ):
        """
        Build the full data set used by the fit.

        :param file: path to the file holding the stack (may also be a URL to
            the redis server)
        :param scale_normalizer: short name of the normalization counter;
            ``None`` or ``False`` disables the normalization
        :param calibration_x: x-axis calibration, ``None`` to skip it
        :param calibration_y: y-axis calibration, ``None`` to skip it
        :param counter: main counter
        :param scan_number: scan number of the map
        :param roi: threshold; a pixel is kept when the sum of its spectrum
            (last axis of ``y``) is ``>= roi``
        :param key: forwarded as-is to the ``read`` helpers
        :return: ``(x, stack, y, mask)`` - calibrated x, the stack as read,
            the normalized/calibrated y and a ``uint8`` mask of shape
            ``y.shape[0:2]`` (1 = pixel kept)
        """
        x, y, stack, pixels = read.read_stack(file, counter, scan_number, key)

        if not (scale_normalizer is None or scale_normalizer is False):
            y = read.read_y_normalization_counter(
                file, stack, scale_normalizer, scan_number, pixels, key
            )

        if calibration_x is not None:
            x = calibration.apply_calibration(x, calibration_x)
        if calibration_y is not None:
            y = calibration.apply_calibration(y, calibration_y)
        x = np.array(x)

        # Integrated signal of every pixel; pixels reaching the threshold are kept.
        intensity = y.sum(axis=-1)
        mask = np.zeros(y.shape[0:2], np.uint8)
        selected = np.where(intensity >= roi)
        mask[selected[0], selected[1]] = 1

        if not mask.any():
            # A fully masked map would not be fitted at all: unmask the
            # position(s) holding the maximum integrated signal.
            print("everything is masked, one point will be unmasked")
            brightest = np.where(intensity == np.max(intensity))
            mask[brightest[0], brightest[1]] = 1

        return x, stack, y, mask
class ReadCorrectConfig(
    Task,
    input_names=["config_path"],
    output_names=["config"],
):
    """Read a fit configuration file into a dictionary and repair it if needed."""

    def run(self):
        """Load the configuration, normalize it and expose it as ``config``."""
        fit_config = ConfigDict.ConfigDict()
        fit_config.read(self.get_input_value("config_path"))

        # A single constant background is sometimes stored as a scalar
        # instead of a list: wrap it so that it is always a list.
        estimation = fit_config["functions"]["User Estimated Constant"][
            "configuration"
        ]["estimation"]
        background = estimation["parameters"]
        if not isinstance(background, list):
            print("back_ground must be a list, correction was applied")
            estimation["parameters"] = [background]

        # PyMca may fall back to loading the fit functions from the file
        # recorded in the configuration. That path is absolute, so a
        # configuration saved on another computer can make this step crash.
        probe = StackSimpleFit()
        try:
            probe.setConfiguration(fit_config)
        except (RuntimeError, FileNotFoundError, ModuleNotFoundError, ValueError):
            print(
                "PyMca is trying to reach path which do not exist. It will be redirected to the installed library"
            )
            import PyMca5.PyMcaMath.fitting.SimpleFitUserEstimatedFunctions as installed

            # Redirect entries such as
            # config["functions"]["User Estimated Constant"]["file"]
            # to the module shipped with the installed PyMca.
            for function_cfg in fit_config.get("functions", {}).values():
                if isinstance(function_cfg, dict) and "file" in function_cfg:
                    function_cfg["file"] = installed.__file__

        self.outputs.config = fit_config
class SaveXeolH5(
    Task,
    input_names=[
        "save_path",
        "stack_path",
        "x",
        "y",
        "stack",
        "parameters",
        "config",
        "scan_number",
    ],
    optional_input_names=["calibration_x", "calibration_y", "mask", "key"],
    output_names=["finished", "group_names", "config"],
):
    """Save the fit results to an external HDF5 file."""

    def run(self):
        """Derive the result group name and write everything to ``save_path``.

        ``parameters`` is expected as ``[labels, arrays]``: element 0 holds
        the parameter names and element 1 the matching arrays.

        :raises ValueError: if no ``key`` is given and ``stack_path`` does not
            end with ``.h5``
        """
        self.group_names = []
        config = self.get_input_value("config")
        parameters = self.get_input_value("parameters")
        mask = self.get_input_value("mask", None)
        key = self.get_input_value("key", None)
        x = self.get_input_value("x")
        y = self.get_input_value("y")
        stack = self.get_input_value("stack")
        save_path = self.get_input_value("save_path")
        if not save_path.endswith(".h5"):
            save_path = save_path + ".h5"
        stack_path = self.get_input_value("stack_path")
        scan_number = self.get_input_value("scan_number")

        if key is not None:
            # A scan key is given: name the result after the scan title
            # stored in the Redis data store.
            redis_url = BeaconData().get_redis_data_db()
            datastore = DataStore(redis_url)
            scan = datastore.load_scan(key)
            title = scan.info["title"]
            prename = f"{title}_scan_number_{scan_number}"
        elif stack_path.endswith(".h5"):
            # Otherwise name the result after the stack file (without extension).
            base = os.path.basename(stack_path)
            name = os.path.splitext(base)[0]
            prename = f"{name}_scan_number_{scan_number}"
        else:
            raise ValueError("Invalid stack path")

        self.group_names.append(
            self.generate_h5_file(
                os.path.basename(save_path),
                prename,
                os.path.dirname(save_path),
                self.get_input_value("calibration_x", None),
                self.get_input_value("calibration_y", None),
                x,
                y,
                stack,
                parameters[0],
                parameters[1],
                mask,
                config,
                stack_path,
                scan_number,
                key,
            )
        )
        self.outputs.finished = True
        self.outputs.group_names = self.group_names
        self.outputs.config = config

    def generate_h5_file(
        self,
        h5name,
        savename,
        savedir,
        calibration_x,
        calibration_y,
        x,
        y,
        y_ini,
        labels,
        parameters,
        mask,
        config,
        stack_path,
        scan_number,
        key,
    ):
        """
        Generate the HDF5 file from the results.

        :param h5name: name of the h5 file
        :param savename: name of the result (group)
        :param savedir: directory of the result file; may be empty, in which
            case the file is written relative to the current directory
        :param calibration_x: list of coefficients for polynomial equation
        :param calibration_y: list of coefficients for polynomial equation
        :param x: x-data calibrated
        :param y: y-data calibrated
        :param y_ini: y-data without calibration
        :param labels: list of parameters names
        :param parameters: list of array of parameters
        :param mask: the applied mask (``None`` to skip saving it)
        :param config: configuration
        :param stack_path: file path
        :param scan_number: scan number
        :param key: bliss key of the scan
        :return: name of the group actually created (``savename``, or
            ``savename`` with a ``_repetition_<n>`` suffix when it already
            existed)
        """
        NDY_, NDX_ = read.read_axes(stack_path, scan_number, key)
        h5_filepath = os.path.join(savedir, h5name)

        if os.path.exists(h5_filepath):
            mode = "a"
        else:
            # os.makedirs("") raises FileNotFoundError, so only create the
            # parent directory when the path actually has one.
            parent_dir = os.path.dirname(h5_filepath)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            mode = "w"

        with h5py.File(h5_filepath, mode) as h5file:
            if savename not in h5file:
                new_savename = savename
            else:
                print(
                    savename,
                    " already exists and could not be saved. Numeric name of scan will be changed."
                    " Should never appear during online fitting.",
                )
                # Use the first free "<savename>_repetition_<n>" name, n = 1, 2, ...
                repetition = 1
                new_savename = savename + "_repetition_" + str(repetition)
                while new_savename in h5file:
                    repetition += 1
                    new_savename = savename + "_repetition_" + str(repetition)
            grp = h5file.create_group(new_savename, track_order=True)

            if calibration_x is not None:
                grp.create_dataset("Calibration_x", data=calibration_x)
            if calibration_y is not None:
                grp.create_dataset("Calibration_y", data=calibration_y)
            grp.attrs["NX_class"] = "NXentry"

            # 2D maps: every spectrum summed along its last axis.
            D2group = grp.create_group("2D_maps")
            D2group.attrs["default"] = "data_sum_stack_norm"
            for subgroup, subgroup_data in [
                ("data_sum_stack", y_ini),
                ("data_sum_stack_norm", y),
            ]:
                Ngr = D2group.create_group(subgroup)
                Ngr.attrs["NX_class"] = "NXdata"
                ND = Ngr.create_dataset(
                    subgroup,
                    data=np.array(subgroup_data.sum(axis=-1), dtype=np.float64),
                )
                NDY = Ngr.create_dataset("slow_motor", data=NDY_)
                NDX = Ngr.create_dataset("fast_motor", data=NDX_)
                NDY.make_scale("slow_motor")
                NDX.make_scale("fast_motor")
                ND.dims[1].attach_scale(NDX)
                ND.dims[0].attach_scale(NDY)
                ND.dims[1].label = "fast_motor"
                ND.dims[0].label = "slow_motor"
                Ngr.attrs["signal"] = subgroup
                Ngr.attrs["axes"] = ["slow_motor", "fast_motor"]
            if mask is not None:
                D2group.create_dataset("mask", data=mask)

            # Fit results: one dataset per label.
            s_grp = D2group.create_group("fit", track_order=True)
            s_grp.attrs["NX_class"] = "NXdata"
            s_grp.attrs["signal"] = labels[2] if len(labels) > 2 else labels[0]
            for index, label in enumerate(labels):
                s_grp.create_dataset(label, data=parameters[index])

            # 3D maps: the complete stacks with their three axes.
            D3group = grp.create_group("3D_maps")
            D3group.attrs["default"] = "data_norm"
            for subgroup, subgroup_data in [("data", y_ini), ("data_norm", y)]:
                Ngr = D3group.create_group(subgroup)
                Ngr.attrs["NX_class"] = "NXdata"
                ND = Ngr.create_dataset(
                    subgroup, data=np.array(subgroup_data, dtype=np.float64)
                )
                NDY = Ngr.create_dataset("slow_motor", data=NDY_)
                NDX = Ngr.create_dataset("fast_motor", data=NDX_)
                NDZ = Ngr.create_dataset("x", data=x)
                NDY.make_scale("slow_motor")
                NDX.make_scale("fast_motor")
                NDZ.make_scale("x")
                ND.dims[2].attach_scale(NDZ)
                ND.dims[1].attach_scale(NDX)
                ND.dims[0].attach_scale(NDY)
                ND.dims[2].label = "x"
                ND.dims[1].label = "fast_motor"
                ND.dims[0].label = "slow_motor"
                Ngr.attrs["signal"] = subgroup
                Ngr.attrs["axes"] = ["slow_motor", "fast_motor", "x"]

            config_group = grp.create_group("config")
            path_for_config = config_group.name

            # Link to the raw scan data instead of copying it.
            grp["measurement"] = h5py.ExternalLink(
                stack_path, str(scan_number) + ".1/measurement"
            )
            grp["instrument"] = h5py.ExternalLink(
                stack_path, str(scan_number) + ".1/instrument"
            )

        # The configuration is written once the file handle above is closed;
        # dicttonx is given the file path and reopens the file in append mode.
        dicttonx(config, h5_filepath, path_for_config, mode="a")
        return new_savename
class XeolStackFit(
    Task,
    input_names=["config", "x", "y", "mask"],
    output_names=["parameters"],
):
    """Fit the spectra (pixels) of a stack within the mask."""

    def run(self):
        """Run the stack fit and output ``[labels, arrays]`` as ``parameters``.

        For every fitted parameter ``p`` the labels contain ``p`` followed by
        ``s(p)`` (taken from the fitter's sigmas); the last label is
        ``chisq``. The arrays list follows the same order.
        """
        config = self.get_input_value("config")
        stack_fitter = StackSimpleFit()
        stack_fitter.setConfiguration(config)
        x = self.get_input_value("x")
        y = self.get_input_value("y")
        mask = self.get_input_value("mask")
        stack_fitter.setData(x, y)

        # PyMca's StackSimpleFit creates its output directory during the fit
        # (not in the save step), so it needs a real directory even though
        # nothing is saved. The alternative would be to monkey patch
        # "os.path.isdir" and "os.mkdir".
        temp_dir = tempfile.mkdtemp("empty_dir_for_ewoks")
        try:
            stack_fitter.setOutputDirectory(temp_dir)
            stack_fitter.setOutputFileBaseName("tmp")
            stack_fitter.processStack(mask=mask)
        finally:
            # Always remove the (empty) directory, also when the fit fails,
            # so that failed fits do not leave temporary directories behind.
            shutil.rmtree(temp_dir)

        # NOTE: when the configuration does not match the data,
        # ``_parameters`` is None and the loop below raises a TypeError.
        labels = []
        arrays = []
        for name in stack_fitter._parameters:
            labels.append(name)
            labels.append("s(%s)" % name)
            arrays.append(stack_fitter._images[name])
            arrays.append(stack_fitter._sigmas[name])
        labels.append("chisq")
        arrays.append(stack_fitter._images["chisq"])

        self.outputs.parameters = [labels, arrays]
if __name__ == "__main__":
    # Regenerate the workflow JSON file describing the XEOL processing graph.
    module_path = "ewoksid16b.tasks.xeoltasks"

    # node id -> task class name inside this module
    task_classes = {
        "read_stack": "ReadStack",
        "read_correct_config": "ReadCorrectConfig",
        "fit_xeol": "XeolStackFit",
        "save_xeol_h5": "SaveXeolH5",
    }
    nodes = [
        {
            "id": node_id,
            "task_type": "class",
            "task_identifier": f"{module_path}.{class_name}",
        }
        for node_id, class_name in task_classes.items()
    ]

    # (source node, target node, names passed through unchanged)
    connections = [
        ("read_stack", "fit_xeol", ("x", "y", "mask")),
        ("read_correct_config", "fit_xeol", ("config",)),
        ("read_correct_config", "save_xeol_h5", ("config",)),
        ("fit_xeol", "save_xeol_h5", ("parameters",)),
        ("read_stack", "save_xeol_h5", ("x", "y", "stack", "mask")),
    ]
    links = [
        {
            "source": source,
            "target": target,
            "data_mapping": [
                {"source_output": name, "target_input": name} for name in names
            ],
        }
        for source, target, names in connections
    ]

    workflow = {"graph": {"id": "spc_xeol_test"}, "nodes": nodes, "links": links}

    current_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_path = os.path.join(current_dir, "..", "workflows/id16b_xeol.json")
    convert_graph(workflow, workflow_path)