import os
import shutil
import tempfile
import h5py
import hdf5plugin # noqa
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from ewoks import convert_graph
from ewokscore import Task
from PyMca5.PyMcaIO import ConfigDict
from PyMca5.PyMcaMath.fitting.StackSimpleFit import StackSimpleFit as TmpClass
from silx.io.dictdump import dicttonx
from .utils import calibration
from .utils import read
# not to save EDF files
[docs]
class StackSimpleFit(TmpClass):
[docs]
def onProcessStackFinished(self):
pass
[docs]
class ReadStack(
Task,
input_names=["stack_path", "scan_number"],
optional_input_names=[
"threshold",
"calibration_x",
"calibration_y",
"norm_counter",
"counter",
"key",
],
output_names=["x", "stack", "y", "mask"],
):
"""Read the data. Calibrate, normalize, reshape the data for next steps"""
[docs]
def run(self):
x, stack, y, mask = self.define_x_y_stack(
self.get_input_value("stack_path"),
self.get_input_value("norm_counter", None),
self.get_input_value("calibration_x", None),
self.get_input_value("calibration_y", None),
self.get_input_value("counter", None),
self.get_input_value("scan_number"),
self.get_input_value("threshold", 0),
self.get_input_value("key", None),
)
self.outputs.x = x
self.outputs.stack = stack
self.outputs.y = y
self.outputs.mask = mask
[docs]
def define_x_y_stack(
self,
file,
scale_normalizer,
calibration_x,
calibration_y,
counter,
scan_number,
roi=0,
key=None,
):
"""
define full set of data
:param file: a single path to a file with stack. Also could be an url to the redis server.
:param scale_normalizer: short name of counter for normalization
:param calibration_x: x-axis calibration
:param calibration_y: y-axis calibration
:param counter: main counter
:param scan_number: scan number of the map
:param roi: unique or a list of threshold
:return: calibrated x, stacks(initial data), calibrated y, mask coordinates
"""
x, y, stack, pixels = read.read_stack(file, counter, scan_number, key)
if scale_normalizer is not None and scale_normalizer is not False:
y = read.read_y_normalization_counter(
file, stack, scale_normalizer, scan_number, pixels, key
)
if calibration_x is not None:
x = calibration.apply_calibration(x, calibration_x)
if calibration_y is not None:
y = calibration.apply_calibration(y, calibration_y)
x = np.array(x)
# accept all pixel above threshold
spc = y.sum(axis=-1)
res_roi = np.where(spc >= roi)
# convert mask positioners into an actual mask
mask = np.zeros(y.shape[0:2], np.uint8)
mask[res_roi[0], res_roi[1]] = 1
if (mask == 0).all(): # to avoid fully masked map - as it will not be fitted
print("everything is masked, one point will be unmasked")
spc = y.sum(axis=-1)
max_spc = np.where(spc == np.max(spc))
mask[max_spc[0], max_spc[1]] = 1
return x, stack, y, mask
[docs]
class ReadCorrectConfig(
Task,
input_names=["config_path"],
output_names=["config"],
):
"""read configuration file and create a dictionary and a mask"""
[docs]
def run(self):
cfg_file = self.get_input_value("config_path")
current_configuration = ConfigDict.ConfigDict()
current_configuration.read(cfg_file)
# sometimes background is saved not as a list because it is single constnat.
back_ground = current_configuration["functions"]["User Estimated Constant"][
"configuration"
]["estimation"]["parameters"]
if not isinstance(back_ground, list):
print("back_ground must be a list, correction was applied")
current_configuration["functions"]["User Estimated Constant"][
"configuration"
]["estimation"]["parameters"] = [back_ground]
# PyMca sometimes fail to read functions from configuration then it reads it from the file
# but file path is absolute - thus, if cfg was saved on different computer the process crushes.
stack_fitter = StackSimpleFit()
try:
stack_fitter.setConfiguration(current_configuration)
except (RuntimeError, FileNotFoundError, ModuleNotFoundError, ValueError):
print(
"PyMca is trying to reach path which do not exist. It will be redirected to the installed library"
)
import PyMca5.PyMcaMath.fitting.SimpleFitUserEstimatedFunctions as dummy
# to change things like config["functions"]["User Estimated Constant"]["file"]
for key, subdict in current_configuration.get("functions", {}).items():
if isinstance(subdict, dict) and "file" in subdict:
subdict["file"] = dummy.__file__
self.outputs.config = current_configuration
[docs]
class SaveXeolH5(
Task,
input_names=[
"save_path",
"stack_path",
"x",
"y",
"stack",
"parameters",
"config",
"scan_number",
],
optional_input_names=["calibration_x", "calibration_y", "mask", "key"],
output_names=["finished", "group_names", "config"],
):
"""save results to external h5 file"""
[docs]
def run(self):
self.group_names = []
config = self.get_input_value("config")
parameters = self.get_input_value("parameters")
mask = self.get_input_value("mask", None)
key = self.get_input_value("key", None)
x = self.get_input_value("x")
y = self.get_input_value("y")
stack = self.get_input_value("stack")
save_path = self.get_input_value("save_path")
if not save_path.endswith(".h5"):
save_path = save_path + ".h5"
stack_path = self.get_input_value("stack_path")
scan_number = self.get_input_value("scan_number")
if key is not None: # onefile.startswith("esrf:")
redis_url = BeaconData().get_redis_data_db()
datastore = DataStore(redis_url)
scan = datastore.load_scan(key)
title = scan.info["title"]
prename = f"{title}_scan_number_{scan_number}"
elif stack_path.endswith(".h5"):
base = os.path.basename(stack_path)
name = os.path.splitext(base)[0]
prename = f"{name}_scan_number_{scan_number}"
else:
raise ValueError("Invalid stack path")
self.group_names.append(
self.generate_h5_file(
os.path.basename(save_path),
prename,
os.path.dirname(save_path),
self.get_input_value("calibration_x", None),
self.get_input_value("calibration_y", None),
x,
y,
stack,
parameters[0],
parameters[1],
mask,
config,
stack_path,
scan_number,
key,
)
)
self.outputs.finished = True
self.outputs.group_names = self.group_names
self.outputs.config = config
[docs]
def generate_h5_file(
self,
h5name,
savename,
savedir,
calibration_x,
calibration_y,
x,
y,
y_ini,
labels,
parameters,
mask,
config,
stack_path,
scan_number,
key,
):
"""
Generate H5 file from results
:param h5name: name of the h5 file
:param savename: name of the result (group)
:param savedir: directory of the result file
:param calibration_x: list of coefficients for polynomial equation
:param calibration_y: list of coefficients for polynomial equation
:param x: x-data calibrated
:param y: y-data calibrated
:param y_ini: y-data without calibration
:param labels: list of parameters names
:param parameters: list of array of parameters
:param mask: the applied mask
:param config: configuration
:param stack_path: file path
:param scan_number: scan number
:param key: bliss key of the scan
"""
NDY_, NDX_ = read.read_axes(stack_path, scan_number, key)
h5_filepath = os.path.join(savedir, h5name)
if os.path.exists(h5_filepath) is False:
os.makedirs(os.path.dirname(h5_filepath), exist_ok=True)
mode = "w"
else:
mode = "a"
with h5py.File(h5_filepath, mode) as h5file:
if savename not in h5file:
grp = h5file.create_group(savename, track_order=True)
new_savename = savename
else:
print(
savename,
" already exists and could not be saved. Numeric name of scan will be changed."
" Should never appear during online fitting.",
)
k = 2
new_savename = savename + "_repetition_1"
while new_savename in h5file:
new_savename = savename + "_repetition_" + str(k)
k = k + 1
grp = h5file.create_group(new_savename, track_order=True)
if calibration_x is not None:
grp.create_dataset("Calibration_x", data=calibration_x)
if calibration_y is not None:
grp.create_dataset("Calibration_y", data=calibration_y)
grp.attrs["NX_class"] = "NXentry"
D2group = grp.create_group("2D_maps")
D2group.attrs["default"] = "data_sum_stack_norm"
for subgroup, subgroup_data in [
("data_sum_stack", y_ini),
("data_sum_stack_norm", y),
]:
Ngr = D2group.create_group(subgroup)
Ngr.attrs["NX_class"] = "NXdata"
ND = Ngr.create_dataset(
subgroup,
data=np.array(subgroup_data.sum(axis=-1), dtype=np.float64),
)
NDY = Ngr.create_dataset("slow_motor", data=NDY_)
NDX = Ngr.create_dataset("fast_motor", data=NDX_)
NDY.make_scale("slow_motor")
NDX.make_scale("fast_motor")
ND.dims[1].attach_scale(NDX)
ND.dims[0].attach_scale(NDY)
ND.dims[1].label = "fast_motor"
ND.dims[0].label = "slow_motor"
Ngr.attrs["signal"] = subgroup
Ngr.attrs["axes"] = ["slow_motor", "fast_motor"]
if mask is not None:
D2group.create_dataset("mask", data=mask)
s_grp = D2group.create_group("fit", track_order=True)
s_grp.attrs["NX_class"] = "NXdata"
s_grp.attrs["signal"] = labels[2] if len(labels) > 2 else labels[0]
for k in range(0, len(labels)):
s_grp.create_dataset(labels[k], data=parameters[k])
D3group = grp.create_group("3D_maps")
D3group.attrs["default"] = "data_norm"
for subgroup, subgroup_data in [("data", y_ini), ("data_norm", y)]:
Ngr = D3group.create_group(subgroup)
Ngr.attrs["NX_class"] = "NXdata"
ND = Ngr.create_dataset(
subgroup, data=np.array(subgroup_data, dtype=np.float64)
)
NDY = Ngr.create_dataset("slow_motor", data=NDY_)
NDX = Ngr.create_dataset("fast_motor", data=NDX_)
NDZ = Ngr.create_dataset("x", data=x)
NDY.make_scale("slow_motor")
NDX.make_scale("fast_motor")
NDZ.make_scale("x")
ND.dims[2].attach_scale(NDZ)
ND.dims[1].attach_scale(NDX)
ND.dims[0].attach_scale(NDY)
ND.dims[2].label = "x"
ND.dims[1].label = "fast_motor"
ND.dims[0].label = "slow_motor"
Ngr.attrs["signal"] = subgroup
Ngr.attrs["axes"] = ["slow_motor", "fast_motor", "x"]
config_group = grp.create_group("config")
path_for_config = config_group.name
grp["measurement"] = h5py.ExternalLink(
stack_path, str(scan_number) + ".1/measurement"
)
grp["instrument"] = h5py.ExternalLink(
stack_path, str(scan_number) + ".1/instrument"
)
dicttonx(config, h5_filepath, path_for_config, mode="a")
return new_savename
[docs]
class XeolStackFit(
Task,
input_names=["config", "x", "y", "mask"],
output_names=["parameters"],
):
"""fit spectra(pixels) of stack within mask"""
[docs]
def run(self):
config = self.get_input_value("config")
stack_fitter = StackSimpleFit()
stack_fitter.setConfiguration(config)
x = self.get_input_value("x")
y = self.get_input_value("y")
mask = self.get_input_value("mask")
parameters = []
stack_fitter.setData(x, y)
# either create RAM folder or do a monkey patch for "os.path.isdir" and "os.mkdir"...
# due to the structure of PyMca StackSimpleFit methods - creating directory in the "fit" and not in the "save" method
temp_dir = tempfile.mkdtemp("empty_dir_for_ewoks")
stack_fitter.setOutputDirectory(temp_dir)
stack_fitter.setOutputFileBaseName("tmp")
stack_fitter.processStack(mask=mask)
# remove empty (due to overwrite of StackSimpleFit.onProcessStackFinished) folder from RAM
shutil.rmtree(temp_dir)
# reading a dictionary for an output
tmp_labels = []
tmp_parameters = []
for j in range(0, len(stack_fitter._parameters)):
tmp_labels.append(stack_fitter._parameters[j])
tmp_labels.append("s(%s)" % stack_fitter._parameters[j])
tmp_parameters.append(stack_fitter._images[stack_fitter._parameters[j]])
tmp_parameters.append(stack_fitter._sigmas[stack_fitter._parameters[j]])
tmp_labels.append("chisq")
tmp_parameters.append(stack_fitter._images["chisq"])
parameters = [tmp_labels, tmp_parameters]
# if configuartion do not match the data - TypeError: object of type 'NoneType' has no len()
self.outputs.parameters = parameters
if __name__ == "__main__":
nodes = [
{
"id": "read_stack",
"task_type": "class",
"task_identifier": "ewoksid16b.tasks.xeoltasks.ReadStack",
},
{
"id": "read_correct_config",
"task_type": "class",
"task_identifier": "ewoksid16b.tasks.xeoltasks.ReadCorrectConfig",
},
{
"id": "fit_xeol",
"task_type": "class",
"task_identifier": "ewoksid16b.tasks.xeoltasks.XeolStackFit",
},
{
"id": "save_xeol_h5",
"task_type": "class",
"task_identifier": "ewoksid16b.tasks.xeoltasks.SaveXeolH5",
},
]
links = [
{
"source": "read_stack",
"target": "fit_xeol",
"data_mapping": [
{"source_output": "x", "target_input": "x"},
{"source_output": "y", "target_input": "y"},
{"source_output": "mask", "target_input": "mask"},
],
},
{
"source": "read_correct_config",
"target": "fit_xeol",
"data_mapping": [
{"source_output": "config", "target_input": "config"},
],
},
{
"source": "read_correct_config",
"target": "save_xeol_h5",
"data_mapping": [
{"source_output": "config", "target_input": "config"},
],
},
{
"source": "fit_xeol",
"target": "save_xeol_h5",
"data_mapping": [
{"source_output": "parameters", "target_input": "parameters"},
],
},
{
"source": "read_stack",
"target": "save_xeol_h5",
"data_mapping": [
{"source_output": "x", "target_input": "x"},
{"source_output": "y", "target_input": "y"},
{"source_output": "stack", "target_input": "stack"},
{"source_output": "mask", "target_input": "mask"},
],
},
]
workflow = {"graph": {"id": "spc_xeol_test"}, "nodes": nodes, "links": links}
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(current_dir, "..", "workflows/id16b_xeol.json")
convert_graph(workflow, workflow_path)