Source code for omc3.optics_measurements.rdt

"""
Resonance Driving Terms
-----------------------

This module contains RDT calculations related functionality of ``optics_measurements``.
It provides functions to compute global resonance driving terms **f_jklm**.
"""
from __future__ import annotations

from contextlib import contextmanager
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pandas as pd
import tfs
from optics_functions.rdt import get_all_to_order, jklm2str
from scipy.optimize import OptimizeWarning, curve_fit
from scipy.sparse import diags

from omc3.definitions.constants import PLANES
from omc3.harpy.constants import COL_AMP, COL_PHASE
from omc3.optics_measurements.constants import (
    AMPLITUDE,
    COUNT,
    ERR,
    EXT,
    IMAG,
    MEASUREMENT,
    NAME,
    NAME2,
    PHASE,
    PHASE_ADV,
    REAL,
    RDT_FOLDER,
    S2,
    S,
)
from omc3.optics_measurements.data_models import (
    InputFiles,
    check_and_warn_about_offmomentum_data,
    filter_for_dpp,
)
from omc3.optics_measurements.phase import (
    COMPENSATED,
    UNCOMPENSATED,
    CompensationMode,
    PhaseDict,
)
from omc3.optics_measurements.toolbox import df_diff
from omc3.optics_measurements.tune import TuneDict
from omc3.utils import iotools, logging_tools, stats

if TYPE_CHECKING: 
    from generic_parser import DotDict
    from numpy.typing import ArrayLike

NBPMS_FOR_90 = 3
LOGGER = logging_tools.get_logger(__name__)

RDTTuple = tuple[int, int, int, int]
LineTuple = tuple[int, int, int]


def _generate_plane_rdts(order: int) -> tuple[dict[str, list[RDTTuple]], dict[str, list[RDTTuple]]]:
    """
    This helper function generates two dictionnaries representing on what plane(s)
    a RDT can be seen and which tune it is a multiple of.

    For a given j+k+l+m = n multiple order, a line can be seen:
      - on the horizontal axis if j != 0, at:
          (1 - j + k) Qx + (m - l) Qy
      - on the vertical axis if l != 0, at:
          (k - j) Qx + (1 - l + m) Qy

    Reference equations are (3.27) and (3.28) in
    [Andrea Franchi's Thesis](https://repository.gsi.de/record/55413/files/GSI-Diss-2006-07.pdf)
    """
    # Get all the valid RDTs up to a certain order
    all_rdts = get_all_to_order(order)

    single_plane = {'X': [], 'Y': []}
    double_plane = {'X': [], 'Y': []}
    # Iterate through our RDTs and classify them depending on what plane they act
    for (j,k,l,m) in all_rdts:  # noqa: E741
        if j == 0 and l == 0:  # the RDT can't be seen on any plane
            continue
        if l+m == 0 and j != 0:  # The line where the RDT is seen is a multiple of the Qx line
            single_plane['X'].append((j,k,l,m))  # e.g. f1400, f3000, f4000
        elif j+k == 0 and l != 0:  # same, but for the Qy line
            single_plane['Y'].append((j,k,l,m))  # e.g. f0030,f0040
        elif j == 0:  # The RDT can only be seen on the vertical plane and uses both Qx and Qy
            double_plane['Y'].append((j,k,l,m))  # e.g. f0111, f0120
        elif l == 0: # same, but for the horizontal plane
            double_plane['X'].append((j,k,l,m))  # e.g. f1001, f1002
        else:  # The RDT can be seen on both planes and is a multiple of both Qx and Qy
            double_plane['X'].append((j,k,l,m))  # e.g. f1020, f1120
            double_plane['Y'].append((j,k,l,m))

    return single_plane, double_plane


