"""
Iterative Correction Scheme.
The response matrices :math:`R_{O}` for the observables :math:`O` (e.g. BBX, MUX, ...)
are loaded from a file and then the equation
.. math:: R_{O} \cdot \delta var = O_{meas} - O_{model}
:label: eq1
is being solved for :math:`\delta var` via a chosen method (at the moment only numpys pinv,
which creates a pseudo-inverse via svd is used).
The response matrices are hereby merged into one matrix for all observables to solve for all
:math:`\delta var` at the same time.
To normalize the observables to one another, ``weights`` (W) can be applied.
Furthermore, an ``errorcut``, specifying the maximum errorbar for a BPM to be used, and
``modelcut``, specifying the maximum distance between measurement and model for a BPM to be used,
can be defined. Data from BPMs outside of those cut-values will be discarded.
These cuts are defined for each observable separately.
After each iteration the model variables are changed by :math:`-\delta var` and the
observables are recalculated by Mad-X.
:eq:`eq1` is then solved again.
:author: Lukas Malina, Joschua Dilly
Possible problems and notes:
* do we need error cut, when we use error-based weights? probably not anymore
* error-based weights default? likely - but be careful with low tune errors vs
svd cut in pseudoinverse
* manual creation of pd.DataFrame varslist, deltas? maybe
* tunes in tfs_pandas: single value or a column?
* Minimal strength removed
* Check the output files and when they are written
* There should be some summation/renaming for iterations
* For two beam correction
* The two beams can be treated separately until the calculation of correction
* Values missing in the response (i.e. correctors of the other beam) shall be
treated as zeros
* Missing a part that treats the output from LSA
"""
import cPickle
import datetime
import os
import pickle
import time
import numpy as np
import pandas as pd
from sklearn.linear_model import OrthogonalMatchingPursuit
import madx_wrapper
from correction.fullresponse import response_twiss
from model import manager
from optics_measurements.io_filehandler import OpticsMeasurement
from twiss_optics.optics_class import TwissOptics
from utils import logging_tools
from utils import iotools
from tfs_files import tfs_pandas as tfs
from utils.entrypoint import entrypoint, EntryPointParameters
from utils.logging_tools import log_pandas_settings_with_copy
# Module-level logger.
LOG = logging_tools.get_logger(__name__)
# Shorthand for the OS null device (used to silence MAD-X output).
DEV_NULL = os.devnull
# Configuration ##################################################################
# Default values for the optional entrypoint arguments (see _get_params).
DEFAULT_ARGS = {
    "optics_file": None,  # falls back to model_path/modifiers.madx if present
    "output_path": None,  # falls back to the measurement directory
    "output_filename": "changeparameters_iter",
    "svd_cut": 0.01,  # rcond for numpy's pinv (method 'pinv')
    "optics_params": ['MUX', 'MUY', 'BBX', 'BBY', 'NDX', 'Q'],
    "variables": ["MQM", "MQT", "MQTL", "MQY"],  # corrector variable classes
    "beta_file_name": "getbeta",
    "method": "pinv",
    "max_iter": 3,  # number of re-iterations (0 = single correction)
    "eps": None,  # convergence criterion (not implemented yet)
}
# Define functions here, to new optics params
def _get_default_values():
return {
'modelcut': {
'MUX': 0.05, 'MUY': 0.05,
'BBX': 0.2, 'BBY': 0.2,
'BETX': 0.2, 'BETY': 0.2,
'DX': 0.2, 'DY': 0.2,
'NDX': 0.2, 'Q': 0.1,
'F1001R': 0.0, 'F1001I': 0.0,
'F1010R': 0.0, 'F1010I': 0.0,
},
'errorcut': {
'MUX': 0.035, 'MUY': 0.035,
'BBX': 0.02, 'BBY': 0.02,
'BETX': 0.02, 'BETY': 0.02,
'DX': 0.02, 'DY': 0.02,
'NDX': 0.02, 'Q': 0.027,
'F1001R': 0.02, 'F1001I': 0.02,
'F1010R': 0.02, 'F1010I': 0.02,
},
'weights': {
'MUX': 1, 'MUY': 1,
'BBX': 0, 'BBY': 0,
'BETX': 0, 'BETY': 0,
'DX': 0, 'DY': 0,
'NDX': 0, 'Q': 10,
'F1001R': 0, 'F1001I': 0,
'F1010R': 0, 'F1010I': 0,
},
}
def _get_measurement_filters():
    """ Map every optics parameter to its measurement-filter function. """
    # Most parameters use the generic filter; the exceptions are listed below.
    filters = dict.fromkeys(
        ['BETX', 'BETY', 'DX', 'DY', 'NDX',
         'F1001R', 'F1001I', 'F1010R', 'F1010I'],
        _get_filtered_generic,
    )
    filters.update({
        'MUX': _get_filtered_phases, 'MUY': _get_filtered_phases,
        'BBX': _get_filtered_betabeat, 'BBY': _get_filtered_betabeat,
        'Q': _get_tunes,
    })
    return filters
def _get_response_filters():
    """ Map every optics parameter to its response-matrix filter function. """
    # Most parameters use the generic row-filter; phases and tunes differ.
    filters = dict.fromkeys(
        ['BBX', 'BBY', 'BETX', 'BETY', 'DX', 'DY', 'NDX',
         'F1001R', 'F1001I', 'F1010R', 'F1010I'],
        _get_generic_response,
    )
    filters.update({
        'MUX': _get_phase_response, 'MUY': _get_phase_response,
        'Q': _get_tune_response,
    })
    return filters
