Source code for correction.fullresponse.response_twiss

Provides Class to get response matrices from Twiss parameters.

The calculation is based on formulas in [#FranchiAnalyticformulasrapid2017]_, [#TomasReviewlinearoptics2017]_.

Only works properly for on-orbit twiss files.

* Beta Response:     Eq. A35 inserted into Eq. B45 in [#FranchiAnalyticformulasrapid2017]_

.. math::

    \delta \beta_{z,j} = \mp \beta_{z,j} \sum_m \delta K_{1,m} \frac{\beta_{z,m}}{2}
    \frac{cos(2\tau_{z,mj})}{sin(2\pi Q_z)}

* Dispersion Response: Eq. 25-27 in [#FranchiAnalyticformulasrapid2017]_ + K1 (see Eq. B17)

.. math::

    \delta D_{x,j} =&+ \sqrt{\beta_{x,j}} \sum_m (\delta K_{0,m} + \delta K_{1S,m} D_{y,m}
    - \delta K_{1,m} D_{x,m}) \frac{\sqrt{\beta_{x,m}}}{2}
    \frac{cos(\tau_{x,mj})}{sin(\pi Q_x)}
    \delta D_{y,j} =&- \sqrt{\beta_{y,j}} \sum_m (\delta K_{0S,m}
    - \delta K_{1S,m} D_{x,m} - \delta K_{1,m} D_{y,m}) \frac{\sqrt{\beta_{y,m}}}{2}
    \frac{cos(\tau_{y,mj})}{sin(\pi Q_y)}

* Norm. Dispersion Response: similar as above but with :math:`\frac{1}{\sqrt{\beta}}` linearized

.. math::

    \delta \frac{D_{x,j}}{\sqrt{\beta_{x,j}}} =&+ \sum_m (\delta K_{0,m} + \delta K_{1S,m} D_{y,m}
    - \delta K_{1,m} D_{x,m} ) \frac{\sqrt{\beta_{x,m}}}{2}
    \frac{cos(\tau_{x,mj})}{sin(\pi Q_x)}
    &&+ \frac{D_{x,j}}{\sqrt{\beta_{x,j}}} \delta K_{1,m}
    \frac{\beta_{x,m}}{4}\frac{cos(2\tau_{x,mj})}{2sin(\pi Q_x)}
    \delta \frac{D_{y,j}}{\sqrt{\beta_{y,j}}} =&- \sum_m (\delta K_{0S,m} - \delta K_{1S,m} D_{x,m}
    - \delta K_{1,m} D_{y,m}) \frac{\sqrt{\beta_{y,m}}}{2}
    \frac{cos(\tau_{y,mj})}{sin(\pi Q_y)}
    &&- \frac{D_{y,j}}{\sqrt{\beta_{y,j}}} \delta K_{1,m}
    \frac{\beta_{y,m}}{4}\frac{cos(2\tau_{y,mj})}{2sin(\pi Q_y)}

* Phase Advance Response:    Eq. 28 in [#FranchiAnalyticformulasrapid2017]_

.. math::

    \delta \Phi_{z,wj} = \pm \sum_m \delta K_{1,m} \frac{\beta_{z,m}}{4}
    \left\{ 2\left[ \Pi_{mj} - \Pi_{mw} + \Pi_{jw} \right] +
    \frac{sin(2\tau_{z,mj}) - sin(2\tau_{z,mw})}{sin(2\pi Q_z)} \right\}

* Tune Response:             Eq. 7 in [#TomasReviewlinearoptics2017]_

.. math::

    \delta Q_z = \pm \sum_m \delta K_{1,m} \frac{\beta_{z,m}}{4\pi}

* Coupling Response:            Eq. 10 in [#FranchiAnalyticformulasrapid2017]_

.. math::

    \delta f_{\substack{\scriptscriptstyle 1001 \\ \scriptscriptstyle 1010},j} =
    \sum_m \delta J_{1,m} \, \frac{\sqrt{\beta_{x,m}\beta_{y,m}}}{4} \,
    \frac{\exp{(i(\Delta\Phi_{x,mj} \mp \Delta\Phi_{y,mj}))}}{1-\exp({2\pi i (Q_x \mp Q_y}))}

For people reading the code, the response matrices are first calculated like:


    |  Elements of interest (j) --> ... |
    |Magnets (m)                        |
    |  |                                |
    |  v                                |
    |  .                                |
    |  .                                |
    |  .                                |
    |                                   |

This avoids transposing all vectors individually in the beginning.
At the end (of the calculation) the matrix is then transposed
to fit the :math:`M \cdot \delta K` orientation.

Also :math:`\Delta \Phi_{z,wj}` needs to be multiplied by :math:`2\pi` to be consistent.

.. rubric:: References

..  [#FranchiAnalyticformulasrapid2017]
    A. Franchi et al.,
    Analytic formulas for the rapid evaluation of the orbit response matrix
    and chromatic functions from lattice parameters in circular accelerators

.. [#TomasReviewlinearoptics2017]
    R. Tomas, et al.,
    'Review of linear optics measurement and correction for charged particle
    Physical Review Accelerators and Beams, 20(5), 54801. (2017)

import cPickle as pickle

import numpy as np
import pandas as pd

from correction.fullresponse.sequence_evaluation import check_varmap_file
from twiss_optics.twiss_functions import get_phase_advances, tau, dphi
from twiss_optics.twiss_functions import upper
from utils import logging_tools as logtool
from tfs_files import tfs_pandas as tfs
from utils.contexts import timeit

LOG = logtool.get_logger(__name__)


# Twiss Response Class ########################################################

[docs]class TwissResponse(object): """ Provides Response Matrices calculated from sequence, model and given variables. Args: accel_inst (accelerator): Accelerator Instance (needs to contain elements model). variable_categories (list): List of variable categories to get from the accelerator class. varmap_or_path (dict, string): mapping of the variables, either as dict-structure of Series or path to a pickled-file. at_elements (str): Get response matrix for these elements. Can be: 'bpms': All BPMS (Default) 'bpms+': BPMS+ used magnets (== magnets defined by variables in varfile) 'all': All BPMS and Magnets given in the model (Markers are removed) """ ################################ # INIT ################################ def __init__(self, accel_inst, variable_categories, varmap_or_path, at_elements='bpms'): LOG.debug("Initializing TwissResponse.") with timeit(lambda t: LOG.debug(" Time initializing TwissResponse: {:f}s".format(t))): # Get input self._twiss = self._get_model_twiss(accel_inst) self._variables = accel_inst.get_variables(classes=variable_categories) self._var_to_el = self._get_variable_mapping(varmap_or_path) self._elements_in = self._get_input_elements() self._elements_out = self._get_output_elements(at_elements) self._direction = self._get_direction(accel_inst) # calculate all phase advances self._phase_advances = get_phase_advances(self._twiss) # All responses are calcluated as needed, see getters below! # slots for response matrices self._beta = None self._dispersion = None self._phase = None self._phase_adv = None self._tune = None self._coupling = None self._beta_beat = None self._norm_dispersion = None # slots for mapped response matrices self._coupling_mapped = None self._beta_mapped = None self._dispersion_mapped = None self._phase_mapped = None self._phase_adv_mapped = None self._tune_mapped = None self._beta_beat_mapped = None self._norm_dispersion_mapped = None @staticmethod def _get_model_twiss(accel_inst): """ Load model, but keep only BPMs and Magnets """ # get model model = accel_inst.get_elements_tfs() # Remove not needed entries LOG.debug("Removing non-necessary entries:") LOG.debug(" Entries total: {:d}".format(model.shape[0])) mask = accel_inst.get_element_types_mask(model.index, types=["bpm", "magnet"]) model = model.loc[mask, :].copy() # make a copy to suppress "SettingWithCopyWarning" LOG.debug(" Entries left: {:d}".format(model.shape[0])) # Add Dummy for Phase Calculations model.loc[DUMMY_ID, ["S", "MUX", "MUY"]] = 0.0 return model def _get_variable_mapping(self, varmap_or_path): """ Get variable mapping as dictionary Dev hint: Define _variables first! """ LOG.debug("Converting variables to magnet names.") variables = self._variables if not len(variables): raise ValueError("No variables found. Maybe wrong categories?") try: with open(varmap_or_path, "rb") as varmapfile: mapping = pickle.load(varmapfile) except TypeError: LOG.debug("Received varmap as dictionary.") mapping = varmap_or_path else: LOG.debug("Loaded varmap from file '{:s}'".format(varmap_or_path)) for order in ("K0L", "K0SL", "K1L", "K1SL"): if order not in mapping: mapping[order] = {} # check if all variables can be found check_var = [var for var in variables if all(var not in mapping[order] for order in mapping)] if len(check_var) > 0: raise ValueError("Variables '{:s}' cannot be found in sequence!".format( ", ".join(check_var) )) # drop mapping for unused variables [mapping[order].pop(var) for order in mapping for var in mapping[order].keys() if var not in variables] return mapping def _get_input_elements(self): """ Return variable names of input elements. Dev hint: Define _var_to_el and _twiss first! """ v2e = self._var_to_el tw = self._twiss el_in = dict.fromkeys(v2e.keys()) for order in el_in: el_order = [] for var in v2e[order]: el_order += upper(v2e[order][var].index) el_in[order] = tw.loc[list(set(el_order)), "S"].sort_values().index.tolist() return el_in @staticmethod def _get_direction(accel_inst): """ Sign for the direction of the beam. """ return 1 if accel_inst.get_beam() == 1 else -1 def _get_output_elements(self, at_elements): """ Return name-array of elements to use for output. Dev hint: Define _elements_in first! """ tw_idx = self._twiss.index if isinstance(at_elements, list): # elements specified if any(el not in tw_idx for el in at_elements): LOG.warning("One or more specified elements are not in the model.") return [idx for idx in tw_idx if idx in at_elements] if at_elements == "bpms": # bpms only return [idx for idx in tw_idx if idx.upper().startswith('B')] if at_elements == "bpms+": # bpms and the used magnets el_in = self._elements_in return [idx for idx in tw_idx if (idx.upper().startswith('B') or any(idx in el_in[order] for order in el_in))] if at_elements == "all": # all, obviously return [idx for idx in tw_idx if idx != DUMMY_ID] ################################ # Response Matrix ################################ def _calc_coupling_response(self): """ Response Matrix for coupling. Eq. 10 in [#FranchiAnalyticformulasrapid2017]_ """ LOG.debug("Calculate Coupling Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))): tw = self._twiss adv = self._phase_advances el_out = self._elements_out k1s_el = self._elements_in["K1SL"] dcoupl = dict.fromkeys(["1001", "1010"]) i2pi = 2j * np.pi phx = dphi(adv['X'].loc[k1s_el, el_out], tw.Q1).values phy = dphi(adv['Y'].loc[k1s_el, el_out], tw.Q2).values bet_term = np.sqrt(tw.loc[k1s_el, "BETX"].values * tw.loc[k1s_el, "BETY"].values) for plane in ["1001", "1010"]: phs_sign = -1 if plane == "1001" else 1 dcoupl[plane] = tfs.TfsDataFrame( bet_term[:, None] * np.exp(i2pi * (phx + phs_sign * phy)) / (4 * (1 - np.exp(i2pi * (tw.Q1 + phs_sign * tw.Q2)))), index=k1s_el, columns=el_out).transpose() return dict_mul(self._direction, dcoupl) def _calc_beta_response(self): """ Response Matrix for delta beta. Eq. A35 -> Eq. B45 in [#FranchiAnalyticformulasrapid2017]_ """ LOG.debug("Calculate Beta Response Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))): tw = self._twiss adv = self._phase_advances el_out = self._elements_out k1_el = self._elements_in["K1L"] dbeta = dict.fromkeys(["X", "Y"]) for plane in ["X", "Y"]: col_beta = "BET" + plane q = tw.Q1 if plane == "X" else tw.Q2 coeff_sign = -1 if plane == "X" else 1 pi2tau = 2 * np.pi * tau(adv[plane].loc[k1_el, el_out], q) dbeta[plane] = tfs.TfsDataFrame( tw.loc[el_out, col_beta].values[None, :] * tw.loc[k1_el, col_beta].values[:, None] * np.cos(2 * pi2tau.values) * (coeff_sign / (2 * np.sin(2 * np.pi * q))), index=k1_el, columns=el_out).transpose() return dict_mul(self._direction, dbeta) def _calc_dispersion_response(self): """ Response Matrix for delta normalized dispersion Eq. 25-27 in [#FranchiAnalyticformulasrapid2017]_ But w/o the assumtion :math:`\delta K_1 = 0` from Appendix B.1 """ LOG.debug("Calculate Dispersion Response Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}".format(t))): tw = self._twiss adv = self._phase_advances el_out = self._elements_out els_in = self._elements_in sign_map = { "X": {"K0L": 1, "K1L": -1, "K1SL": 1, }, "Y": {"K0SL": -1, "K1L": 1, "K1SL": 1, }, } col_disp_map = { "X": {"K1L": "DX", "K1SL": "DY", }, "Y": {"K1L": "DY", "K1SL": "DX", }, } q_map = {"X": tw.Q1, "Y": tw.Q2} disp_resp = dict.fromkeys(["{p:s}_{t:s}".format(p=p, t=t) for p in sign_map for t in sign_map[p]]) for plane in sign_map: q = q_map[plane] col_beta = "BET{}".format(plane) el_types = sign_map[plane].keys() els_per_type = [els_in[el_type] for el_type in el_types] coeff = np.sqrt(tw.loc[el_out, col_beta].values) / (2 * np.sin(np.pi * q)) for el_in, el_type in zip(els_per_type, el_types): coeff_sign = sign_map[plane][el_type] out_str = "{p:s}_{t:s}".format(p=plane, t=el_type) if len(el_in): pi2tau = 2 * np.pi * tau(adv[plane].loc[el_in, el_out], q) bet_term = np.sqrt(tw.loc[el_in, col_beta]) try: col_disp = col_disp_map[plane][el_type] except KeyError: pass else: bet_term *= tw.loc[el_in, col_disp] disp_resp[out_str] = (coeff_sign * coeff[None, :] * bet_term[:, None] * np.cos(pi2tau) ).transpose() else: LOG.debug( " No '{:s}' variables found. ".format(el_type) + "Dispersion Response '{:s}' will be empty.".format(out_str)) disp_resp[out_str] = tfs.TfsDataFrame(None, index=el_out) return dict_mul(self._direction, disp_resp) def _calc_norm_dispersion_response(self): """ Response Matrix for delta normalized dispersion Eq. 25-27 in [#FranchiAnalyticformulasrapid2017]_ But w/o the assumtion :math:`\delta K_1 = 0` from Appendix B.1 and added linearization for :math:`\frac{1}{\sqrt{\beta}}` """ LOG.debug("Calculate Normalized Dispersion Response Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}".format(t))): tw = self._twiss adv = self._phase_advances el_out = self._elements_out els_in = self._elements_in sign_map = { "X": {"K0L": 1, "K1L": -1, "K1SL": 1, }, "Y": {"K0SL": -1, "K1L": 1, "K1SL": 1, }, } col_disp_map = { "X": {"K1L": "DX", "K1SL": "DY", }, "Y": {"K1L": "DY", "K1SL": "DX", }, } sign_correct_term = { "X": {"K1L": 1}, "Y": {"K1L": -1}, } q_map = {"X": tw.Q1, "Y": tw.Q2} disp_resp = dict.fromkeys(["{p:s}_{t:s}".format(p=p, t=t) for p in sign_map for t in sign_map[p]]) for plane in sign_map: q = q_map[plane] col_beta = "BET{}".format(plane) el_types = sign_map[plane].keys() els_per_type = [els_in[el_type] for el_type in el_types] coeff = 1 / (2 * np.sin(np.pi * q)) coeff_corr = 1 / (4 * np.sin(2 * np.pi * q)) for el_in, el_type in zip(els_per_type, el_types): coeff_sign = sign_map[plane][el_type] out_str = "{p:s}_{t:s}".format(p=plane, t=el_type) if len(el_in): pi2tau = 2 * np.pi * tau(adv[plane].loc[el_in, el_out], q) beta_in = tw.loc[el_in, col_beta] bet_term = np.sqrt(beta_in) try: col_disp = col_disp_map[plane][el_type] except KeyError: pass else: bet_term *= tw.loc[el_in, col_disp] result = (coeff_sign * coeff * bet_term)[:, None] * np.cos(pi2tau) # correction term try: sign_corr = sign_correct_term[plane][el_type] except KeyError: pass else: norm_disp_corr = (tw.loc[el_out, col_disp] / np.sqrt(tw.loc[el_out, col_beta])) result += (sign_corr * coeff_corr * norm_disp_corr[None, :] * beta_in[:, None] * np.cos(2 * pi2tau)) disp_resp[out_str] = result.transpose() else: LOG.debug( " No '{:s}' variables found. ".format(el_type) + "Normalized Dispersion Response '{:s}' will be empty.".format(out_str)) disp_resp[out_str] = tfs.TfsDataFrame(None, index=el_out) return dict_mul(self._direction, disp_resp) def _calc_phase_advance_response(self): """ Response Matrix for delta DPhi. Eq. 28 in [#FranchiAnalyticformulasrapid2017]_ Reduced to only phase advances between consecutive elements, as the 3D-Matrix of all elements exceeds memory space (~11000^3 = 1331 Giga Elements) --> w = j-1: DPhi(z,j) = DPhi(x, (j-1)->j) """ LOG.debug("Calculate Phase Advance Response Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))): tw = self._twiss adv = self._phase_advances k1_el = self._elements_in["K1L"] el_out_all = [DUMMY_ID] + self._elements_out # Add MU[XY] = 0.0 to the start el_out = el_out_all[1:] # in these we are actually interested el_out_mm = el_out_all[0:-1] # elements-- if len(k1_el) > 0: dmu = dict.fromkeys(["X", "Y"]) pi = tfs.TfsDataFrame(tw['S'][:, None] < tw['S'][None, :], # pi(i,j) = s(i) < s(j) index=tw.index, columns=tw.index, dtype=int) pi_term = (pi.loc[k1_el, el_out].values - pi.loc[k1_el, el_out_mm].values + np.diag(pi.loc[el_out, el_out_mm].values)[None, :]) for plane in ["X", "Y"]: col_beta = "BET" + plane q = tw.Q1 if plane == "X" else tw.Q2 coeff_sign = 1 if plane == "X" else -1 pi2tau = 2 * np.pi * tau(adv[plane].loc[k1_el, el_out_all], q) brackets = (2 * pi_term + ((np.sin(2 * pi2tau.loc[:, el_out].values) - np.sin(2 * pi2tau.loc[:, el_out_mm].values)) / np.sin(2 * np.pi * q) )) dmu[plane] = tfs.TfsDataFrame( tw.loc[k1_el, col_beta].values[:, None] * brackets * (coeff_sign / (8 * np.pi)), index=k1_el, columns=el_out).transpose() else: LOG.debug(" No 'K1L' variables found. Phase Response will be empty.") dmu = {"X": tfs.TfsDataFrame(None, index=el_out), "Y": tfs.TfsDataFrame(None, index=el_out)} return dict_mul(self._direction, dmu) def _calc_phase_response(self): """ Response Matrix for delta DPhi. Eq. 28 in [#FranchiAnalyticformulasrapid2017]_ Reduced to only delta phase. --> w = 0: DPhi(z,j) = DPhi(x, 0->j) This calculation could also be achieved by applying np.cumsum to the DataFrames of _calc_phase_adv_response() (tested!), but _calc_phase_response() is about 4x faster. """ LOG.debug("Calculate Phase Response Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))): tw = self._twiss adv = self._phase_advances k1_el = self._elements_in["K1L"] el_out = self._elements_out if len(k1_el) > 0: dmu = dict.fromkeys(["X", "Y"]) pi = tfs.TfsDataFrame(tw['S'][:, None] < tw['S'][None, :], # pi(i,j) = s(i) < s(j) index=tw.index, columns=tw.index, dtype=int) pi_term = pi.loc[k1_el, el_out].values for plane in ["X", "Y"]: col_beta = "BET" + plane q = tw.Q1 if plane == "X" else tw.Q2 coeff_sign = 1 if plane == "X" else -1 pi2tau = 2 * np.pi * tau(adv[plane].loc[k1_el, [DUMMY_ID] + el_out], q) brackets = (2 * pi_term + ((np.sin(2 * pi2tau.loc[:, el_out].values) - np.sin(2 * pi2tau.loc[:, DUMMY_ID].values[:, None])) / np.sin(2 * np.pi * q) )) dmu[plane] = tfs.TfsDataFrame( tw.loc[k1_el, col_beta].values[:, None] * brackets * (coeff_sign / (8 * np.pi)), index=k1_el, columns=el_out).transpose() else: LOG.debug(" No 'K1L' variables found. Phase Response will be empty.") dmu = {"X": tfs.TfsDataFrame(None, index=el_out), "Y": tfs.TfsDataFrame(None, index=el_out)} return dict_mul(self._direction, dmu) def _calc_tune_response(self): """ Response vectors for Tune. Eq. 7 in [#TomasReviewlinearoptics2017]_ """ LOG.debug("Calculate Tune Response Matrix") with timeit(lambda t: LOG.debug(" Time needed: {:f}s".format(t))): tw = self._twiss k1_el = self._elements_in["K1L"] if len(k1_el) > 0: dtune = dict.fromkeys(["X", "Y"]) dtune["X"] = 1/(4 * np.pi) * tw.loc[k1_el, ["BETX"]].transpose() dtune["X"].index = ["DQX"] dtune["Y"] = -1 / (4 * np.pi) * tw.loc[k1_el, ["BETY"]].transpose() dtune["Y"].index = ["DQY"] else: LOG.debug(" No 'K1L' variables found. Tune Response will be empty.") dtune = {"X": tfs.TfsDataFrame(None, index=["DQX"]), "Y": tfs.TfsDataFrame(None, index=["DQY"])} return dict_mul(self._direction, dtune) ################################ # Normalizing ################################ def _normalize_beta_response(self, beta): """ Convert to Beta Beating """ el_out = self._elements_out tw = self._twiss beta_norm = dict.fromkeys(beta.keys()) for plane in beta: col = "BET" + plane beta_norm[plane] = beta[plane].div( tw.loc[el_out, col], axis='index') return beta_norm ################################ # Mapping ################################ def _map_dispersion_response(self, disp): """ Maps all dispersion matrices """ disp_mapped = dict.fromkeys(disp.keys()) m2v = self._map_to_variables for plane in disp: mapping = self._var_to_el[plane.split("_")[1]] disp_mapped[plane] = m2v(disp[plane], mapping) return disp_mapped @staticmethod def _map_to_variables(df, mapping): """ Maps from magnets to variables using self._var_to_el. Could actually be done by matrix multiplication :math:'A \cdot var_to_el', yet, as var_to_el is very sparsely populated, looping is easier. Args: df: DataFrame or dictionary of DataFrames to map mapping: mapping to be applied (e.g. var_to_el[order]) Returns: DataFrame or dictionary of mapped DataFrames """ def map_fun(df, mapping): """ Actual mapping function """ df_map = tfs.TfsDataFrame(index=df.index, columns=mapping.keys()) for var, magnets in mapping.iteritems(): df_map[var] = df.loc[:, upper(magnets.index)].mul( magnets.values, axis="columns" ).sum(axis="columns") return df_map # convenience wrapper for dicts if isinstance(df, dict): mapped = dict.fromkeys(df.keys()) for plane in mapped: mapped[plane] = map_fun(df[plane], mapping) else: mapped = map_fun(df, mapping) return mapped ################################ # Getters ################################
[docs] def get_beta(self, mapped=True): """ Returns Response Matrix for Beta """ if not self._beta: self._beta = self._calc_beta_response() if mapped and not self._beta_mapped: self._beta_mapped = self._map_to_variables(self._beta, self._var_to_el["K1L"]) if mapped: return self._beta_mapped else: return self._beta
[docs] def get_beta_beat(self, mapped=True): """ Returns Response Matrix for Beta Beating """ if not self._beta: self._beta = self._calc_beta_response() if not self._beta_beat: self._beta_beat = self._normalize_beta_response(self._beta) if mapped and not self._beta_beat_mapped: self._beta_beat_mapped = self._map_to_variables(self._beta_beat, self._var_to_el["K1L"]) if mapped: return self._beta_beat_mapped else: return self._beta_beat
[docs] def get_dispersion(self, mapped=True): """ Returns Response Matrix for Dispersion """ if not self._dispersion: self._dispersion = self._calc_dispersion_response() if mapped and not self._dispersion_mapped: self._dispersion_mapped = self._map_dispersion_response(self._dispersion) if mapped: return self._dispersion_mapped else: return self._dispersion
[docs] def get_norm_dispersion(self, mapped=True): """ Returns Response Matrix for Normalized Dispersion """ if not self._norm_dispersion: self._norm_dispersion = self._calc_norm_dispersion_response() if mapped and not self._norm_dispersion_mapped: self._norm_dispersion_mapped = self._map_dispersion_response(self._norm_dispersion) if mapped: return self._norm_dispersion_mapped else: return self._norm_dispersion
[docs] def get_phase(self, mapped=True): """ Returns Response Matrix for Total Phase """ if not self._phase: self._phase = self._calc_phase_response() if mapped and not self._phase_mapped: self._phase_mapped = self._map_to_variables(self._phase, self._var_to_el["K1L"]) if mapped: return self._phase_mapped else: return self._phase
[docs] def get_phase_adv(self, mapped=True): """ Returns Response Matrix for Phase Advance """ if not self._phase_adv: self._phase_adv = self._calc_phase_advance_response() if mapped and not self._phase_adv_mapped: self._phase_adv_mapped = self._map_to_variables(self._phase_adv, self._var_to_el["K1L"]) if mapped: return self._phase_adv_mapped else: return self._phase_adv
[docs] def get_tune(self, mapped=True): """ Returns Response Matrix for the Tunes """ if not self._tune: self._tune = self._calc_tune_response() if mapped and not self._tune_mapped: self._tune_mapped = self._map_to_variables(self._tune, self._var_to_el["K1L"]) if mapped: return self._tune_mapped else: return self._tune
[docs] def get_coupling(self, mapped=True): """ Returns Response Matrix for the coupling """ if not self._coupling: self._coupling = self._calc_coupling_response() if mapped and not self._coupling_mapped: self._coupling_mapped = self._map_to_variables(self._coupling, self._var_to_el["K1SL"]) if mapped: return self._coupling_mapped else: return self._coupling
def get_variable_names(self): return self._variables def get_variable_mapping(self, order=None): if order is None: return self._var_to_el else: return self._var_to_el[order]
[docs] def get_response_for(self, obs=None): """ Calculates and returns only desired response matrices """ # calling functions for the getters to call functions only if needed def caller(func, plane): return func()[plane] def disp_caller(func, plane): disp = func() return response_add(*[disp[k] for k in disp.keys() if k.startswith(plane)]) def tune_caller(func, _unused): tune = func() res = tune["X"].append(tune["Y"]) res.index = ["Q1", "Q2"] return res def couple_caller(func, plane): # apply() converts empty DataFrames to Series! Cast them back. # Also: take care of minus-sign convention! sign = -1 if plane[-1] == "R" else 1 part_func = np.real if plane[-1] == "R" else np.imag return sign * tfs.TfsDataFrame(func()[plane[:-1]].apply(part_func).astype(np.float64)) # to avoid if-elif-elif-... obs_map = { 'Q': (tune_caller, self.get_tune, None), 'BETX': (caller, self.get_beta, "X"), 'BETY': (caller, self.get_beta, "Y"), 'BBX': (caller, self.get_beta_beat, "X"), 'BBY': (caller, self.get_beta_beat, "Y"), 'MUX': (caller, self.get_phase, "X"), 'MUY': (caller, self.get_phase, "Y"), 'DX': (disp_caller, self.get_dispersion, "X"), 'DY': (disp_caller, self.get_dispersion, "Y"), 'NDX': (disp_caller, self.get_norm_dispersion, "X"), 'NDY': (disp_caller, self.get_norm_dispersion, "Y"), 'F1001R': (couple_caller, self.get_coupling, "1001R"), 'F1001I': (couple_caller, self.get_coupling, "1001I"), 'F1010R': (couple_caller, self.get_coupling, "1010R"), 'F1010I': (couple_caller, self.get_coupling, "1010I"), } if obs is None: obs = obs_map.keys() LOG.debug("Calculating responses for {:s}.".format(obs)) with timeit(lambda t: LOG.debug("Total time getting responses: {:f}s".format(t))): response = dict.fromkeys(obs) for key in obs: response[key] = obs_map[key][0](*obs_map[key][1:3]) return response
# Associated Functions #########################################################
[docs]def get_delta(response, delta_k): """ Returns the deltas of :math:`response_matrix \cdot delta_k`. Args: response: Response dictionary delta_k: Pandas Series of variables and their delta-value Returns: TFS_DataFrame with elements as indices and the calculated deltas in the columns """ delta_df = tfs.TfsDataFrame(None, index=response.index) for col in response.keys(): # equivalent to .dot() but more efficient as delta_k is "sparse" if col == "Q": try: delta_q = (response[col].loc[:, delta_k.index] * delta_k ).dropna(axis="columns").sum(axis="columns") except KeyError: # none of the delta_k are in DataFrame delta_q = pd.Series([0., 0.], index=["Q1", "Q2"]) delta_df.headers["QX"] = delta_q["Q1"] delta_df.headers["QY"] = delta_q["Q2"] else: try: delta_df.loc[:, col] = (response[col].loc[:, delta_k.index] * delta_k ).dropna(axis="columns").sum(axis="columns") except KeyError: # none of the delta_k are in DataFrame delta_df.loc[:, col] = 0. return delta_df
[docs]def response_add(*args): """ Merges two or more Response Matrix DataFrames """ base_df = args[0] for df in args[1:]: base_df = base_df.add(df, fill_value=0.) return base_df
[docs]def dict_mul(number, dictionary): """ Multiply an int with a dict of dataframes (or anything multiplyable) """ if number != 1: for key in dictionary: dictionary[key] = number * dictionary[key] return dictionary
# Wrapper ##################################################################
[docs]def create_response(accel_inst, vars_categories, optics_params): """ Wrapper to create response via TwissResponse """ LOG.debug("Creating response via TwissResponse.") varmap_path = check_varmap_file(accel_inst, vars_categories) with timeit(lambda t: LOG.debug("Total time getting TwissResponse: {:f}s".format(t))): tr = TwissResponse(accel_inst, vars_categories, varmap_path) response = tr.get_response_for(optics_params) if not any([resp.size for resp in response.values()]): raise ValueError("Responses are all empty. " + "Are variables {:s} ".format(tr.get_variable_names()) + "correct for '{:s}'?".format(optics_params) ) return response
# Script Mode ################################################################## if __name__ == '__main__': raise EnvironmentError("{:s} is not supposed to run as main.".format(__file__))