[docs] def calculate( measure_input: DotDict, input_files: InputFiles, tunes: TuneDict, phases: dict[str, PhaseDict], invariants: dict[str, pd.DataFrame], header: dict, ) -> None: """ Computes the RDTs for the given input files and settings up to the magnet order given in the inputs, and writes the results to file. Args: measure_input: `OpticsInput` object containing analysis settings. input_files: `InputFiles` object containing frequency spectra files (linx/y). tunes: `TuneDict` object mapping planes to tunes, as given by :func:`omc3.optics_measurements.tune.calculate`. phases: invariants (dict[str, pd.DataFrame]): dictionnary mapping planes to dataframes of actions/errors per kick, e.g. from :func:`omc3.optics_measurements.kick.calculate`. header: headers to include to the written result files. """ LOGGER.info("Start of RDT analysis") meas_input = deepcopy(measure_input) meas_input["compensation"] = CompensationMode.NONE LOGGER.info(f"Calculating RDTs up to magnet order {meas_input['rdt_magnet_order']}") dpp_value = meas_input.analyse_dpp if dpp_value is None: for plane in PLANES: check_and_warn_about_offmomentum_data(input_files, plane, id_="RDT calculation") else: invariants = filter_for_dpp(invariants, input_files, dpp_value) single_plane_rdts, double_plane_rdts = _generate_plane_rdts(meas_input["rdt_magnet_order"]) single, double = "Single", "Double" for rdts, plane_str in ((single_plane_rdts, single), (double_plane_rdts, double)): for plane in PLANES: bpm_names = input_files.bpms( plane=plane if (plane_str == single) else None, dpp_value=dpp_value ) phases_for_rdts = _best_90_degree_phases(meas_input, bpm_names, phases, tunes, plane) LOGGER.info( f"{plane_str} Plane RDTs: " f"Average phase advance between BPM pairs: {phases_for_rdts.loc[:, MEASUREMENT].mean()}" ) for rdt in rdts[plane]: with _check_amp_error(rdt): df = _process_rdt(meas_input, input_files, phases_for_rdts, invariants, plane, rdt) write(df, add_freq_to_header(header, plane, rdt), meas_input, plane, rdt)
def write(df: pd.DataFrame, header: dict[str, Any], meas_input: DotDict, plane: str, rdt: RDTTuple): outputdir = Path(meas_input.outputdir) / RDT_FOLDER / _rdt_to_order_and_type(rdt) iotools.create_dirs(outputdir) tfs.write(outputdir / f"f{_rdt_to_str(rdt)}_{plane.lower()}{EXT}", df, header, save_index=NAME) @contextmanager def _check_amp_error(rdt: RDTTuple): """ Context manager to catch a specific type of ValueError, regarding the AMP_ column not found in the output, as raised by get_line_sign_and_suffix. Alternatively, we could just raise a custom Error Type? """ try: yield except ValueError as e: error_str = str(e) message = f"RDT calculation failed for {jklm2str(*rdt)}." if "column AMP" not in error_str: # raise unexpected Value errors raise ValueError(message) from e LOGGER.warning(f"{message}: {error_str}") def _rdt_to_str(rdt: RDTTuple): j, k, l, m = rdt # noqa: E741 return f"{j}{k}{l}{m}" def _rdt_to_order_and_type(rdt: RDTTuple): j, k, l, m = rdt # noqa: E741 rdt_type = "normal" if (l + m) % 2 == 0 else "skew" orders = dict(((1, "dipole"), (2, "quadrupole"), (3, "sextupole"), (4, "octupole"), (5, "decapole"), (6, "dodecapole"), (7, "tetradecapole"), (8, "hexadecapole"), )) return f"{rdt_type}_{orders[j + k + l + m]}" def _best_90_degree_phases(meas_input, bpm_names, phases, tunes, plane): phase_dict: PhaseDict = phases[plane][UNCOMPENSATED] if phase_dict is None: phase_dict = phases[plane][COMPENSATED] bpm_names = phase_dict[MEASUREMENT].index.intersection(bpm_names) # removes BPMs that are not in model filtered = phase_dict[MEASUREMENT].loc[bpm_names, bpm_names] phase_meas = pd.concat((filtered % 1, (filtered.iloc[:, :NBPMS_FOR_90] + tunes[plane]["Q"]) % 1), axis=1) second_bmps = np.abs(phase_meas * _get_n_upper_diagonals(NBPMS_FOR_90, phase_meas.shape) - 0.25).idxmin(axis=1) filtered.iloc[-NBPMS_FOR_90:,:NBPMS_FOR_90] = (filtered.iloc[-NBPMS_FOR_90:,:NBPMS_FOR_90] + tunes[plane]["Q"]) % 1 # get the pairs zip(bpm_names, second_bpms) filtered[NAME2] = second_bmps filtered[MEASUREMENT] = np.diag(filtered.loc[bpm_names, second_bmps]) filtered[f"{ERR}{MEASUREMENT}"] = np.diag(phase_dict[f"{ERR}{MEASUREMENT}"].loc[bpm_names, second_bmps]) # Merge final dataframe model: pd.DataFrame = meas_input.accelerator.model for_rdts = pd.merge(filtered.loc[:, [NAME2, MEASUREMENT, f"{ERR}{MEASUREMENT}"]], model.loc[:, [S]], how="inner", left_index=True, right_index=True) return for_rdts def _get_n_upper_diagonals(n, shape): return diags(np.ones((n, shape[0])), np.arange(n)+1, shape=shape).toarray() def _determine_line(rdt: RDTTuple, plane: str) -> dict[str, LineTuple]: j, k, l, m = rdt # noqa: E741 lines = dict(X=(1 - j + k, m - l, 0), Y=(k - j, 1 - l + m, 0)) return lines[plane] def add_freq_to_header(header: dict[str, Any], plane: str, rdt: RDTTuple): mod_header = header.copy() line = _determine_line(rdt, plane) freq = np.mod(line@np.array([header['Q1'], header['Q2'], 0]), 1) mod_header["FREQ"] = freq if freq <= 0.5 else 1 - freq return mod_header def _process_rdt(meas_input: DotDict, input_files: InputFiles, phase_data, invariants, plane, rdt: RDTTuple): dpp_value = meas_input.analyse_dpp df = pd.DataFrame(phase_data) second_bpms = df.loc[:, NAME2].to_numpy() df[S2] = df.loc[second_bpms, S].to_numpy() df[COUNT] = len(input_files.dpp_frames(plane, dpp_value=dpp_value)) line = _determine_line(rdt, plane) phase_sign, suffix = get_line_sign_and_suffix(line, input_files, plane) df_all_amps = input_files.joined_frame(plane, [f"{COL_AMP}{suffix}"], dpp_value=dpp_value) df_all_phases = phase_sign * input_files.joined_frame(plane, [f"{COL_PHASE}{suffix}"], dpp_value=dpp_value) comp_coeffs1 = to_complex(df_all_amps.loc[df.index, :], df_all_phases.loc[df.index, :]) # Multiples of tunes needs to be added to phase at second BPM if that is in second turn comp_coeffs2 = to_complex( df_all_amps.loc[second_bpms, :], _add_tunes_if_in_second_turn( df, input_files, line, df_all_phases.loc[second_bpms, :].to_numpy(), dpp_value ) ) # Get amplitude and phase of the line from linx/liny file line_amp, line_phase, line_amp_e, line_phase_e = complex_secondary_lines( # TODO use the errors df.loc[:, MEASUREMENT].to_numpy()[:, np.newaxis] * meas_input.accelerator.beam_direction, df.loc[:, f"{ERR}{MEASUREMENT}"].to_numpy()[:, np.newaxis], comp_coeffs1, comp_coeffs2) rdt_phases_per_file = _calculate_rdt_phases_from_line_phases(df, input_files, line, line_phase, dpp_value) rdt_angles = stats.circular_mean(rdt_phases_per_file, period=1, axis=1) % 1 df[PHASE] = rdt_angles df[f"{ERR}{PHASE}"] = stats.circular_error(rdt_phases_per_file, period=1, axis=1) df[AMPLITUDE], df[f"{ERR}{AMPLITUDE}"] = _fit_rdt_amplitudes(invariants, line_amp, plane, rdt) df[REAL] = np.cos(2 * np.pi * rdt_angles) * df.loc[:, AMPLITUDE].to_numpy() df[IMAG] = np.sin(2 * np.pi * rdt_angles) * df.loc[:, AMPLITUDE].to_numpy() # in old files there was "EAMP" and "PHASE_STD" return df.loc[:, [S, COUNT, AMPLITUDE, f"{ERR}{AMPLITUDE}", PHASE, f"{ERR}{PHASE}", REAL, IMAG]] def _add_tunes_if_in_second_turn(df: pd.DataFrame, input_files: InputFiles, line, phase2, dpp_value): mask = df_diff(df, S, S2) > 0 tunes = np.empty((2, len(input_files.dpp_frames("X", dpp_value)))) for i, plane in enumerate(PLANES): tunes[i] = np.array([lin.headers[f"Q{i+1}"] for lin in input_files.dpp_frames(plane, dpp_value)]) phase2[mask, :] = phase2[mask, :] + line[0] * tunes[0] + line[1] * tunes[1] return phase2 def _calculate_rdt_phases_from_line_phases(df, input_files, line, line_phase, dpp_value): phases = np.zeros((2, df.index.size, len(input_files.dpp_frames("X", dpp_value)))) for i, plane in enumerate(PLANES): if line[i] != 0: phases[i] = input_files.joined_frame(plane, [f"{PHASE_ADV}{plane}"], dpp_value=dpp_value).loc[df.index, :].to_numpy() % 1 return line_phase - line[0] * phases[0] - line[1] * phases[1] + 0.25 def _fit_rdt_amplitudes(invariants, line_amp, plane, rdt): """ Returns RDT amplitudes in units of meters ^ {1 - n/2}, where n is the order of RDT. """ import warnings amps, err_amps = np.empty(line_amp.shape[0]), np.empty(line_amp.shape[0]) kick_data = get_linearized_problem(invariants, plane, rdt) # corresponding to actions in meters guess = np.mean(line_amp / kick_data, axis=1) def fitting(x, f): return f * x for i, bpm_rdt_data in enumerate(line_amp): # Catch the potential 'OptimizeWarning: Covariance of the parameters could not be estimated' # that can happen during the curve_fit. We relay it as a logged message, which allows us to # avoid polluting the stderr and allows the user to not see it depending on log level with warnings.catch_warnings(record=True) as records: warnings.simplefilter("ignore", OptimizeWarning) popt, pcov = curve_fit(fitting, kick_data, bpm_rdt_data, p0=guess[i]) amps[i] = popt[0] sqrt_pcov = np.sqrt(pcov).flat[0] err_amps[i] = sqrt_pcov if np.isfinite(sqrt_pcov) else 0. # if single file is used, the error is reported as Inf, which is then overwritten with 0 # We log any captured warning at warning level for warning in records: LOGGER.warning(f"Curve fit warning: {warning.message}") return amps, err_amps
[docs] def get_linearized_problem(invs: dict[str, np.ndarray], plane: str, rdt: RDTTuple): """ 2 * j * f_jklm * (powers of 2Jx and 2Jy) : f_jklm is later a parameter of a fit we use sqrt(2J): unit is sqrt(m). """ j, k, l, m = rdt # noqa: E741 act_x = invs["X"].T[0] act_y = invs["Y"].T[0] if plane == "X": return 2 * j * act_x ** (j + k - 2) * act_y ** (l + m) return 2 * l * act_x ** (j + k) * act_y ** (l + m - 2)
def get_line_sign_and_suffix(line: LineTuple, input_files: InputFiles, plane: str): suffix = f"{line[0]}{line[1]}".replace("-", "_") conj_suffix = f"{-line[0]}{-line[1]}".replace("-", "_") if f"{COL_AMP}{suffix}" in input_files[plane][0].columns: return 1, suffix if f"{COL_AMP}{conj_suffix}" in input_files[plane][0].columns: return -1, conj_suffix # The specified AMP column hasn't been found in the lin file msg = (f"The column AMP{suffix} has not been found in the lin{plane.lower()} file. " "Consider re-running the frequency analysis with a higher order resonance term") raise ValueError(msg)
[docs] def complex_secondary_lines(phase_adv: ArrayLike[float], err_padv: ArrayLike[float], sig1: ArrayLike[complex], sig2: ArrayLike[complex]): """ Args: phase_adv: phase advances between two BPMs. err_padv: error on the phase advance between two BPMs. sig1: Complex coefficients of a secondary lines at the first BPM of the pairs. sig2: Complex coefficients of a secondary lines at the second BPM of the pairs. Returns: `Tuple` with amplitudes, phases err_amplitudes and err_phases of the complex signal. """ tp = 2.0 * np.pi # computing complex secondary line (h-) sig = (sig1 * (1 + 1j / np.tan(phase_adv * tp)) - sig2 * 1j / np.sin(phase_adv * tp)) / 2 # computing error secondary line (h-) # TODO is this really the error? esig = (sig1 * 1j / np.square(np.sin(phase_adv * tp)) + sig2 * -1j * np.cos(phase_adv * tp) / np.square(np.sin(phase_adv * tp))) * err_padv / 2 return (np.abs(sig), (np.angle(sig) / tp) % 1, np.abs(esig), (np.angle(esig) / tp) % 1)
def to_complex(amplitudes: ArrayLike, phases: ArrayLike, period: float = 1): try: amplitudes = amplitudes.to_numpy() except AttributeError: pass try: phases = phases.to_numpy() except AttributeError: pass return amplitudes * np.exp(2j * np.pi * phases / period)