def _get_model_appenders():
    """ Map every optics parameter to the function appending model values. """
    # Plain model columns use the generic appender; the rest are special.
    appenders = dict.fromkeys(
        ['BETX', 'BETY', 'DX', 'DY',
         'F1001R', 'F1001I', 'F1010R', 'F1010I'],
        _get_model_generic,
    )
    appenders.update({
        'MUX': _get_model_phases, 'MUY': _get_model_phases,
        'BBX': _get_model_betabeat, 'BBY': _get_model_betabeat,
        'NDX': _get_model_norm_disp, 'Q': _get_model_tunes,
    })
    return appenders
def _get_params():
    """ Build and return the entrypoint parameters for :func:`global_correction`. """
    params = EntryPointParameters()
    params.add_parameter(
        flags="--meas_dir",
        help="Path to the directory containing the measurement files.",
        name="meas_dir",
        required=True,
    )
    params.add_parameter(
        flags="--model_dir",
        help="Path to the model to use.",
        name="model_dir",
        required=True,
    )
    params.add_parameter(
        flags="--fullresponse",
        help=("Path to the fullresponse binary file."
              " If not given, calculates the response analytically."),
        name="fullresponse_path",
    )
    params.add_parameter(
        flags="--update_response",
        help="If True, it will update the (analytical) response per iteration.",
        name="update_response",
        action="store_true",
    )
    params.add_parameter(
        flags="--optics_params",
        help="List of parameters to correct upon (e.g. BBX BBY)",
        name="optics_params",
        type=str,
        nargs="+",
        default=DEFAULT_ARGS["optics_params"],
    )
    params.add_parameter(
        flags="--optics_file",
        help=("Path to the optics file to use. If not present will default to "
              "model_path/modifiers.madx, if such a file exists."),
        name="optics_file",
    )
    params.add_parameter(
        flags="--output_dir",
        help=("Path to the directory where to write the output files, will "
              "default to the --meas input path."),
        name="output_path",
        default=DEFAULT_ARGS["output_path"],
    )
    params.add_parameter(
        flags="--output_filename",
        help="Identifier of the output files.",
        name="output_filename",
        default=DEFAULT_ARGS["output_filename"],
    )
    params.add_parameter(
        flags="--svd_cut",
        help=("Cutoff for small singular values of the pseudo inverse. (Method: 'pinv')"
              "Singular values smaller than rcond*largest_singular_value are set to zero"),
        name="svd_cut",
        type=float,
        default=DEFAULT_ARGS["svd_cut"],
    )
    params.add_parameter(
        flags="--n_correctors",
        help=("Maximum number of correctors to use. (Method: 'omp')"),
        name="n_correctors",
        type=int,
    )
    params.add_parameter(
        flags="--min_corrector_strength",
        help=("Minimum (absolute) strength of correctors."),
        name="min_corrector_strength",
        type=float,
        default=0.,
    )
    params.add_parameter(
        flags="--model_cut",
        # typo fix: "correspoding" -> "corresponding"
        help=("Reject BPMs whose deviation to the model is higher than the "
              "corresponding input. Input in order of optics_params."),
        name="modelcut",
        nargs="+",
        type=float,
    )
    params.add_parameter(
        flags="--error_cut",
        help=("Reject BPMs whose error bar is higher than the "
              "corresponding input. Input in order of optics_params."),
        name="errorcut",
        nargs="+",
        type=float,
    )
    params.add_parameter(
        flags="--weights",
        help=("Weight to apply to each measured quantity. "
              "Input in order of optics_params."),
        name="weights",
        nargs="+",
        type=float,
    )
    params.add_parameter(
        flags="--use_errorbars",
        help=("If True, it will take into account the measured errorbars "
              "in the correction."),
        name="use_errorbars",
        action="store_true",
    )
    params.add_parameter(
        flags="--variables",
        help="List of names of the variables classes to use.",
        name="variable_categories",
        nargs="+",
        default=DEFAULT_ARGS["variables"],
    )
    params.add_parameter(
        flags="--beta_file_name",
        help="Prefix of the beta file to use. E.g.: getkmodbeta",
        name="beta_file_name",
        default=DEFAULT_ARGS["beta_file_name"],
    )
    params.add_parameter(
        flags="--virt_flag",
        help="If true, it will use virtual correctors.",
        name="virt_flag",
        action="store_true",
    )
    params.add_parameter(
        flags="--method",
        help="Optimization method to use.",
        name="method",
        type=str,
        default=DEFAULT_ARGS["method"],
        choices=["pinv", "omp"]
    )
    params.add_parameter(
        flags="--max_iter",
        help=("Maximum number of correction re-iterations to perform."
              "A value of `0` means the correction is calculated once (like in the old days)."),
        name="max_iter",
        type=int,
        default=DEFAULT_ARGS["max_iter"],
    )
    params.add_parameter(
        flags="--eps",
        help=("Convergence criterion."
              "If <|delta(PARAM * WEIGHT)|> < eps, stop iteration.(Not implemented yet)"),
        name="eps",
        type=float,
        default=DEFAULT_ARGS["eps"],
    )
    params.add_parameter(
        flags="--debug",
        help="Print debug information.",
        name="debug",
        action="store_true",
    )
    return params
