r"""
Provides Class to get response matrices from Twiss parameters.
The calculation is based on formulas in [#FranchiAnalyticformulasrapid2017]_, [#TomasReviewlinearoptics2017]_.
Only works properly for on-orbit twiss files.
* Beta Response: Eq. A35 inserted into Eq. B45 in [#FranchiAnalyticformulasrapid2017]_
.. math::
\delta \beta_{z,j} = \mp \beta_{z,j} \sum_m \delta K_{1,m} \frac{\beta_{z,m}}{2}
\frac{cos(2\tau_{z,mj})}{sin(2\pi Q_z)}
* Dispersion Response: Eq. 25-27 in [#FranchiAnalyticformulasrapid2017]_ + K1 (see Eq. B17)
.. math::
\delta D_{x,j} =&+ \sqrt{\beta_{x,j}} \sum_m (\delta K_{0,m} + \delta K_{1S,m} D_{y,m}
- \delta K_{1,m} D_{x,m}) \frac{\sqrt{\beta_{x,m}}}{2}
\frac{cos(\tau_{x,mj})}{sin(\pi Q_x)}
\\
\delta D_{y,j} =&- \sqrt{\beta_{y,j}} \sum_m (\delta K_{0S,m}
- \delta K_{1S,m} D_{x,m} - \delta K_{1,m} D_{y,m}) \frac{\sqrt{\beta_{y,m}}}{2}
\frac{cos(\tau_{y,mj})}{sin(\pi Q_y)}
* Norm. Dispersion Response: similar as above but with :math:`\frac{1}{\sqrt{\beta}}` linearized
.. math::
\delta \frac{D_{x,j}}{\sqrt{\beta_{x,j}}} =&+ \sum_m (\delta K_{0,m} + \delta K_{1S,m} D_{y,m}
- \delta K_{1,m} D_{x,m} ) \frac{\sqrt{\beta_{x,m}}}{2}
\frac{cos(\tau_{x,mj})}{sin(\pi Q_x)}
&&+ \frac{D_{x,j}}{\sqrt{\beta_{x,j}}} \delta K_{1,m}
\frac{\beta_{x,m}}{4}\frac{cos(2\tau_{x,mj})}{2sin(\pi Q_x)}
\\
\delta \frac{D_{y,j}}{\sqrt{\beta_{y,j}}} =&- \sum_m (\delta K_{0S,m} - \delta K_{1S,m} D_{x,m}
- \delta K_{1,m} D_{y,m}) \frac{\sqrt{\beta_{y,m}}}{2}
\frac{cos(\tau_{y,mj})}{sin(\pi Q_y)}
&&- \frac{D_{y,j}}{\sqrt{\beta_{y,j}}} \delta K_{1,m}
\frac{\beta_{y,m}}{4}\frac{cos(2\tau_{y,mj})}{2sin(\pi Q_y)}
* Phase Advance Response: Eq. 28 in [#FranchiAnalyticformulasrapid2017]_
.. math::
\delta \Phi_{z,wj} = \pm \sum_m \delta K_{1,m} \frac{\beta_{z,m}}{4}
\left\{ 2\left[ \Pi_{mj} - \Pi_{mw} + \Pi_{jw} \right] +
\frac{sin(2\tau_{z,mj}) - sin(2\tau_{z,mw})}{sin(2\pi Q_z)} \right\}
* Tune Response: Eq. 7 in [#TomasReviewlinearoptics2017]_
.. math::
\delta Q_z = \pm \sum_m \delta K_{1,m} \frac{\beta_{z,m}}{4\pi}
* Coupling Response: Eq. 10 in [#FranchiAnalyticformulasrapid2017]_
.. math::
\delta f_{\substack{\scriptscriptstyle 1001 \\ \scriptscriptstyle 1010},j} =
\sum_m \delta J_{1,m} \, \frac{\sqrt{\beta_{x,m}\beta_{y,m}}}{4} \,
\frac{\exp{(i(\Delta\Phi_{x,mj} \mp \Delta\Phi_{y,mj}))}}{1-\exp({2\pi i (Q_x \mp Q_y}))}
For people reading the code, the response matrices are first calculated like:
::
| Elements of interest (j) --> ... |
|Magnets (m) |
| | |
| v |
| . |
| . |
| . |
| |
This avoids transposing all vectors individually in the beginning.
At the end (of the calculation) the matrix is then transposed
to fit the :math:`M \cdot \delta K` orientation.
Also :math:`\Delta \Phi_{z,wj}` needs to be multiplied by :math:`2\pi` to be consistent.
.. rubric:: References
.. [#FranchiAnalyticformulasrapid2017]
A. Franchi et al.,
Analytic formulas for the rapid evaluation of the orbit response matrix
and chromatic functions from lattice parameters in circular accelerators
https://arxiv.org/abs/1711.06589
.. [#TomasReviewlinearoptics2017]
R. Tomas, et al.,
'Review of linear optics measurement and correction for charged particle
accelerators.'
Physical Review Accelerators and Beams, 20(5), 54801. (2017)
https://doi.org/10.1103/PhysRevAccelBeams.20.054801
"""
import cPickle as pickle
import numpy as np
import pandas as pd
from correction.fullresponse.sequence_evaluation import check_varmap_file
from twiss_optics.twiss_functions import get_phase_advances, tau, dphi
from twiss_optics.twiss_functions import upper
from utils import logging_tools as logtool
from tfs_files import tfs_pandas as tfs
from utils.contexts import timeit
LOG = logtool.get_logger(__name__)
DUMMY_ID = "DUMMY_PLACEHOLDER"
# Twiss Response Class ########################################################
[docs]class TwissResponse(object):
""" Provides Response Matrices calculated from sequence, model and given variables.
Args:
accel_inst (accelerator): Accelerator Instance (needs to contain elements model).
variable_categories (list): List of variable categories to get from the accelerator class.
varmap_or_path (dict, string): mapping of the variables,
either as dict-structure of Series or path to a pickled-file.
at_elements (str): Get response matrix for these elements. Can be:
'bpms': All BPMS (Default)
'bpms+': BPMS+ used magnets (== magnets defined by variables in varfile)
'all': All BPMS and Magnets given in the model (Markers are removed)
"""
################################
# INIT
################################
def __init__(self, accel_inst, variable_categories, varmap_or_path, at_elements='bpms'):
LOG.debug("Initializing TwissResponse.")
with timeit(lambda t: LOG.debug(" Time initializing TwissResponse: {:f}s".format(t))):
# Get input
self._twiss = self._get_model_twiss(accel_inst)
self._variables = accel_inst.get_variables(classes=variable_categories)
self._var_to_el = self._get_variable_mapping(varmap_or_path)
self._elements_in = self._get_input_elements()
self._elements_out = self._get_output_elements(at_elements)
self._direction = self._get_direction(accel_inst)
# calculate all phase advances
self._phase_advances = get_phase_advances(self._twiss)
# All responses are calcluated as needed, see getters below!
# slots for response matrices
self._beta = None
self._dispersion = None
self._phase = None
self._phase_adv = None
self._tune = None
self._coupling = None
self._beta_beat = None
self._norm_dispersion = None
# slots for mapped response matrices
self._coupling_mapped = None
self._beta_mapped = None
self._dispersion_mapped = None
self._phase_mapped = None
self._phase_adv_mapped = None
self._tune_mapped = None
self._beta_beat_mapped = None
self._norm_dispersion_mapped = None
@staticmethod
def _get_model_twiss(accel_inst):
""" Load model, but keep only BPMs and Magnets """
# get model
model = accel_inst.get_elements_tfs()
# Remove not needed entries
LOG.debug("Removing non-necessary entries:")
LOG.debug(" Entries total: {:d}".format(model.shape[0]))
mask = accel_inst.get_element_types_mask(model.index, types=["bpm", "magnet"])
model = model.loc[mask, :].copy() # make a copy to suppress "SettingWithCopyWarning"
LOG.debug(" Entries left: {:d}".format(model.shape[0]))
# Add Dummy for Phase Calculations
model.loc[DUMMY_ID, ["S", "MUX", "MUY"]] = 0.0
return model
def _get_variable_mapping(self, varmap_or_path):
""" Get variable mapping as dictionary
Dev hint: Define _variables first!
"""
LOG.debug("Converting variables to magnet names.")
variables = self._variables
if not len(variables):
raise ValueError("No variables found. Maybe wrong categories?")
try:
with open(varmap_or_path, "rb") as varmapfile:
mapping = pickle.load(varmapfile)
except TypeError:
LOG.debug("Received varmap as dictionary.")
mapping = varmap_or_path
else:
LOG.debug("Loaded varmap from file '{:s}'".format(varmap_or_path))
for order in ("K0L", "K0SL", "K1L", "K1SL"):
if order not in mapping:
mapping[order] = {}
# check if all variables can be found
check_var = [var for var in variables
if all(var not in mapping[order] for order in mapping)]
if len(check_var) > 0:
raise ValueError("Variables '{:s}' cannot be found in sequence!".format(
", ".join(check_var)
))
# drop mapping for unused variables
[mapping[order].pop(var) for order in mapping for var in mapping[order].keys()
if var not in variables]
return mapping
def _get_input_elements(self):
""" Return variable names of input elements.
Dev hint: Define _var_to_el and _twiss first!
"""
v2e = self._var_to_el
tw = self._twiss
el_in = dict.fromkeys(v2e.keys())
for order in el_in:
el_order = []
for var in v2e[order]:
el_order += upper(v2e[order][var].index)
el_in[order] = tw.loc[list(set(el_order)), "S"].sort_values().index.tolist()
return el_in
@staticmethod
def _get_direction(accel_inst):
""" Sign for the direction of the beam. """
return 1 if accel_inst.get_beam() == 1 else -1
def _get_output_elements(self, at_elements):
""" Return name-array of elements to use for output.
Dev hint: Define _elements_in first!
"""
tw_idx = self._twiss.index
if isinstance(at_elements, list):
# elements specified
if any(el not in tw_idx for el in at_elements):
LOG.warning("One or more specified elements are not in the model.")
return [idx for idx in tw_idx
if idx in at_elements]
if at_elements == "bpms":
# bpms only
return [idx for idx in tw_idx
if idx.upper().startswith('B')]
if at_elements == "bpms+":
# bpms and the used magnets
el_in = self._elements_in
return [idx for idx in tw_idx
if (idx.upper().startswith('B')
or any(idx in el_in[order] for order in el_in))]
if at_elements == "all":
# all, obviously
return [idx for idx in tw_idx if idx != DUMMY_ID]
################################
# Response Matrix
################################
def _calc_coupling_response(self):
""" Response Matrix for coupling.
Eq. 10 in [#FranchiAnalyticformulasrapid2017]_
"""
LOG.debug("Calculate Coupling Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))):
tw = self._twiss
adv = self._phase_advances
el_out = self._elements_out
k1s_el = self._elements_in["K1SL"]
dcoupl = dict.fromkeys(["1001", "1010"])
i2pi = 2j * np.pi
phx = dphi(adv['X'].loc[k1s_el, el_out], tw.Q1).values
phy = dphi(adv['Y'].loc[k1s_el, el_out], tw.Q2).values
bet_term = np.sqrt(tw.loc[k1s_el, "BETX"].values * tw.loc[k1s_el, "BETY"].values)
for plane in ["1001", "1010"]:
phs_sign = -1 if plane == "1001" else 1
dcoupl[plane] = tfs.TfsDataFrame(
bet_term[:, None] * np.exp(i2pi * (phx + phs_sign * phy)) /
(4 * (1 - np.exp(i2pi * (tw.Q1 + phs_sign * tw.Q2)))),
index=k1s_el, columns=el_out).transpose()
return dict_mul(self._direction, dcoupl)
def _calc_beta_response(self):
""" Response Matrix for delta beta.
Eq. A35 -> Eq. B45 in [#FranchiAnalyticformulasrapid2017]_
"""
LOG.debug("Calculate Beta Response Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))):
tw = self._twiss
adv = self._phase_advances
el_out = self._elements_out
k1_el = self._elements_in["K1L"]
dbeta = dict.fromkeys(["X", "Y"])
for plane in ["X", "Y"]:
col_beta = "BET" + plane
q = tw.Q1 if plane == "X" else tw.Q2
coeff_sign = -1 if plane == "X" else 1
pi2tau = 2 * np.pi * tau(adv[plane].loc[k1_el, el_out], q)
dbeta[plane] = tfs.TfsDataFrame(
tw.loc[el_out, col_beta].values[None, :] *
tw.loc[k1_el, col_beta].values[:, None] * np.cos(2 * pi2tau.values) *
(coeff_sign / (2 * np.sin(2 * np.pi * q))),
index=k1_el, columns=el_out).transpose()
return dict_mul(self._direction, dbeta)
def _calc_dispersion_response(self):
""" Response Matrix for delta normalized dispersion
Eq. 25-27 in [#FranchiAnalyticformulasrapid2017]_
But w/o the assumtion :math:`\delta K_1 = 0` from Appendix B.1
"""
LOG.debug("Calculate Dispersion Response Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}".format(t))):
tw = self._twiss
adv = self._phase_advances
el_out = self._elements_out
els_in = self._elements_in
sign_map = {
"X": {"K0L": 1, "K1L": -1, "K1SL": 1, },
"Y": {"K0SL": -1, "K1L": 1, "K1SL": 1, },
}
col_disp_map = {
"X": {"K1L": "DX", "K1SL": "DY", },
"Y": {"K1L": "DY", "K1SL": "DX", },
}
q_map = {"X": tw.Q1, "Y": tw.Q2}
disp_resp = dict.fromkeys(["{p:s}_{t:s}".format(p=p, t=t)
for p in sign_map for t in sign_map[p]])
for plane in sign_map:
q = q_map[plane]
col_beta = "BET{}".format(plane)
el_types = sign_map[plane].keys()
els_per_type = [els_in[el_type] for el_type in el_types]
coeff = np.sqrt(tw.loc[el_out, col_beta].values) / (2 * np.sin(np.pi * q))
for el_in, el_type in zip(els_per_type, el_types):
coeff_sign = sign_map[plane][el_type]
out_str = "{p:s}_{t:s}".format(p=plane, t=el_type)
if len(el_in):
pi2tau = 2 * np.pi * tau(adv[plane].loc[el_in, el_out], q)
bet_term = np.sqrt(tw.loc[el_in, col_beta])
try:
col_disp = col_disp_map[plane][el_type]
except KeyError:
pass
else:
bet_term *= tw.loc[el_in, col_disp]
disp_resp[out_str] = (coeff_sign * coeff[None, :] * bet_term[:, None] *
np.cos(pi2tau)
).transpose()
else:
LOG.debug(
" No '{:s}' variables found. ".format(el_type) +
"Dispersion Response '{:s}' will be empty.".format(out_str))
disp_resp[out_str] = tfs.TfsDataFrame(None, index=el_out)
return dict_mul(self._direction, disp_resp)
def _calc_norm_dispersion_response(self):
""" Response Matrix for delta normalized dispersion
Eq. 25-27 in [#FranchiAnalyticformulasrapid2017]_
But w/o the assumtion :math:`\delta K_1 = 0` from Appendix B.1
and added linearization for :math:`\frac{1}{\sqrt{\beta}}`
"""
LOG.debug("Calculate Normalized Dispersion Response Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}".format(t))):
tw = self._twiss
adv = self._phase_advances
el_out = self._elements_out
els_in = self._elements_in
sign_map = {
"X": {"K0L": 1, "K1L": -1, "K1SL": 1, },
"Y": {"K0SL": -1, "K1L": 1, "K1SL": 1, },
}
col_disp_map = {
"X": {"K1L": "DX", "K1SL": "DY", },
"Y": {"K1L": "DY", "K1SL": "DX", },
}
sign_correct_term = {
"X": {"K1L": 1},
"Y": {"K1L": -1},
}
q_map = {"X": tw.Q1, "Y": tw.Q2}
disp_resp = dict.fromkeys(["{p:s}_{t:s}".format(p=p, t=t)
for p in sign_map for t in sign_map[p]])
for plane in sign_map:
q = q_map[plane]
col_beta = "BET{}".format(plane)
el_types = sign_map[plane].keys()
els_per_type = [els_in[el_type] for el_type in el_types]
coeff = 1 / (2 * np.sin(np.pi * q))
coeff_corr = 1 / (4 * np.sin(2 * np.pi * q))
for el_in, el_type in zip(els_per_type, el_types):
coeff_sign = sign_map[plane][el_type]
out_str = "{p:s}_{t:s}".format(p=plane, t=el_type)
if len(el_in):
pi2tau = 2 * np.pi * tau(adv[plane].loc[el_in, el_out], q)
beta_in = tw.loc[el_in, col_beta]
bet_term = np.sqrt(beta_in)
try:
col_disp = col_disp_map[plane][el_type]
except KeyError:
pass
else:
bet_term *= tw.loc[el_in, col_disp]
result = (coeff_sign * coeff * bet_term)[:, None] * np.cos(pi2tau)
# correction term
try:
sign_corr = sign_correct_term[plane][el_type]
except KeyError:
pass
else:
norm_disp_corr = (tw.loc[el_out, col_disp] /
np.sqrt(tw.loc[el_out, col_beta]))
result += (sign_corr * coeff_corr * norm_disp_corr[None, :] *
beta_in[:, None] * np.cos(2 * pi2tau))
disp_resp[out_str] = result.transpose()
else:
LOG.debug(
" No '{:s}' variables found. ".format(el_type) +
"Normalized Dispersion Response '{:s}' will be empty.".format(out_str))
disp_resp[out_str] = tfs.TfsDataFrame(None, index=el_out)
return dict_mul(self._direction, disp_resp)
def _calc_phase_advance_response(self):
""" Response Matrix for delta DPhi.
Eq. 28 in [#FranchiAnalyticformulasrapid2017]_
Reduced to only phase advances between consecutive elements,
as the 3D-Matrix of all elements exceeds memory space
(~11000^3 = 1331 Giga Elements)
--> w = j-1: DPhi(z,j) = DPhi(x, (j-1)->j)
"""
LOG.debug("Calculate Phase Advance Response Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))):
tw = self._twiss
adv = self._phase_advances
k1_el = self._elements_in["K1L"]
el_out_all = [DUMMY_ID] + self._elements_out # Add MU[XY] = 0.0 to the start
el_out = el_out_all[1:] # in these we are actually interested
el_out_mm = el_out_all[0:-1] # elements--
if len(k1_el) > 0:
dmu = dict.fromkeys(["X", "Y"])
pi = tfs.TfsDataFrame(tw['S'][:, None] < tw['S'][None, :], # pi(i,j) = s(i) < s(j)
index=tw.index, columns=tw.index, dtype=int)
pi_term = (pi.loc[k1_el, el_out].values -
pi.loc[k1_el, el_out_mm].values +
np.diag(pi.loc[el_out, el_out_mm].values)[None, :])
for plane in ["X", "Y"]:
col_beta = "BET" + plane
q = tw.Q1 if plane == "X" else tw.Q2
coeff_sign = 1 if plane == "X" else -1
pi2tau = 2 * np.pi * tau(adv[plane].loc[k1_el, el_out_all], q)
brackets = (2 * pi_term +
((np.sin(2 * pi2tau.loc[:, el_out].values) -
np.sin(2 * pi2tau.loc[:, el_out_mm].values))
/ np.sin(2 * np.pi * q)
))
dmu[plane] = tfs.TfsDataFrame(
tw.loc[k1_el, col_beta].values[:, None] * brackets
* (coeff_sign / (8 * np.pi)),
index=k1_el, columns=el_out).transpose()
else:
LOG.debug(" No 'K1L' variables found. Phase Response will be empty.")
dmu = {"X": tfs.TfsDataFrame(None, index=el_out),
"Y": tfs.TfsDataFrame(None, index=el_out)}
return dict_mul(self._direction, dmu)
def _calc_phase_response(self):
""" Response Matrix for delta DPhi.
Eq. 28 in [#FranchiAnalyticformulasrapid2017]_
Reduced to only delta phase.
--> w = 0: DPhi(z,j) = DPhi(x, 0->j)
This calculation could also be achieved by applying np.cumsum to the DataFrames of
_calc_phase_adv_response() (tested!), but _calc_phase_response() is about 4x faster.
"""
LOG.debug("Calculate Phase Response Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))):
tw = self._twiss
adv = self._phase_advances
k1_el = self._elements_in["K1L"]
el_out = self._elements_out
if len(k1_el) > 0:
dmu = dict.fromkeys(["X", "Y"])
pi = tfs.TfsDataFrame(tw['S'][:, None] < tw['S'][None, :], # pi(i,j) = s(i) < s(j)
index=tw.index, columns=tw.index, dtype=int)
pi_term = pi.loc[k1_el, el_out].values
for plane in ["X", "Y"]:
col_beta = "BET" + plane
q = tw.Q1 if plane == "X" else tw.Q2
coeff_sign = 1 if plane == "X" else -1
pi2tau = 2 * np.pi * tau(adv[plane].loc[k1_el, [DUMMY_ID] + el_out], q)
brackets = (2 * pi_term +
((np.sin(2 * pi2tau.loc[:, el_out].values) -
np.sin(2 * pi2tau.loc[:, DUMMY_ID].values[:, None]))
/ np.sin(2 * np.pi * q)
))
dmu[plane] = tfs.TfsDataFrame(
tw.loc[k1_el, col_beta].values[:, None] * brackets
* (coeff_sign / (8 * np.pi)),
index=k1_el, columns=el_out).transpose()
else:
LOG.debug(" No 'K1L' variables found. Phase Response will be empty.")
dmu = {"X": tfs.TfsDataFrame(None, index=el_out),
"Y": tfs.TfsDataFrame(None, index=el_out)}
return dict_mul(self._direction, dmu)
def _calc_tune_response(self):
""" Response vectors for Tune.
Eq. 7 in [#TomasReviewlinearoptics2017]_
"""
LOG.debug("Calculate Tune Response Matrix")
with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))):
tw = self._twiss
k1_el = self._elements_in["K1L"]
if len(k1_el) > 0:
dtune = dict.fromkeys(["X", "Y"])
dtune["X"] = 1/(4 * np.pi) * tw.loc[k1_el, ["BETX"]].transpose()
dtune["X"].index = ["DQX"]
dtune["Y"] = -1 / (4 * np.pi) * tw.loc[k1_el, ["BETY"]].transpose()
dtune["Y"].index = ["DQY"]
else:
LOG.debug(" No 'K1L' variables found. Tune Response will be empty.")
dtune = {"X": tfs.TfsDataFrame(None, index=["DQX"]),
"Y": tfs.TfsDataFrame(None, index=["DQY"])}
return dict_mul(self._direction, dtune)
################################
# Normalizing
################################
def _normalize_beta_response(self, beta):
""" Convert to Beta Beating """
el_out = self._elements_out
tw = self._twiss
beta_norm = dict.fromkeys(beta.keys())
for plane in beta:
col = "BET" + plane
beta_norm[plane] = beta[plane].div(
tw.loc[el_out, col], axis='index')
return beta_norm
################################
# Mapping
################################
def _map_dispersion_response(self, disp):
""" Maps all dispersion matrices """
disp_mapped = dict.fromkeys(disp.keys())
m2v = self._map_to_variables
for plane in disp:
mapping = self._var_to_el[plane.split("_")[1]]
disp_mapped[plane] = m2v(disp[plane], mapping)
return disp_mapped
@staticmethod
def _map_to_variables(df, mapping):
""" Maps from magnets to variables using self._var_to_el.
Could actually be done by matrix multiplication :math:'A \cdot var_to_el',
yet, as var_to_el is very sparsely populated, looping is easier.
Args:
df: DataFrame or dictionary of DataFrames to map
mapping: mapping to be applied (e.g. var_to_el[order])
Returns:
DataFrame or dictionary of mapped DataFrames
"""
def map_fun(df, mapping):
""" Actual mapping function """
df_map = tfs.TfsDataFrame(index=df.index, columns=mapping.keys())
for var, magnets in mapping.iteritems():
df_map[var] = df.loc[:, upper(magnets.index)].mul(
magnets.values, axis="columns"
).sum(axis="columns")
return df_map
# convenience wrapper for dicts
if isinstance(df, dict):
mapped = dict.fromkeys(df.keys())
for plane in mapped:
mapped[plane] = map_fun(df[plane], mapping)
else:
mapped = map_fun(df, mapping)
return mapped
################################
# Getters
################################
[docs] def get_beta(self, mapped=True):
""" Returns Response Matrix for Beta """
if not self._beta:
self._beta = self._calc_beta_response()
if mapped and not self._beta_mapped:
self._beta_mapped = self._map_to_variables(self._beta,
self._var_to_el["K1L"])
if mapped:
return self._beta_mapped
else:
return self._beta
[docs] def get_beta_beat(self, mapped=True):
""" Returns Response Matrix for Beta Beating """
if not self._beta:
self._beta = self._calc_beta_response()
if not self._beta_beat:
self._beta_beat = self._normalize_beta_response(self._beta)
if mapped and not self._beta_beat_mapped:
self._beta_beat_mapped = self._map_to_variables(self._beta_beat,
self._var_to_el["K1L"])
if mapped:
return self._beta_beat_mapped
else:
return self._beta_beat
[docs] def get_dispersion(self, mapped=True):
""" Returns Response Matrix for Dispersion """
if not self._dispersion:
self._dispersion = self._calc_dispersion_response()
if mapped and not self._dispersion_mapped:
self._dispersion_mapped = self._map_dispersion_response(self._dispersion)
if mapped:
return self._dispersion_mapped
else:
return self._dispersion
[docs] def get_norm_dispersion(self, mapped=True):
""" Returns Response Matrix for Normalized Dispersion """
if not self._norm_dispersion:
self._norm_dispersion = self._calc_norm_dispersion_response()
if mapped and not self._norm_dispersion_mapped:
self._norm_dispersion_mapped = self._map_dispersion_response(self._norm_dispersion)
if mapped:
return self._norm_dispersion_mapped
else:
return self._norm_dispersion
[docs] def get_phase(self, mapped=True):
""" Returns Response Matrix for Total Phase """
if not self._phase:
self._phase = self._calc_phase_response()
if mapped and not self._phase_mapped:
self._phase_mapped = self._map_to_variables(self._phase, self._var_to_el["K1L"])
if mapped:
return self._phase_mapped
else:
return self._phase
[docs] def get_phase_adv(self, mapped=True):
""" Returns Response Matrix for Phase Advance """
if not self._phase_adv:
self._phase_adv = self._calc_phase_advance_response()
if mapped and not self._phase_adv_mapped:
self._phase_adv_mapped = self._map_to_variables(self._phase_adv, self._var_to_el["K1L"])
if mapped:
return self._phase_adv_mapped
else:
return self._phase_adv
[docs] def get_tune(self, mapped=True):
""" Returns Response Matrix for the Tunes """
if not self._tune:
self._tune = self._calc_tune_response()
if mapped and not self._tune_mapped:
self._tune_mapped = self._map_to_variables(self._tune, self._var_to_el["K1L"])
if mapped:
return self._tune_mapped
else:
return self._tune
[docs] def get_coupling(self, mapped=True):
""" Returns Response Matrix for the coupling """
if not self._coupling:
self._coupling = self._calc_coupling_response()
if mapped and not self._coupling_mapped:
self._coupling_mapped = self._map_to_variables(self._coupling, self._var_to_el["K1SL"])
if mapped:
return self._coupling_mapped
else:
return self._coupling
def get_variable_names(self):
return self._variables
def get_variable_mapping(self, order=None):
if order is None:
return self._var_to_el
else:
return self._var_to_el[order]
[docs] def get_response_for(self, obs=None):
""" Calculates and returns only desired response matrices """
# calling functions for the getters to call functions only if needed
def caller(func, plane):
return func()[plane]
def disp_caller(func, plane):
disp = func()
return response_add(*[disp[k] for k in disp.keys() if k.startswith(plane)])
def tune_caller(func, _unused):
tune = func()
res = tune["X"].append(tune["Y"])
res.index = ["Q1", "Q2"]
return res
def couple_caller(func, plane):
# apply() converts empty DataFrames to Series! Cast them back.
# Also: take care of minus-sign convention!
sign = -1 if plane[-1] == "R" else 1
part_func = np.real if plane[-1] == "R" else np.imag
return sign * tfs.TfsDataFrame(func()[plane[:-1]].apply(part_func).astype(np.float64))
# to avoid if-elif-elif-...
obs_map = {
'Q': (tune_caller, self.get_tune, None),
'BETX': (caller, self.get_beta, "X"),
'BETY': (caller, self.get_beta, "Y"),
'BBX': (caller, self.get_beta_beat, "X"),
'BBY': (caller, self.get_beta_beat, "Y"),
'MUX': (caller, self.get_phase, "X"),
'MUY': (caller, self.get_phase, "Y"),
'DX': (disp_caller, self.get_dispersion, "X"),
'DY': (disp_caller, self.get_dispersion, "Y"),
'NDX': (disp_caller, self.get_norm_dispersion, "X"),
'NDY': (disp_caller, self.get_norm_dispersion, "Y"),
'F1001R': (couple_caller, self.get_coupling, "1001R"),
'F1001I': (couple_caller, self.get_coupling, "1001I"),
'F1010R': (couple_caller, self.get_coupling, "1010R"),
'F1010I': (couple_caller, self.get_coupling, "1010I"),
}
if obs is None:
obs = obs_map.keys()
LOG.debug("Calculating responses for {:s}.".format(obs))
with timeit(lambda t: LOG.debug("Total time getting responses: {:f}s".format(t))):
response = dict.fromkeys(obs)
for key in obs:
response[key] = obs_map[key][0](*obs_map[key][1:3])
return response
# Associated Functions #########################################################
[docs]def get_delta(response, delta_k):
""" Returns the deltas of :math:`response_matrix \cdot delta_k`.
Args:
response: Response dictionary
delta_k: Pandas Series of variables and their delta-value
Returns:
TFS_DataFrame with elements as indices and the calculated deltas in the columns
"""
delta_df = tfs.TfsDataFrame(None, index=response.index)
for col in response.keys():
# equivalent to .dot() but more efficient as delta_k is "sparse"
if col == "Q":
try:
delta_q = (response[col].loc[:, delta_k.index] * delta_k
).dropna(axis="columns").sum(axis="columns")
except KeyError:
# none of the delta_k are in DataFrame
delta_q = pd.Series([0., 0.], index=["Q1", "Q2"])
delta_df.headers["QX"] = delta_q["Q1"]
delta_df.headers["QY"] = delta_q["Q2"]
else:
try:
delta_df.loc[:, col] = (response[col].loc[:, delta_k.index] * delta_k
).dropna(axis="columns").sum(axis="columns")
except KeyError:
# none of the delta_k are in DataFrame
delta_df.loc[:, col] = 0.
return delta_df
[docs]def response_add(*args):
""" Merges two or more Response Matrix DataFrames """
base_df = args[0]
for df in args[1:]:
base_df = base_df.add(df, fill_value=0.)
return base_df
[docs]def dict_mul(number, dictionary):
""" Multiply an int with a dict of dataframes (or anything multiplyable) """
if number != 1:
for key in dictionary:
dictionary[key] = number * dictionary[key]
return dictionary
# Wrapper ##################################################################
[docs]def create_response(accel_inst, vars_categories, optics_params):
""" Wrapper to create response via TwissResponse """
LOG.debug("Creating response via TwissResponse.")
varmap_path = check_varmap_file(accel_inst, vars_categories)
with timeit(lambda t:
LOG.debug("Total time getting TwissResponse: {:f}s".format(t))):
tr = TwissResponse(accel_inst, vars_categories, varmap_path)
response = tr.get_response_for(optics_params)
if not any([resp.size for resp in response.values()]):
raise ValueError("Responses are all empty. " +
"Are variables {:s} ".format(tr.get_variable_names()) +
"correct for '{:s}'?".format(optics_params)
)
return response
# Script Mode ##################################################################
if __name__ == '__main__':
raise EnvironmentError("{:s} is not supposed to run as main.".format(__file__))