Source code for omc3.optics_measurements.phase

"""
Phase Advance
-------------

This module contains phase calculation functionality of ``optics_measurements``.
It provides functions to compute betatron phase advances and structures to store them.
"""
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, TypedDict

import numpy as np
import pandas as pd
import tfs

from omc3.optics_measurements.constants import (
    DELTA,
    DRIVEN_PHASE_NAME,
    DRIVEN_TOTAL_PHASE_NAME,
    ERR,
    EXT,
    MDL,
    MEASUREMENT,
    MODEL,
    NAME,
    PHASE,
    PHASE_ADV,
    PHASE_NAME,
    SPECIAL_PHASE_NAME,
    TOTAL_PHASE_NAME,
    S,
)
from omc3.optics_measurements.data_models import (
    InputFiles,
    check_and_warn_about_offmomentum_data,
)
from omc3.optics_measurements.toolbox import ang_sum, df_ang_diff, df_diff
from omc3.utils import logging_tools, stats

if TYPE_CHECKING: 
    from collections.abc import Sequence

    from generic_parser import DotDict
    from numpy.typing import ArrayLike

    from omc3.model.accelerators.accelerator import Accelerator
    from omc3.optics_measurements.tune import TuneDict

PhaseDict = TypedDict(
    'PhaseDict', {
        MODEL: pd.DataFrame, 
        MEASUREMENT: pd.DataFrame, 
        f'{ERR}{MEASUREMENT}': pd.DataFrame
    }
)
LOGGER = logging_tools.get_logger(__name__)

class CompensationMode:
    NONE: str = "none"
    MODEL: str = "model"
    EQUATION: str = "equation"
    
    @classmethod
    def all(cls) -> list[str]:
        return [cls.NONE, cls.MODEL, cls.EQUATION]

COMPENSATED: str = "compensated"  # former 'free'
UNCOMPENSATED: str = "uncompensated"  # former 'driven'