# Entry Point ##################################################################
@entrypoint(_get_params())
def global_correction(opt, accel_opt):
    r""" Do the global correction. Iteratively.

    Keyword Args:
        Required
        meas_dir: Path to the directory containing the measurement files.
                  **Flags**: --meas_dir
        model_dir: Path to the dir containing the model (twiss.dat or twiss_elements.dat) to use.
                   **Flags**: --model_dir
        Optional
        beta_file_name: Prefix of the beta file to use. E.g.: getkmodbeta
                        **Flags**: --beta_file_name
                        **Default**: ``getbeta``
        debug: Print debug information.
               **Flags**: --debug
               **Action**: ``store_true``
        eps (float): (Not implemented yet) Convergence criterion.
                     If :math:`<|\Delta(PARAM \cdot WEIGHT)|> < \epsilon`, stop iteration.
                     **Flags**: --eps
                     **Default**: ``None``
        errorcut (float): Reject BPMs whose error bar is higher than the corresponding input.
                          Input in order of optics_params.
                          **Flags**: --error_cut
        fullresponse_path: Path to the fullresponse binary file.
                           If not given, calculates the response analytically.
                           **Flags**: --fullresponse
        max_iter (int): Maximum number of correction re-iterations to perform.
                        A value of `0` means the correction is calculated once
                        (like in the old days).
                        **Flags**: --max_iter
                        **Default**: ``3``
        method (str): Optimization method to use.
                      **Flags**: --method
                      **Choices**: ['pinv', 'omp']
                      **Default**: ``pinv``
        min_corrector_strength (float): Minimum (absolute) strength of correctors.
                                        **Flags**: --min_corrector_strength
                                        **Default**: ``0.``
        modelcut (float): Reject BPMs whose deviation to the model is higher than the
                          corresponding input. Input in order of optics_params.
                          **Flags**: --model_cut
        n_correctors (int): Maximum number of correctors to use. (Method: 'omp')
                            **Flags**: --n_correctors
        optics_file: Path to the optics file to use, usually modifiers.madx.
                     If not present will default to model_path/modifiers.madx
                     **Flags**: --optics_file
        optics_params (str): List of parameters to correct upon (e.g. BBX BBY)
                             **Flags**: --optics_params
                             **Default**: ``['MUX', 'MUY', 'BBX', 'BBY', 'NDX', 'Q']``
        output_path: Path to the directory where to write the output files,
                     will default to the --meas input path.
                     **Flags**: --output_dir
                     **Default**: ``None``
        svd_cut (float): Cutoff for small singular values of the pseudo inverse. (Method: 'pinv')
                         Singular values smaller than
                         :math:`rcond \cdot largest\_singular\_value` are set to zero
                         **Flags**: --svd_cut
                         **Default**: ``0.01``
        use_errorbars: If True, it will take into account the measured errorbars in the correction.
                       **Flags**: --use_errorbars
                       **Action**: ``store_true``
        variable_categories: List of names of the variables classes to use.
                             **Flags**: --variables
                             **Default**: ``['MQM', 'MQT', 'MQTL', 'MQY']``
        virt_flag: If true, it will use virtual correctors.
                   **Flags**: --virt_flag
                   **Action**: ``store_true``
        weights (float): Weights to apply to each measured quantity. Input in order of optics_params.
                         **Flags**: --weights
    """
    LOG.info("Starting Iterative Global Correction.")
    with logging_tools.DebugMode(active=opt.debug,
                                 log_file=os.path.join(opt.model_dir, "iterative_correction.log")):
        not_implemented_params = [k for k in opt.optics_params
                                  if k not in _get_measurement_filters()]
        if len(not_implemented_params) > 0:
            # join the names explicitly: lists do not support the ':s'
            # format-spec (breaks under python 3)
            raise NotImplementedError("Correct iterative is not equipped for parameters:"
                                      "'{:s}'".format(", ".join(not_implemented_params)))

        # ######### Preparations ######### #
        # check on opt
        opt = _check_opt(opt)
        meth_opt = _get_method_opt(opt)

        # get accelerator class
        accel_cls = manager.get_accel_class(accel_opt)
        accel_inst = accel_cls(model_dir=opt.model_dir)
        if opt.optics_file is not None:
            accel_inst.optics_file = opt.optics_file

        # convert numbers to dictionaries (keyed by optics parameter)
        w_dict = dict(zip(opt.optics_params, opt.weights))
        mcut_dict = dict(zip(opt.optics_params, opt.modelcut))
        ecut_dict = dict(zip(opt.optics_params, opt.errorcut))

        # read data from files
        vars_list = _get_varlist(accel_cls, opt.variable_categories, opt.virt_flag)
        optics_params, meas_dict = _get_measurment_data(
            opt.optics_params,
            opt.meas_dir, opt.beta_file_name,
            w_dict,
        )
        mcut_dict = _automate_modelcut(mcut_dict, meas_dict, opt.variable_categories)

        if opt.fullresponse_path is not None:
            resp_dict = _load_fullresponse(opt.fullresponse_path, vars_list)
        else:
            resp_dict = response_twiss.create_response(
                accel_inst, opt.variable_categories, optics_params
            )

        # the model in accel_inst is modified later, so save nominal model here to variables
        nominal_model = _maybe_add_coupling_to_model(accel_inst.get_model_tfs(), optics_params)

        # apply filters to data
        meas_dict = _filter_measurement(
            optics_params, meas_dict, nominal_model,
            opt.use_errorbars, w_dict, ecut_dict, mcut_dict
        )
        meas_dict = _append_model_to_measurement(nominal_model, meas_dict, optics_params)
        resp_dict = _filter_response_index(resp_dict, meas_dict, optics_params)
        resp_matrix = _join_responses(resp_dict, optics_params, vars_list)
        # _dump(os.path.join(opt.output_path, "measurement_dict.bin"), meas_dict)
        delta = tfs.TfsDataFrame(0, index=vars_list, columns=["DELTA"])

        # ######### Iteration Phase ######### #
        for iteration in range(opt.max_iter + 1):
            LOG.info("Correction Iteration {:d} of {:d}.".format(iteration, opt.max_iter))

            # ######### Update Model and Response ######### #
            if iteration > 0:
                LOG.debug("Updating model via MADX.")
                corr_model_path = os.path.join(opt.output_path, "twiss_" + str(iteration) + ".dat")
                _create_corrected_model(corr_model_path, opt.change_params_path,
                                        accel_inst, opt.debug)

                corr_model_elements = tfs.read_tfs(corr_model_path, index="NAME")
                corr_model_elements = _maybe_add_coupling_to_model(
                    corr_model_elements, optics_params
                )
                bpms_index_mask = accel_inst.get_element_types_mask(
                    corr_model_elements.index, types=["bpm"]
                )
                corr_model = corr_model_elements.loc[bpms_index_mask, :]

                meas_dict = _append_model_to_measurement(corr_model, meas_dict, optics_params)
                if opt.update_response:
                    LOG.debug("Updating response.")
                    # please look away for the next two lines.
                    accel_inst._model = corr_model
                    accel_inst._elements = corr_model_elements
                    resp_dict = response_twiss.create_response(
                        accel_inst, opt.variable_categories, optics_params
                    )
                    resp_dict = _filter_response_index(resp_dict, meas_dict, optics_params)
                    resp_matrix = _join_responses(resp_dict, optics_params, vars_list)

            # ######### Actual optimization ######### #
            delta += _calculate_delta(
                resp_matrix, meas_dict, optics_params, vars_list, opt.method, meth_opt)

            # remove unused correctors from vars_list
            delta, resp_matrix, vars_list = _filter_by_strength(delta, resp_matrix,
                                                                opt.min_corrector_strength)

            writeparams(opt.change_params_path, delta)
            writeparams(opt.change_params_correct_path, -delta)
            LOG.debug("Cumulative delta: {:.5e}".format(
                np.sum(np.abs(delta.loc[:, "DELTA"].values))))
        write_knob(opt.knob_path, delta)
    LOG.info("Finished Iterative Global Correction.")
# Main function helpers #######################################################
def _check_opt(opt):
    """ Check on options and put in missing values.

    Fills the output path from the measurement directory, derives the
    "hardcoded" output-file paths and replaces missing cuts/weights with
    their defaults (see :func:`_get_default_values`).

    Raises:
        ValueError: If the length of a given cut/weight list does not match
            the number of optics parameters.
    """
    # get unset paths from other paths
    if opt.output_path is None:
        opt.output_path = opt.meas_dir
    iotools.create_dirs(opt.output_path)
    # some paths "hardcoded"
    opt.change_params_path = os.path.join(opt.output_path,
                                          "{:s}.madx".format(opt.output_filename))
    opt.change_params_correct_path = os.path.join(opt.output_path,
                                                  "{:s}_correct.madx".format(opt.output_filename))
    # fix: format the filename *before* joining -- the old code formatted the
    # already-joined path, which worked only by accident and was inconsistent
    # with the two paths above
    opt.knob_path = os.path.join(opt.output_path, "{:s}.tfs".format(opt.output_filename))
    # check cuts and weights:
    def_dict = _get_default_values()
    if opt.modelcut is None:
        opt.modelcut = [def_dict["modelcut"][p] for p in opt.optics_params]
    elif len(opt.optics_params) != len(opt.modelcut):
        raise ValueError("The length of modelcut is not the same as of the optical parameters!")
    if opt.errorcut is None:
        opt.errorcut = [def_dict["errorcut"][p] for p in opt.optics_params]
    elif len(opt.optics_params) != len(opt.errorcut):
        raise ValueError("The length of errorcut is not the same as of the optical parameters!")
    if opt.weights is None:
        opt.weights = [def_dict["weights"][p] for p in opt.optics_params]
    elif len(opt.optics_params) != len(opt.weights):
        raise ValueError("The length of the weights is not the same as of the optical parameters!")
    return opt
def _get_method_opt(opt):
""" Slightly unnecessary function to separate method-options
for easier debugging and readability """
return opt.get_subdict(["svd_cut", "n_correctors"])
def _print_rms(meas, diff_w, r_delta_w):
    """ Log the RMS of (measurement - model), per parameter and overall,
    as well as the RMS of the predicted update R * delta. """
    line_fmt = "{:>20s} : {:.5e}"
    LOG.debug("RMS Measure - Model (before correction, w/o weigths):")
    for key in meas:
        LOG.debug(line_fmt.format(key, _rms(meas[key].loc[:, 'DIFF'].values)))
    LOG.info("RMS Measure - Model (before correction, w/ weigths):")
    for key in meas:
        weighted_diff = (meas[key].loc[:, 'DIFF'].values
                         * meas[key].loc[:, 'WEIGHT'].values)
        LOG.info(line_fmt.format(key, _rms(weighted_diff)))
    LOG.info(line_fmt.format("All", _rms(diff_w)))
    LOG.debug(line_fmt.format("R * delta", _rms(r_delta_w)))
    LOG.debug("(Measure - Model) - (R * delta) ")
    LOG.debug(line_fmt.format("", _rms(diff_w - r_delta_w)))