[docs] def calculate( meas_input: DotDict, input_files: InputFiles, tunes: TuneDict, plane: str, no_errors: bool = False, ) -> tuple[dict[str, PhaseDict], list[pd.DataFrame]]: """ Calculate phases for 'compensated' (aka 'free') and 'uncompensated' (aka 'driven') cases from the measurement files, and return a dictionary combining the results for each transverse plane. Args: meas_input (DotDict): `OpticsInput` object containing analysis settings from the command-line. input_files (InputFiles): `InputFiles` object containing frequency spectra files (linx/y). tunes (TuneDict): `TuneDict` contains measured tunes. plane (str): marking the horizontal or vertical plane, **X** or **Y**. no_errors (bool): if ``True``, measured errors shall not be propagated (only their spread). Returns: A tuple of a dictionary and a list of corresponding phase DataFrames. The dictionary contains the compensated and uncompensated results in `PhaseDict` form, i.e. a dictionary of DataFrames with the phase models, measured and errors. The list contains the DataFrames with the phase advances between BPMs and the total phase advances, for both compensated and uncompensated cases. """ # Clean up compensation mode if meas_input.compensation is None: meas_input.compensation = CompensationMode.NONE else: meas_input.compensation = meas_input.compensation.lower() LOGGER.info("-- Run phase calculation with requested compensation") phase_advances, dfs = _calculate_phase_advances( meas_input=meas_input, input_files=input_files, tunes=tunes, plane=plane, model_df=meas_input.accelerator.model, compensation=meas_input.compensation, no_errors=no_errors, ) if meas_input.compensation == CompensationMode.NONE: uncompensated_phase_advances = None else: LOGGER.info("-- Run phase calculation without compensation") uncompensated_phase_advances, drv_dfs = _calculate_phase_advances( meas_input=meas_input, input_files=input_files, tunes=tunes, plane=plane, model_df=meas_input.accelerator.model_driven, compensation=CompensationMode.NONE, no_errors=no_errors, ) dfs = dfs + drv_dfs if len(phase_advances[MEASUREMENT].index) < 3: LOGGER.warning( "Less than 3 non-NaN phase-advances found. " "This will most likely lead to errors later on in the N-BPM or 3-BPM methods.\n" "Common issues to check:\n" "- Did you pass the correct tunes to harpy? Possibly also too small tolerance window?\n" "- Did excitation trigger in both planes? BPMs might be cleaned if only found in one plane.\n" "- Are cleaning settings (peak-to-peak, singular-value-cut) too agressive?\n" "- Are you using a machine with less than 3 BPMs? Oh dear." ) return {COMPENSATED: phase_advances, UNCOMPENSATED: uncompensated_phase_advances}, dfs
def _calculate_phase_advances( meas_input: DotDict, input_files: InputFiles, tunes: TuneDict, plane: str, model_df: pd.DataFrame, compensation: str, no_errors: bool, ) -> tuple[PhaseDict, list[pd.DataFrame, pd.DataFrame]]: """ Calculates phase advances. Args: meas_input: the input object including settings and the accelerator class. input_files: includes measurement tfs. tunes: `TunesDict` object containing measured and model tunes and ac2bpm object plane: marking the horizontal or vertical plane, **X** or **Y**. compensation: compensation mode to use no_errors: if ``True``, measured errors shall not be propagated (only their spread). Returns: A tuple of - a `dictionary` of `TfsDataFrames` indexed (BPMi x BPMj) yielding phase advance `phi_ij`. - "MEAS": measured phase advances, - "ERRMEAS": errors of measured phase advances, - "MODEL": model phase advances. +------++--------+--------+--------+--------+ | || BPM1 | BPM2 | BPM3 | BPM4 | +======++========+========+========+========+ | BPM1 || 0 | phi_12 | phi_13 | phi_14 | +------++--------+--------+--------+--------+ | BPM2 || phi_21 | 0 | phi_23 | phi_24 | +------++--------+--------+--------+--------+ | BPM3 || phi_31 | phi_32 | 0 | phi_34 | +------++--------+--------+--------+--------+ | BPM4 || phi_41 | phi_42 | phi_43 | 0 | +------++--------+--------+--------+--------+ The phase advance between BPM_i and BPM_j can be obtained via: phase_advances["MEAS"].loc[BPMi,BPMj] - a list of output data frames (for tfs-files to be written out); the first one contains the phase advances between the BPMs, the second one contains the total phase advances. """ LOGGER.info("Calculating phase advances") LOGGER.info(f"Measured tune in plane {plane} = {tunes[plane]['Q']}") df = model_df.loc[:, [S, f"{PHASE_ADV}{plane}"]] how = 'outer' if meas_input.union else 'inner' dpp_value = meas_input.analyse_dpp if dpp_value is None: check_and_warn_about_offmomentum_data(input_files, plane, id_="Phase calculations") joined_df = input_files.joined_frame( plane, [f"{PHASE_ADV}{plane}", f"{ERR}{PHASE_ADV}{plane}"], dpp_value=dpp_value, how=how ) df = pd.merge(df, joined_df, how='inner', left_index=True, right_index=True) direction = meas_input.accelerator.beam_direction df[input_files.get_columns(df, f"{PHASE_ADV}{plane}")] = direction * input_files.get_data(df, f"{PHASE_ADV}{plane}") phases_mdl = df.loc[:, f"{PHASE_ADV}{plane}"].to_numpy() phase_advances = {MODEL: _get_square_data_frame(_get_all_phase_diff(phases_mdl), df.index)} if compensation == CompensationMode.MODEL: df = _compensate_by_model(input_files, meas_input, df, plane) phases_meas = input_files.get_data(df, f"{PHASE_ADV}{plane}") if compensation == CompensationMode.EQUATION: phases_meas = _compensate_by_equation(phases_meas, plane, tunes) phases_errors = input_files.get_data(df, f"{ERR}{PHASE_ADV}{plane}") if phases_meas.ndim < 2: phase_advances[MEASUREMENT] = _get_square_data_frame(_get_all_phase_diff(phases_meas), df.index) phase_advances[f"{ERR}{MEASUREMENT}"] = _get_square_data_frame( np.zeros((len(phases_meas), len(phases_meas))), df.index) return phase_advances if meas_input.union: mask = np.isnan(phases_meas) phases_meas[mask], phases_errors[mask] = 0.0, np.inf if no_errors: phases_errors[~mask] = 1e-10 elif no_errors: phases_errors = None phases_3d = phases_meas[np.newaxis, :, :] - phases_meas[:, np.newaxis, :] if phases_errors is not None: errors_3d = phases_errors[np.newaxis, :, :] + phases_errors[:, np.newaxis, :] else: errors_3d = None phase_advances[MEASUREMENT] = _get_square_data_frame(stats.circular_mean( phases_3d, period=1, errors=errors_3d, axis=2) % 1.0, df.index) phase_advances[f"{ERR}{MEASUREMENT}"] = _get_square_data_frame(stats.circular_error( phases_3d, period=1, errors=errors_3d, axis=2), df.index) return phase_advances, [_create_output_df(phase_advances, df, plane), _create_output_df(phase_advances, df, plane, tot=True)] def _compensate_by_equation(phases_meas: ArrayLike, plane: str, tunes: TuneDict) -> ArrayLike: """ Compensate the measured phases by equation. TODO: Reference! """ driven_tune, free_tune, ac2bpmac = tunes[plane]["Q"], tunes[plane]["QF"], tunes[plane]["ac2bpm"] k_bpmac = ac2bpmac[2] phase_corr = ac2bpmac[1] - phases_meas[k_bpmac] + (0.5 * driven_tune) phases_meas = phases_meas + phase_corr[np.newaxis, :] r = tunes.get_lambda(plane) phases_meas[k_bpmac:, :] = phases_meas[k_bpmac:, :] - driven_tune psi = (np.arctan((1 - r) / (1 + r) * np.tan(2 * np.pi * phases_meas)) / (2 * np.pi)) % 0.5 phases_meas = np.where(phases_meas % 1.0 > 0.5, psi + .5, psi) phases_meas[k_bpmac:, :] = phases_meas[k_bpmac:, :] + free_tune return phases_meas def _compensate_by_model(input_files: InputFiles, meas_input: DotDict, df: pd.DataFrame, plane: str) -> pd.DataFrame: """ Compensate the measured phase advances by comparing the driven model with the used model.""" # add column from driven model to df (and merge indices) df_driven = pd.DataFrame(meas_input.accelerator.model_driven.loc[:, [f"{PHASE_ADV}{plane}"]]) df = pd.merge(df, df_driven, how='inner', left_index=True, right_index=True, suffixes=("", "comp")) # phase compensation = difference between driven model and used model phase_compensation = df_diff(df, f"{PHASE_ADV}{plane}", f"{PHASE_ADV}{plane}comp") # add the compensated phase to the measured data df[input_files.get_columns(df, f"{PHASE_ADV}{plane}")] = ang_sum( input_files.get_data(df, f"{PHASE_ADV}{plane}"), phase_compensation[:, np.newaxis]) return df
[docs] def write( dfs: Sequence[pd.DataFrame], headers: Sequence[dict[str, Any]] | dict[str, Any], output: str | Path, plane: str ): """ Write out the phase advance data into TFS-files.""" LOGGER.info(f"Writing phases: {len(dfs)}") if isinstance(headers, dict): headers = [headers] * len(dfs) for head, df, name in zip(headers, dfs, (PHASE_NAME, TOTAL_PHASE_NAME, DRIVEN_PHASE_NAME, DRIVEN_TOTAL_PHASE_NAME)): tfs.write(Path(output) / f"{name}{plane.lower()}{EXT}", df, head) LOGGER.info(f"Phase advance beating in {name}{plane.lower()}{EXT} = " f"{stats.weighted_rms(df.loc[:, f'{DELTA}{PHASE}{plane}'])}")
def _create_output_df(phase_advances: PhaseDict, model: pd.DataFrame, plane: str, tot: bool = False) -> pd.DataFrame: """ Create the DataFrames to be written out into TFS-files. Args: phase_advances (PhaseDict): Dictionary of phase-advance DataFrames as described in :func:`calculate_with_compensation`. model (pd.DataFrame): Model DataFrame (i.e. columns from twiss_elements + compensation). plane (str): Plane we are using ("X" or "Y"). tot (bool, optional): Create DataFrame for the total phase advance or BPM phase advances. Defaults to False. Returns: pd.DataFrame: Frame as would be expected in the output files (the .tfs to be written out). """ meas = phase_advances[MEASUREMENT] mod = phase_advances[MODEL] err = phase_advances[f"{ERR}{MEASUREMENT}"] if tot: output_data = model.loc[:, [S, f"{PHASE_ADV}{plane}"]].iloc[:, :] output_data[NAME] = output_data.index output_data = output_data.assign(S2=model.at[model.index[0], S], NAME2=model.index[0]) output_data[f"{PHASE}{plane}"] = meas.to_numpy()[0, :] output_data[f"{ERR}{PHASE}{plane}"] = err.to_numpy()[0, :] output_data[f"{PHASE}{plane}{MDL}"] = mod.to_numpy()[0, :] else: output_data = model.loc[:, [S, f"{PHASE_ADV}{plane}"]].iloc[:-1, :] output_data[NAME] = output_data.index output_data = output_data.assign(S2=model.loc[:, S].to_numpy()[1:], NAME2=model.index[1:].to_numpy()) output_data[f"{PHASE}{plane}"] = np.diag(meas.to_numpy(), k=1) output_data[f"{ERR}{PHASE}{plane}"] = np.diag(err.to_numpy(), k=1) output_data[f"{PHASE}{plane}{MDL}"] = np.diag(mod.to_numpy(), k=1) output_data.rename(columns={f"{PHASE_ADV}{plane}": f"{PHASE_ADV}{plane}{MDL}"}, inplace=True) output_data[f"{DELTA}{PHASE}{plane}"] = df_ang_diff(output_data, f"{PHASE}{plane}", f"{PHASE}{plane}{MDL}") output_data[f"{ERR}{DELTA}{PHASE}{plane}"] = output_data.loc[:, f"{ERR}{PHASE}{plane}"].to_numpy() return output_data def _get_all_phase_diff(phases_a: ArrayLike, phases_b: ArrayLike = None) -> ArrayLike: if phases_b is None: phases_b = phases_a return (phases_a[np.newaxis, :] - phases_b[:, np.newaxis]) % 1.0 def _get_square_data_frame(data, index) -> pd.DataFrame: return pd.DataFrame(data=data, index=index, columns=index)
[docs] def write_special(meas_input: DotDict, phase_advances: pd.DataFrame, plane_tune: float, plane: str): """ Writes out the special phase advances, if any given by the accelerator class. """ # TODO REFACTOR AND SIMPLIFY accel: Accelerator = meas_input.accelerator important_phase_advances: list = accel.important_phase_advances() if not important_phase_advances: return meas = phase_advances[MEASUREMENT] beam_direction = accel.beam_direction elements = accel.elements special_phase_columns = ['ELEMENT1', 'ELEMENT2', f'{PHASE}{plane}', f'{ERR}{PHASE}{plane}', f'{PHASE}{plane}_DEG', f'{ERR}{PHASE}{plane}_DEG', f'{PHASE}{plane}{MDL}', f'{PHASE}{plane}{MDL}_DEG', 'BPM1', 'BPM2', f'BPM_PHASE{plane}', f'BPM_{ERR}{PHASE}{plane}',] to_concat_rows = [] for elem1, elem2 in accel.important_phase_advances(): mus1 = elements.loc[elem1, f"{PHASE_ADV}{plane}"] - elements.loc[:, f"{PHASE_ADV}{plane}"] minmu1 = abs(mus1.loc[meas.index]).idxmin() mus2 = elements.loc[:, f"{PHASE_ADV}{plane}"] - elements.loc[elem2, f"{PHASE_ADV}{plane}"] minmu2 = abs(mus2.loc[meas.index]).idxmin() bpm_phase_advance = meas.loc[minmu1, minmu2] model_value = elements.loc[elem2, f"{PHASE_ADV}{plane}"] - elements.loc[elem1, f"{PHASE_ADV}{plane}"] if (elements.loc[elem1, S] - elements.loc[elem2, S]) * beam_direction > 0.0: bpm_phase_advance += plane_tune model_value += plane_tune bpm_err = phase_advances[f"{ERR}{MEASUREMENT}"].loc[minmu1, minmu2] elems_to_bpms = -mus1.loc[minmu1] - mus2.loc[minmu2] ph_result = ((bpm_phase_advance + elems_to_bpms) * beam_direction) model_value = (model_value * beam_direction) % 1 new_row = pd.DataFrame([[ elem1, elem2, ph_result % 1, bpm_err, _to_deg(ph_result), bpm_err * 360, model_value, _to_deg(model_value), minmu1, minmu2, bpm_phase_advance, elems_to_bpms, ]], columns=special_phase_columns, ) to_concat_rows.append(new_row) special_phase_df = pd.concat(to_concat_rows, axis="index", ignore_index=True) tfs.write(Path(meas_input.outputdir) / f"{SPECIAL_PHASE_NAME}{plane.lower()}{EXT}", special_phase_df)
def _to_deg(phase: ArrayLike | float) -> ArrayLike | float: """ Convert from tune units (radians [2pi]) to -90 to +90 degrees. """ phase = phase % 0.5 * 360 if phase < 90: return phase return phase - 180