def _load_fullresponse(full_response_path, variables):
    """ Load the full-response dictionary from a pickle file.

    The full response is a dictionary of optics-parameter gradients upon
    a change of a single quadrupole strength.

    Raises:
        ValueError: If none of the requested variables appears in the
            loaded response matrices.
    """
    LOG.debug("Starting loading Full Response optics")
    # pickle data is binary -- text mode ("r") corrupts the stream on
    # Windows and fails outright under python 3
    with open(full_response_path, "rb") as full_response_file:
        full_response_data = pickle.load(full_response_file)
    loaded_vars = {var for resp in full_response_data.values() for var in resp}
    if not any(v in loaded_vars for v in variables):
        raise ValueError("None of the given variables found in response matrix. "
                         "Are you using the right categories?")
    LOG.debug("Loading ended")
    return full_response_data
def _get_measurment_data(keys, meas_dir, beta_file_name, w_dict):
    """ Load the measurement (get_llm) data into a dictionary of dataframes.

    Parameters with zero weight are dropped. Returns the remaining keys
    and the mapping key -> measurement data.
    """
    # parameters that map directly onto one OpticsMeasurement attribute
    simple_sources = {
        'MUX': 'phase_x', 'MUY': 'phase_y',
        'DX': 'disp_x', 'DY': 'disp_y',
        'NDX': 'norm_disp',
        'F1001R': 'coupling', 'F1001I': 'coupling',
        'F1010R': 'coupling', 'F1010I': 'coupling',
    }
    # beta source attributes (x, y) per beta-file prefix
    beta_sources = {
        "getbeta": ('beta_x', 'beta_y'),
        "getampbeta": ('amp_beta_x', 'amp_beta_y'),
        "getkmodbeta": ('kmod_beta_x', 'kmod_beta_y'),
    }
    measurement = {}
    filtered_keys = [k for k in keys if w_dict[k] != 0]
    getllm_data = OpticsMeasurement(meas_dir)
    for key in filtered_keys:
        if key in simple_sources:
            measurement[key] = getattr(getllm_data, simple_sources[key])
        elif key == "Q":
            measurement["Q"] = pd.DataFrame({
                # Just fractional tunes:
                'VALUE': np.remainder([getllm_data.phase_x['Q1'],
                                       getllm_data.phase_y['Q2']], [1, 1]),
                # TODO measured errors not in the file
                'ERROR': np.array([0.001, 0.001])
            }, index=['Q1', 'Q2'])
        elif beta_file_name in beta_sources:
            # the remaining keys are beta keys (BBX/BBY or BETX/BETY)
            attr_x, attr_y = beta_sources[beta_file_name]
            if key in ("BBX", "BETX"):
                measurement[key] = getattr(getllm_data, attr_x)
            elif key in ("BBY", "BETY"):
                measurement[key] = getattr(getllm_data, attr_y)
    return filtered_keys, measurement
def _automate_modelcut(mcut_dict, meas_dict, vars_categories):
    """ Automatic calculation of model-cut for the coupling RDTs.

    Only applied if "coupling_knobs" is in the list of variable categories
    AND if the respective model-cut is set to zero!
    """
    if "coupling_knobs" in vars_categories:
        # use the value after 5% of the sorted data as cut value, i.e. the
        # cut keeps the 95% of BPMs closest (in measured amplitude) to the model
        for rdt in ["1001", "1010"]:
            rdt_names = ["F{:s}{:s}".format(rdt, comp) for comp in ["R", "I"]]
            if any([name in meas_dict for name in rdt_names]):
                # use meas_dict for checking, because it's already filtered
                try:
                    meas = meas_dict[rdt_names[0]]
                except KeyError:
                    # does not matter which one, they link to the same file
                    meas = meas_dict[rdt_names[1]]
                # measured amplitude |F....| and model amplitude from R/I parts
                amp_meas = meas["F{:s}W".format(rdt)]
                amp_mdl = np.sqrt(meas["MDLF{:s}R".format(rdt)]**2 +
                                  meas["MDLF{:s}I".format(rdt)]**2
                                  )
                # pick the BPM at the 95th percentile of sorted measured amplitude
                idx_num = int(np.floor(len(amp_meas) * 0.95))
                idx = amp_meas.sort_values().index.values[idx_num]
                # the cut is that BPM's |measurement - model| deviation
                new_cut = np.abs(amp_meas[idx] - amp_mdl[idx])
                for name in rdt_names:
                    if mcut_dict[name] == 0.0:
                        mcut_dict[name] = new_cut
                        LOG.info("Model Cut for {:s} set to {:e}.".format(name, new_cut))
    return mcut_dict
def _get_varlist(accel_cls, variables, virt_flag): # TODO: Virtual?
varlist = np.array(accel_cls.get_variables(classes=variables))
if len(varlist) == 0:
raise ValueError("No variables found! Make sure your categories are valid!")
return varlist
def _maybe_add_coupling_to_model(model, keys):
if any([key for key in keys if key.startswith("F1")]):
tw_opt = TwissOptics(model)
couple = tw_opt.get_coupling(method="cmatrix")
model["F1001R"] = couple["F1001"].apply(np.real).astype(np.float64)
model["F1001I"] = couple["F1001"].apply(np.imag).astype(np.float64)
model["F1010R"] = couple["F1010"].apply(np.real).astype(np.float64)
model["F1010I"] = couple["F1010"].apply(np.imag).astype(np.float64)
return model
# Parameter filtering ########################################################
def _filter_measurement(keys, meas, model, errorbar, w_dict, e_dict, m_dict):
    """ Filters measurements and renames columns to VALUE, ERROR, WEIGHT"""
    filters = _get_measurement_filters()
    return {key: filters[key](key, meas[key], model, errorbar, w_dict[key],
                              modelcut=m_dict[key], errorcut=e_dict[key])
            for key in keys}
def _get_filtered_generic(key, meas, model, erwg, weight, modelcut, errorcut):
    """ Filter the measurement of ``key`` into a VALUE/ERROR/WEIGHT frame.

    Only BPMs present in both measurement and model are kept; BPMs whose
    errorbar exceeds ``errorcut`` or whose |measurement - model| exceeds
    ``modelcut`` are discarded.

    Args:
        key: Optics parameter / column name (e.g. 'BETX', 'DX', 'F1001R').
        meas: Measurement dataframe for this parameter.
        model: Model dataframe (only its BPM index is used here).
        erwg: If True, divide the weights by the measurement errors.
        weight: Base weight for this parameter.
        modelcut: Maximum allowed deviation from the model.
        errorcut: Maximum allowed errorbar.
    """
    common_bpms = meas.index.intersection(model.index)
    meas = meas.loc[common_bpms, :]
    # value
    new = tfs.TfsDataFrame(index=common_bpms)
    new.loc[:, "VALUE"] = meas[key]
    # errors
    if ("ERR" + key) in meas.columns.values:  # usually beta
        if ('STD' + key) in meas.columns.values:  # Old files or k-mod
            # statistical and systematic errors added in quadrature
            new['ERROR'] = np.sqrt(np.square(meas['ERR' + key].values) +
                                   np.square(meas['STD' + key].values))
        else:
            new['ERROR'] = meas['ERR' + key]
    else:
        key2num = {'1001': '1', '1010': '2'}
        if key[1:-1] in key2num:  # coupling
            # coupling errors are stored in FWSTD1 / FWSTD2 columns
            new.loc[:, 'ERROR'] = meas['FWSTD' + key2num[key[1:-1]]]
        else:
            new.loc[:, 'ERROR'] = meas['STD'+key]
    # weights
    new.loc[:, 'WEIGHT'] = weight
    if erwg:
        new.loc[:, 'WEIGHT'] = _get_errorbased_weights(key, new['WEIGHT'], new['ERROR'])
    # filter cuts
    error_filter = new.loc[:, 'ERROR'].values < errorcut
    # the model column may be named either '<KEY>MDL' or 'MDL<KEY>'
    try:
        model_filter = np.abs(new.loc[:, 'VALUE'].values -
                              meas[key + 'MDL'].values) < modelcut
    except KeyError:
        # Why is there no standard for where "MDL" is attached to the name???
        model_filter = np.abs(new['VALUE'].values -
                              meas['MDL' + key].values) < modelcut
    good_bpms = error_filter & model_filter
    LOG.debug("Number of BPMs with {:s}: {:d}".format(key, np.sum(good_bpms)))
    return new.loc[good_bpms, :]
def _get_filtered_phases(key, meas, model, erwg, weight, modelcut, errorcut):
    """ Filter the phase-advance measurement into a VALUE/ERROR/WEIGHT frame.

    Same cuts as :func:`_get_filtered_generic`; additionally the partner
    BPM (NAME2) of each phase advance must itself survive the filtering.
    """
    common_bpms = meas.index.intersection(model.index)
    meas = meas.loc[common_bpms, :]
    col_val = "PHASE" + key[-1]
    col_err = "STDPH" + key[-1]
    col_mdl = "PH" + key[-1] + "MDL"
    # value
    new = tfs.TfsDataFrame(index=common_bpms)
    new.loc[:, "VALUE"] = meas[col_val]
    # errors
    new.loc[:, 'ERROR'] = meas[col_err]
    # weights
    new.loc[:, 'WEIGHT'] = weight
    if erwg:
        new.loc[:, 'WEIGHT'] = _get_errorbased_weights(key, new['WEIGHT'], new['ERROR'])
    # filter cuts
    error_filter = new['ERROR'] < errorcut
    model_filter = np.abs(new['VALUE'] - meas[col_mdl]) < modelcut
    # each phase advance is measured between this BPM and NAME2, so the
    # partner BPM has to be present as well
    new.loc[:, 'NAME2'] = meas['NAME2']
    second_bpm_in = np.in1d(new['NAME2'].values, new.index.values)
    good_bpms = error_filter & model_filter & second_bpm_in
    # drop the last phase advance
    # NOTE(review): presumably because it wraps around the ring; also this
    # positional setitem via [-1] on a label-indexed Series is fragile in
    # newer pandas -- confirm on upgrade
    good_bpms[-1] = False
    LOG.debug("Number of BPMs with {:s}: {:d}".format(key, np.sum(good_bpms)))
    return new.loc[good_bpms, :]
def _get_filtered_betabeat(key, meas, model, erwg, weight, modelcut, errorcut):
    """ Filter the beta measurement for beta-beating correction.

    Beta-beating and its error are RELATIVE (to the model beta) as shown
    in the GUI; the cuts are applied to these relative quantities.
    """
    common_bpms = meas.index.intersection(model.index)
    meas = meas.loc[common_bpms, :]
    col_val = "BET" + key[-1]
    col_std = "STDBET" + key[-1]
    col_err = 'ERRBET' + key[-1]
    col_mdl = "BET" + key[-1] + "MDL"
    # value
    new = tfs.TfsDataFrame(index=common_bpms)
    new.loc[:, 'VALUE'] = meas[col_val]
    # errors
    # BUGFIX: check the *measurement* columns (as _get_filtered_generic does);
    # ``new`` only contains VALUE at this point, so the old check on
    # ``new.columns`` could never be True and the STD term was always dropped
    if col_std in meas.columns.values:  # Old files or k-mod
        # statistical and systematic errors added in quadrature
        new.loc[:, 'ERROR'] = np.sqrt(np.square(meas[col_err]) + np.square(meas[col_std]))
    else:
        new.loc[:, 'ERROR'] = meas[col_err]
    # weights
    new.loc[:, 'WEIGHT'] = weight
    if erwg:
        new.loc[:, 'WEIGHT'] = _get_errorbased_weights(key, new['WEIGHT'] * meas[col_mdl],
                                                       new['ERROR'])
    # filter cuts (both relative to the model beta)
    model_filter = np.abs(new['VALUE'] - meas[col_mdl]) / meas[col_mdl] < modelcut
    error_filter = new['ERROR'] / meas[col_mdl] < errorcut
    good_bpms = model_filter & error_filter
    LOG.debug("Number of BPMs with {:s}: {:d}".format(key, np.sum(good_bpms)))
    return new.loc[good_bpms, :]
def _get_tunes(key, meas, model, erwg, weight, modelcut, errorcut):
    """ Attach weights to the tune "measurement"; the cuts do not apply. """
    meas.loc[:, 'WEIGHT'] = weight
    if erwg:
        err_based = _get_errorbased_weights(key, meas['WEIGHT'], meas['ERROR'])
        meas.loc[:, 'WEIGHT'] = err_based
    LOG.debug("Number of tune measurements: " + str(len(meas.index.values)))
    return meas
def _get_errorbased_weights(key, weights, errors):
if 0 in errors.values:
LOG.warn("Zero-values found in errors of '{:s}'. ".format(key) +
"Weights will not be based on errors for this parameter! " +
"(Maybe don't use --errorbars.)")
return weights
else:
return weights / errors
# Response filtering ##########################################################
def _filter_response_index(response, measurement, keys):
not_in_response = [k for k in keys if k not in response]
if len(not_in_response) > 0:
raise KeyError("The following optical parameters are not present in current"
"response matrix: {:s}".format(not_in_response))
filters = _get_response_filters()
new_resp = {}
for key in keys:
new_resp[key] = filters[key](response[key], measurement[key])
return new_resp
def _get_generic_response(resp, meas):
return resp.loc[meas.index.values, :]
def _get_phase_response(resp, meas):
phase1 = resp.loc[meas.index.values, :]
phase2 = resp.loc[meas.loc[:, 'NAME2'].values, :]
return -phase1.sub(phase2.values) # phs2-phs1 but with idx of phs1
def _get_tune_response(resp, meas):
    # Tunes are global quantities (not per-BPM), so no row-filtering needed.
    return resp
# Model appending #############################################################
def _append_model_to_measurement(model, measurement, keys):
    """ Add MODEL and DIFF columns to every measurement dataframe. """
    appenders = _get_model_appenders()
    return {key: appenders[key](model, measurement[key], key) for key in keys}
def _get_model_generic(model, meas, key):
    """ Append the model values of column ``key`` and the measurement-model
    difference to the measurement. """
    with log_pandas_settings_with_copy(LOG.debug):
        model_values = model.loc[meas.index.values, key].values
        meas.loc[:, 'MODEL'] = model_values
        meas.loc[:, 'DIFF'] = meas['VALUE'] - meas['MODEL']
    return meas
def _get_model_phases(model, meas, key):
    """ Append the model phase advance (partner BPM minus this BPM) and the
    measurement-model difference. """
    with log_pandas_settings_with_copy(LOG.debug):
        at_second = model.loc[meas['NAME2'].values, key].values
        at_first = model.loc[meas.index.values, key].values
        meas.loc[:, 'MODEL'] = at_second - at_first
        meas.loc[:, 'DIFF'] = meas['VALUE'] - meas['MODEL']
    return meas
def _get_model_betabeat(model, meas, key):
    """ Append the model beta and the RELATIVE difference (beta-beating). """
    beta_col = "BETX" if key == "BBX" else "BETY"
    with log_pandas_settings_with_copy(LOG.debug):
        meas.loc[:, 'MODEL'] = model.loc[meas.index.values, beta_col].values
        meas.loc[:, 'DIFF'] = (meas['VALUE'] - meas['MODEL']) / meas['MODEL']
    return meas
def _get_model_norm_disp(model, meas, key):
    """ Append the model normalized dispersion D/sqrt(beta) and the diff. """
    disp_col = key[1:]           # e.g. 'NDX' -> 'DX'
    beta_col = "BET" + key[-1]   # e.g. 'NDX' -> 'BETX'
    with log_pandas_settings_with_copy(LOG.debug):
        disp = model.loc[meas.index.values, disp_col].values
        beta = model.loc[meas.index.values, beta_col].values
        meas.loc[:, 'MODEL'] = disp / np.sqrt(beta)
        meas.loc[:, 'DIFF'] = meas['VALUE'] - meas['MODEL']
    return meas
def _get_model_tunes(model, meas, key):
    """ Append the fractional model tunes and the measurement-model diff. """
    with log_pandas_settings_with_copy(LOG.debug):
        # We want just fractional tunes
        fractional = np.remainder([model['Q1'], model['Q2']], [1, 1])
        meas.loc[:, 'MODEL'] = fractional
        meas.loc[:, 'DIFF'] = meas['VALUE'] - meas['MODEL']
    return meas
# Main Calculation #############################################################
def _calculate_delta(resp_matrix, meas_dict, keys, vars_list, method, meth_opt):
    """ Get the deltas for the variables.

    Solves the weighted system ``R_w * delta = diff_w`` with the solver
    selected by ``method`` ('pinv' or 'omp').

    Returns:
        Dataframe with one column 'DELTA' and vars_list index.
    """
    weight_vector = _join_columns('WEIGHT', meas_dict, keys)
    diff_vector = _join_columns('DIFF', meas_dict, keys)
    # apply the weights per measurement point (row-wise)
    resp_weighted = resp_matrix.mul(weight_vector, axis="index")
    diff_weighted = diff_vector * weight_vector
    delta = _get_method_fun(method)(resp_weighted, diff_weighted, meth_opt)
    delta = tfs.TfsDataFrame(delta, index=vars_list, columns=["DELTA"])
    # check calculations: compare the predicted update R*delta to the diff
    update = np.dot(resp_weighted, delta["DELTA"])
    _print_rms(meas_dict, diff_weighted, update)
    return delta
def _get_method_fun(method):
    """ Dispatch the solver function for the chosen method name. """
    return {
        "pinv": _pseudo_inverse,
        "omp": _orthogonal_matching_pursuit,
    }[method]
def _pseudo_inverse(response_mat, diff_vec, opt):
""" Calculates the pseudo-inverse of the response via svd. (numpy) """
if opt.svd_cut is None:
raise ValueError("svd_cut setting needed for pseudo inverse method.")
return np.dot(np.linalg.pinv(response_mat, opt.svd_cut), diff_vec)
def _orthogonal_matching_pursuit(response_mat, diff_vec, opt):
    """ Calculated n_correctors via orthogonal matching pursuit (sklearn).

    Raises:
        ValueError: If ``n_correctors`` is not set in the method options.
    """
    if opt.n_correctors is None:
        raise ValueError("n_correctors setting needed for orthogonal matching pursuit.")
    # return orthogonal_mp(response_mat, diff_vec, opt.n_correctors)
    # NOTE(review): n_nonzero_coefs is passed positionally here; newer
    # sklearn versions require it as a keyword argument -- confirm on upgrade
    res = OrthogonalMatchingPursuit(opt.n_correctors).fit(response_mat, diff_vec)
    coef = res.coef_
    LOG.debug("Orthogonal Matching Pursuit Results:")
    LOG.debug(" Chosen variables: {:s}".format(str(response_mat.columns.values[coef.nonzero()])))
    LOG.debug(" Score: {:f}".format(res.score(response_mat, diff_vec)))
    return coef
# MADX related #################################################################
def _create_corrected_model(twiss_out, change_params, accel_inst, debug):
    """ Use the calculated deltas in changeparameters.madx to create a corrected model """
    # create script from template
    madx_script = accel_inst.get_update_correction_job(twiss_out, change_params)
    # run madx (output is shown only in debug mode)
    if debug:
        madx_wrapper.resolve_and_run_string(madx_script)
        return
    madx_wrapper.resolve_and_run_string(madx_script, log_file=os.devnull)
def write_knob(knob_path, delta):
    """ Write the (negated) deltas as a knob tfs-file with PATH/DATE headers. """
    timestamp = datetime.datetime.fromtimestamp(time.time())
    knob_frame = -delta.loc[:, ["DELTA"]]
    knob_frame.headers["PATH"] = os.path.dirname(knob_path)
    knob_frame.headers["DATE"] = str(timestamp.ctime())
    tfs.write_tfs(knob_path, knob_frame, save_index="NAME")
def writeparams(path_to_file, delta):
    """ Write a madx script that increments every variable by its delta. """
    lines = []
    for var in delta.index.values:
        value = delta.loc[var, "DELTA"]
        lines.append("{var:s} = {var:s} {value:+e};\n".format(var=var, value=value))
    with open(path_to_file, "w") as madx_script:
        madx_script.writelines(lines)
# Small Helpers ################################################################
def _rms(a):
return np.sqrt(np.mean(np.square(a)))
def _dump(path_to_dump, content):
with open(path_to_dump, 'wb') as dump_file:
cPickle.Pickler(dump_file, -1).dump(content)
def _join_responses(resp, keys, varslist):
""" Returns matrix #BPMs * #Parameters x #variables """
return pd.concat([resp[k] for k in keys], # dataframes
axis="index", # axis to join along
join_axes=[pd.Index(varslist)]
# other axes to use (pd Index obj required)
).fillna(0.0)
def _join_columns(col, meas, keys):
""" Retuns vector: N= #BPMs * #Parameters (BBX, MUX etc.) """
return np.concatenate([meas[key].loc[:, col].values for key in keys], axis=0)
def _filter_by_strength(delta, resp_matrix, min_strength=0):
""" Remove too small correctors """
delta = delta.loc[delta["DELTA"].abs() > min_strength]
return delta, resp_matrix.loc[:, delta.index], delta.index.values
# Main invocation ############################################################
# Script entry point: run the entrypoint-wrapped global correction.
if __name__ == "__main__":
    global_correction()