Source code for pylhc.forced_da_analysis

"""
Forced DA Analysis
------------------

Top-level script to run the forced DA analysis, following the procedure described in
`CarlierForcedDA2019`_.

Arguments:

*--Required--*

- **beam** *(int)*: Beam to use.

  Flags: **['-b', '--beam']**
  Choices: ``[1, 2]``
- **energy** *(MultiClass)*: Beam energy in GeV.

  Flags: **['-e', '--energy']**
- **kick_directory** *(MultiClass)*: Analysis kick_directory containing kick files.

  Flags: **['-k', '--kickdir']**
- **plane** *(str)*: Plane of the kicks.

  Flags: **['-p', '--plane']**
  Choices: ``['X', 'Y']``

*--Optional--*

- **emittance_outlier_limit** *(float)*: Limit, i.e. cut from mean, on emittance outliers in meter.

  Default: ``5e-07``
- **emittance_tfs** *(MultiClass)*: Dataframe or Path of pre-saved emittance tfs.

- **emittance_type** *(str)*: Which BSRT data to use (from database).

  Choices: ``['fit_sigma', 'average']``
  Default: ``average``
- **emittance_window_length** *(int)*: Length of the moving average window. (# data points)

  Default: ``100``
- **fill** *(int)*: Fill that was used. If not given, check out time_around_kicks.

  Flags: **['-f', '--fill']**
- **fit** *(str)*: Fitting function to use (rearranges parameters to make sense).

  Choices: ``['exponential', 'linear']``
  Default: ``exponential``
- **intensity_tfs** *(MultiClass)*: Dataframe or Path of pre-saved intensity tfs.

- **intensity_time_after_kick** *(int)*: Defines the times after the kicks (in seconds) which is used for intensity averaging to calculate the losses.

  Default: ``[5, 30]``
- **intensity_time_before_kick** *(int)*: Defines the times before the kicks (in seconds) which is used for intensity averaging to calculate the losses.

  Default: ``[30, 5]``
- **normalized_emittance** *(float)*: Assumed NORMALIZED nominal emittance for the machine.

  Default: ``3.7499999999999997e-06``
- **output_directory** *(MultiClass)*: Output kick_directory, if not given subfolder in kick kick_directory

  Flags: **['-o', '--outdir']**
- **pagestore_db** *(MultiClass)*: (Path to-) presaved timber database

- **show**: Show plots.

  Action: ``store_true``
- **show_wirescan_emittance** *(BoolOrPathOrDataFrame)*: Flag if the emittance from wirescan should also be shown, can also be a Dataframe or Path of pre-saved emittance bws tfs.

  Default: ``False``
- **timber_db** *(str)*: Which timber database to use.

  Choices: ``['all', 'mdb', 'ldb', 'nxcals']``
  Default: ``all``
- **time_around_kicks** *(int)*: If no fill is given, this defines the time (in minutes) when data before the first and after the last kick is extracted.

  Default: ``10``
- **plot_styles** *(str)*: Which plotting styles to use,
  either from omc3 styles or default mpl.

  Default: ``['standard']``
- **manual_style** *(DictAsString)*: Additional style rcParameters which update the set of predefined ones.

  Default: ``{}``


:author: jdilly

.. _CarlierForcedDA2019: https://journals.aps.org/prab/pdf/10.1103/PhysRevAccelBeams.22.031002
"""
import os
from collections import defaultdict
from contextlib import suppress
from pathlib import Path
from typing import Tuple

import matplotlib as mpl
import matplotlib.colors as mcolors
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.transforms as mtrans
import numpy as np
import pandas as pd
import scipy.odr
import scipy.optimize
import tfs
from generic_parser import EntryPointParameters, entrypoint
from generic_parser.entry_datatypes import (
    DictAsString,
    FALSE_ITEMS,
    TRUE_ITEMS,
    get_instance_faker_meta,
    get_multi_class,
)
from generic_parser.tools import DotDict
from omc3.optics_measurements import toolbox
from omc3.plotting.utils import annotations, colors, lines, style
from omc3.tune_analysis.bbq_tools import clean_outliers_moving_average
from omc3.utils import logging_tools
from omc3.utils.iotools import save_config
from omc3.utils.mock import cern_network_import
from omc3.utils.time_tools import CERNDatetime
from pandas import DataFrame, Series
from pandas.plotting import register_matplotlib_converters
from tfs import TfsDataFrame
from tfs.tools import significant_digits

pytimber = cern_network_import('pytimber')
PageStore = cern_network_import('pytimber.pagestore.PageStore')


from pylhc.constants.forced_da_analysis import (
    BSRT_EMITTANCE_TO_METER,
    BWS_DIRECTIONS,
    BWS_EMITTANCE_TO_METER,
    HEADER_BSRT_OUTLIER_LIMIT,
    HEADER_BSRT_ROLLING_WINDOW,
    HEADER_ENERGY,
    HEADER_TIME_AFTER,
    HEADER_TIME_BEFORE,
    INITIAL_DA_FIT,
    INTENSITY,
    INTENSITY_AFTER,
    INTENSITY_BEFORE,
    INTENSITY_KEY,
    INTENSITY_LOSSES,
    KICKFILE,
    MAX_CURVEFIT_FEV,
    OUTFILE_INTENSITY,
    OUTLIER_LIMIT,
    PLOT_FILETYPES,
    RESULTS_DIR,
    ROLLING_AVERAGE_WINDOW,
    TIME_AFTER_KICK_S,
    TIME_AROUND_KICKS_MIN,
    TIME_BEFORE_KICK_S,
    YPAD,
    bsrt_emittance_key,
    bws_emittance_key,
    column_action,
    column_bws_norm_emittance,
    column_emittance,
    column_norm_emittance,
    err_col,
    header_da,
    header_da_error,
    header_nominal_emittance,
    header_norm_nominal_emittance,
    mean_col,
    outfile_emittance,
    outfile_emittance_bws,
    outfile_kick,
    outfile_plot,
    rel_col,
    sigma_col,
)
from pylhc.constants.general import (
    LHC_NOMINAL_EMITTANCE,
    TFS_SUFFIX,
    TIME_COLUMN,
    get_proton_beta,
    get_proton_gamma,
)

LOG = logging_tools.get_logger(__name__)


# Weird Datatypes
[docs] class BoolOrPathOrDataFrame( metaclass=get_instance_faker_meta(bool, Path, str, tfs.TfsDataFrame, pd.DataFrame, type(None)) ): """ A class that behaves like a `boolean` when possible, otherwise like a `Path`, `string` or `Dataframe`. """ def __new__(cls, value): if isinstance(value, str): value = value.strip("'\"") # behavior like dict-parser if value in TRUE_ITEMS: return True elif value in FALSE_ITEMS: return False else: try: return Path(value) except TypeError: return value
def _get_pathclass(*other_classes): class SomethingOrPath(metaclass=get_instance_faker_meta(Path, str, *other_classes, type(None))): """A class that behaves like a if possible `Path`, `string` or something else.""" def __new__(cls, value): if isinstance(value, str): value = value.strip("'\"") # Needs to be done for strings in config-files try: return Path(value) except TypeError: return value return SomethingOrPath PathOrDataframe = _get_pathclass(tfs.TfsDataFrame, pd.DataFrame) PathOrPagestore = _get_pathclass(PageStore) PathOrString = _get_pathclass() def get_params(): return EntryPointParameters( kick_directory=dict( flags=["-k", "--kickdir"], required=True, type=PathOrString, help="Analysis kick_directory containing kick files.", ), output_directory=dict( flags=["-o", "--outdir"], type=PathOrString, help="Output kick_directory, if not given subfolder in kick kick_directory", ), energy=dict( flags=["-e", "--energy"], required=True, type=get_multi_class(float, int), help="Beam energy in GeV.", ), fill=dict( flags=["-f", "--fill"], type=get_multi_class(int, type(None)), help="Fill that was used. If not given, check out time_around_kicks.", ), beam=dict( flags=["-b", "--beam"], required=True, choices=[1, 2], type=int, help="Beam to use." ), plane=dict( flags=["-p", "--plane"], choices=["X", "Y"], required=True, type=str, help=( "Plane of the kicks." # " Give 'XY' for using both planes (e.g. diagonal kicks)." # Future release ), ), time_around_kicks=dict( type=int, default=TIME_AROUND_KICKS_MIN, help=( "If no fill is given, this defines the time (in minutes) " "when data before the first and after the last kick is extracted." ), ), intensity_time_before_kick=dict( type=int, nargs=2, default=TIME_BEFORE_KICK_S, help=( "Defines the times before the kicks (in seconds) " "which is used for intensity averaging to calculate the losses." ), ), intensity_time_after_kick=dict( type=int, nargs=2, default=TIME_AFTER_KICK_S, help=( "Defines the times after the kicks (in seconds) " "which is used for intensity averaging to calculate the losses." ), ), normalized_emittance=dict( type=float, default=LHC_NOMINAL_EMITTANCE, help="Assumed NORMALIZED nominal emittance for the machine.", ), emittance_tfs=dict( type=PathOrDataframe, help="Dataframe or Path of pre-saved emittance tfs.", ), intensity_tfs=dict( type=PathOrDataframe, help="Dataframe or Path of pre-saved intensity tfs.", ), show_wirescan_emittance=dict( default=False, type=BoolOrPathOrDataFrame, help=( "Flag if the emittance from wirescan should also be shown, " "can also be a Dataframe or Path of pre-saved emittance bws tfs." ), ), timber_db=dict( type=str, default="all", choices=["all", "mdb", "ldb", "nxcals"], help="Which timber database to use.", ), pagestore_db=dict(type=PathOrPagestore, help="(Path to-) presaved timber database"), fit=dict( type=str, default="exponential", choices=["exponential", "linear"], help="Fitting function to use (rearranges parameters to make sense).", ), emittance_window_length=dict( help="Length of the moving average window. (# data points)", type=int, default=ROLLING_AVERAGE_WINDOW, ), emittance_outlier_limit=dict( help="Limit, i.e. cut from mean, on emittance outliers in meter.", type=float, default=OUTLIER_LIMIT, ), emittance_type=dict( type=str, default="average", choices=["fit_sigma", "average"], help="Which BSRT data to use (from database).", ), show=dict(action="store_true", help="Show plots.",), plot_styles=dict( type=str, nargs="+", default=["standard"], help="Which plotting styles to use, either from omc3 styles or default mpl.", ), manual_style=dict( type=DictAsString, default={}, help="Additional style rcParameters which update the set of predefined ones.", ), ) @entrypoint(get_params(), strict=True) def main(opt): LOG.debug("Starting Forced DA analysis.") _log_opt(opt) kick_dir, out_dir = _get_output_dir(opt.kick_directory, opt.output_directory) with suppress(PermissionError): save_config(out_dir, opt, __file__) # get data kick_df = _get_kick_df(kick_dir, opt.plane) intensity_df, emittance_df, emittance_bws_df = _get_dataframes( kick_df.index, opt.get_subdict( [ "fill", "beam", "plane", "time_around_kicks", "emittance_tfs", "intensity_tfs", "show_wirescan_emittance", "timber_db", "pagestore_db", "emittance_window_length", "emittance_outlier_limit", "emittance_type", "normalized_emittance", ] ), ) _check_all_times_in(kick_df.index, intensity_df.index[0], intensity_df.index[-1]) # add data to kicks kick_df = _add_intensity_and_losses_to_kicks( kick_df, intensity_df, opt.intensity_time_before_kick, opt.intensity_time_after_kick ) kick_df = _add_emittance_to_kicks( opt.plane, opt.energy, kick_df, emittance_df, opt.normalized_emittance ) kick_df = _do_fit(opt.plane, kick_df, opt.fit) kick_df = _convert_to_sigmas(opt.plane, kick_df) # output _write_tfs(out_dir, opt.plane, kick_df, intensity_df, emittance_df, emittance_bws_df) # plotting figs = dict() register_matplotlib_converters() # for datetime plotting style.set_style(opt.plot_styles, opt.manual_style) figs["emittance"] = _plot_emittances( out_dir, opt.beam, opt.plane, emittance_df, emittance_bws_df, kick_df.index ) figs["intensity"] = _plot_intensity(out_dir, opt.beam, opt.plane, kick_df, intensity_df) for fit_type in ("exponential", "linear", "norm"): figs[f"da_fit_{fit_type}"] = _plot_da_fit(out_dir, opt.beam, opt.plane, kick_df, fit_type) if opt.show: plt.show() LOG.debug("Forced DA analysis finished.") return figs # Helper --- def _log_opt(opt: DotDict): """Show options in log.""" LOG.info("Performing ForcedDA Analysis for:") if opt.fill is not None: LOG.info(f" Fill: {opt.fill}") LOG.info(f" Energy: {opt.energy} GeV") LOG.info(f" Beam: {opt.beam}") LOG.info(f" Plane: {opt.plane}") LOG.info(f" Analysis Directory: '{opt.kick_directory}'") def _write_tfs( out_dir: Path, plane: str, kick_df: DataFrame, intensity_df: DataFrame, emittance_df: DataFrame, emittance_bws_df: DataFrame, ): """Write out gathered data.""" LOG.debug("Writing tfs files.") for df in (kick_df, intensity_df, emittance_df, emittance_bws_df): if df is not None: df.insert(0, TIME_COLUMN, [CERNDatetime(dt).cern_utc_string() for dt in df.index]) try: tfs.write(out_dir / outfile_kick(plane), kick_df) tfs.write(out_dir / OUTFILE_INTENSITY, intensity_df) tfs.write(out_dir / outfile_emittance(plane), emittance_df) if emittance_bws_df is not None: tfs.write(out_dir / outfile_emittance_bws(plane), emittance_bws_df) except (FileNotFoundError, IOError): LOG.error(f"Cannot write into directory: {str(out_dir)} ") def _check_all_times_in(series: Series, start: CERNDatetime, end: CERNDatetime): """Check if all times in series are between start and end.""" if any(s for s in series if s < start or s > end): raise ValueError( "Some of the kick-times are outside of the fill times! " "Check if correct kick-file or fill number are used." ) def _convert_time_index(list_: list, path: Path = None) -> pd.Index: """Tries to convert time index to cerntime, first from datetime, then string, then timestamp.""" for index_convert in ( _datetime_to_cerntime_index, _string_to_cerntime_index, _timestamp_to_cerntime_index, ): with suppress(TypeError): return index_convert(list_) msg = f"Unrecognized format in column '{TIME_COLUMN}'" if path: msg += f" in '{str(path)}'" raise TypeError(msg) def _string_to_cerntime_index(list_): return pd.Index((CERNDatetime.from_cern_utc_string(t) for t in list_), dtype=object) def _timestamp_to_cerntime_index(list_): return pd.Index((CERNDatetime.from_timestamp(t) for t in list_), dtype=object) def _datetime_to_cerntime_index(list_): return pd.Index((CERNDatetime(t) for t in list_), dtype=object) def _drop_duplicate_indices(df): duplicate_mask = [True] + [ df.index[idx] != df.index[idx - 1] for idx in range(1, len(df.index)) ] return df.loc[duplicate_mask, :] # TFS Data Loading ------------------------------------------------------------- def _get_dataframes( kick_times: pd.Index, opt: DotDict ) -> Tuple[TfsDataFrame, TfsDataFrame, TfsDataFrame]: """Gets the intensity and emittance dataframes from either input, files or (timber) database.""" db = _get_db(opt) if opt.fill is not None: timespan_ts = _get_fill_times(db, opt.fill) timespan_dt = _convert_time_index(timespan_ts) else: td = pd.Timedelta(minutes=opt.time_around_kicks) timespan_dt = (kick_times.min() - td, kick_times.max() + td) timespan_ts = tuple(t.timestamp() for t in timespan_dt) if opt.intensity_tfs: intensity_df = _read_tfs(opt.intensity_tfs, timespan_dt) else: intensity_df = _get_bctrf_beam_intensity_from_timber(opt.beam, db, timespan_ts) if opt.emittance_tfs: emittance_df = _read_tfs(opt.emittance_tfs, timespan_dt) else: emittance_df = _get_bsrt_bunch_emittances_from_timber( opt.beam, opt.plane, db, timespan_ts, opt.emittance_type, opt.normalized_emittance ) emittance_df = _filter_emittance_data( emittance_df, opt.plane, opt.emittance_window_length, opt.emittance_outlier_limit ) if opt.show_wirescan_emittance is True: emittance_bws_df = _get_bws_emittances_from_timber(opt.beam, opt.plane, db, timespan_ts) elif opt.show_wirescan_emittance: emittance_bws_df = _read_tfs(opt.show_wirescan_emittance, timespan_dt) else: emittance_bws_df = None return intensity_df, emittance_df, emittance_bws_df def _read_tfs(tfs_file_or_path, timespan): """Read previously gathered data (see :meth:`pylhc.forced_da_analysis._write_tfs`).""" try: tfs_df = tfs.read_tfs(tfs_file_or_path, index=TIME_COLUMN) except IOError: tfs_df = tfs_file_or_path # hopefully tfs_df.index = _convert_time_index(tfs_df.index) return tfs_df.loc[slice(*timespan), :] def _filter_emittance_data(df, planes, window_length, limit): """Cleans emittance data via outlier filter and moving average.""" for plane in planes: LOG.debug(f"Filtering emittance data in plane {plane}.") col_nemittance = column_norm_emittance(plane) # col_err_nemittance = err_col(col_nemittance) col_mean = mean_col(col_nemittance) col_err_mean = err_col(col_mean) mav, std, mask = clean_outliers_moving_average( df[col_nemittance], length=window_length, limit=limit ) df[col_mean] = mav df[col_err_mean] = std # if any(df[col_err_nemittance]): # df[col_err_mean] = _rolling_errors(df[col_err_nemittance], ~mask, window_length) df = df.dropna(axis="index") if len(df.index) == 0: raise IndexError("Not enough emittance data extracted. Try to give a fill number.") df.headers[HEADER_BSRT_ROLLING_WINDOW] = window_length df.headers[HEADER_BSRT_OUTLIER_LIMIT] = limit df = _maybe_add_sum_for_planes(df, planes, column_norm_emittance) df = _maybe_add_sum_for_planes( df, planes, lambda p: mean_col(column_norm_emittance(p)), lambda p: err_col(mean_col(column_norm_emittance(p))), ) return df # Timber Data ------------------------------------------------------------------ def _get_db(opt): """Get the database either presaved or from timber.""" db = None if opt.pagestore_db: db = opt.pagestore_db try: db_path = Path(db) except TypeError: pass else: LOG.debug(f"Loading database from file {str(db_path)}") db = PageStore(f"file:{str(db_path)}", str(db_path.with_suffix(""))) if opt.fill is not None: raise EnvironmentError("'fill' can't be used with pagestore database.") else: LOG.debug(" Trying to load database from timber.") try: db = pytimber.LoggingDB(source=opt["timber_db"]) except AttributeError: LOG.debug(" Loading from timber failed.") if not db: error_msg = "" if opt.fill is not None: error_msg += "'fill' is given, " if opt.emittance_tfs is None: error_msg += "'emittance_tfs' is not given, " if opt.intensity_tfs is None: error_msg += "'intensity_tfs' is not given, " if opt.show_wirescan_emittance is True: error_msg += "wirescan emittance is requested, " if len(error_msg): error_msg += ( "but there is no database given and no access to timber databases. Aborting." ) raise EnvironmentError(error_msg) return db def _get_fill_times(db, fill): """Extract Fill times from database.""" LOG.debug(f"Getting Timespan from fill {fill}") filldata = db.getLHCFillData(fill) return filldata["startTime"], filldata["endTime"] def _get_bctrf_beam_intensity_from_timber(beam, db, timespan): LOG.debug(f"Getting beam intensity from bctfr for beam {beam}.") intensity_key = INTENSITY_KEY.format(beam=beam) LOG.debug(f" Key: {intensity_key}") x, y = db.get(intensity_key, *timespan)[intensity_key] df = tfs.TfsDataFrame( data=y, index=_timestamp_to_cerntime_index(x), columns=[INTENSITY], dtype=float ) df = _drop_duplicate_indices(df) LOG.debug(f" Returning dataframe of shape {df.shape}") return df def _get_bsrt_bunch_emittances_from_timber(beam, planes, db, timespan, key_type, nominal_emittance): dfs = {p: None for p in planes} for plane in planes: LOG.debug(f"Getting emittance from BSRT for beam {beam} and plane {plane}.") bunch_emittance_key = bsrt_emittance_key(beam, plane, key_type) LOG.debug(f" Key: {bunch_emittance_key}") col_nemittance = column_norm_emittance(plane) all_columns = [f(col_nemittance) for f in (lambda s: s, mean_col, err_col)] + [ err_col(mean_col(col_nemittance)) ] x, y = db.get(bunch_emittance_key, *timespan)[bunch_emittance_key] y_std = np.zeros_like(x) if key_type == "fit_sigma": # add all data with the same timestamp y_new = defaultdict(list) for x_elem, y_elem in zip(x, y): y_new[f"{x_elem:.3f}"] += y_elem.tolist() # get average and std per timestamp x = np.array([float(elem) for elem in y_new.keys()]) y = np.array([np.average(elem) for elem in y_new.values()]) * nominal_emittance y_std = np.array([np.std(elem) for elem in y_new.values()]) * nominal_emittance elif key_type == "average": y *= BSRT_EMITTANCE_TO_METER y_std *= BSRT_EMITTANCE_TO_METER # remove entries with zero emittance as unphysical x, y, y_std = x[y != 0], y[y != 0], y_std[y != 0] df = tfs.TfsDataFrame( index=_timestamp_to_cerntime_index(x), columns=all_columns, dtype=float, ) df[col_nemittance] = y df[err_col(col_nemittance)] = y_std dfs[plane] = df df = _merge_df_planes(dfs, planes) LOG.debug(f" Returning dataframe of shape {df.shape}") return df def _get_bws_emittances_from_timber(beam, planes, db, timespan): dfs = {p: None for p in planes} for plane in planes: LOG.debug(f"Getting emittance from BWS for beam {beam} and plane {plane}.") all_columns = [column_bws_norm_emittance(plane, d) for d in BWS_DIRECTIONS] df = None for direction in BWS_DIRECTIONS: emittance_key = bws_emittance_key(beam, plane, direction) LOG.debug(f" Key: {emittance_key}") column_nemittance = column_bws_norm_emittance(plane, direction) x, y = db.get(emittance_key, *timespan)[emittance_key] if df is None: df = tfs.TfsDataFrame( index=_timestamp_to_cerntime_index(x), columns=all_columns, dtype=float ) df[column_nemittance] = y * BWS_EMITTANCE_TO_METER df[column_nemittance] = df[column_nemittance].apply( np.mean ) # BWS can give multiple values df[err_col(column_nemittance)] = df[column_nemittance].apply( np.std ) # BWS can give multiple values dfs[plane] = df df = _merge_df_planes(dfs, planes) for direction in BWS_DIRECTIONS: df = _maybe_add_sum_for_planes( df, planes, lambda p: column_bws_norm_emittance(p, direction), lambda p: err_col(column_bws_norm_emittance(p, direction)), ) LOG.debug(f" Returning dataframe of shape {df.shape}") return df # Kick Data -------------------------------------------------------------------- def _get_kick_df(kick_dir, plane): def column_action_error(x): return err_col(column_action(x)) try: df = _get_new_kick_file(kick_dir, plane) except FileNotFoundError: LOG.debug("Reading of kickfile failed. Looking for old kickfile.") df = _get_old_kick_file(kick_dir, plane) df = _maybe_add_sum_for_planes(df, plane, column_action, column_action_error) return df[[column_action(plane), column_action_error(plane)]] def _get_old_kick_file(kick_dir, plane): """Kick files from ``Beta-Beat.src``.""" path = kick_dir / "getkickac.out" LOG.debug(f"Reading kickfile '{str(path)}'.'") df = tfs.read(path) df = df.set_index(TIME_COLUMN) df.index = _convert_time_index(df.index, path) rename_dict = {} for p in plane: # can be XY rename_dict.update( { f"2J{p}RES": column_action(p), f"2J{p}STDRES": err_col(column_action(p)), f"J{p}2": column_action(p), # pre 2017 f"J{p}STD": err_col(column_action(p)), # pre 2017 } ) df = df.rename(rename_dict, axis="columns") renamed_cols = list(set(rename_dict.values())) df.loc[:, renamed_cols] = df.loc[:, renamed_cols] * 1e-6 return df def _get_new_kick_file(kick_dir, planes): """Kick files from ``omc3``.""" dfs = {p: None for p in planes} for plane in planes: path = kick_dir / f"{KICKFILE}_{plane.lower()}{TFS_SUFFIX}" LOG.debug(f"Reading kickfile '{str(path)}'.'") df = tfs.read(path, index=TIME_COLUMN) df.index = pd.Index([CERNDatetime.from_cern_utc_string(t) for t in df.index], dtype=object) dfs[plane] = df return _merge_df_planes(dfs, planes) def _get_output_dir(kick_directory, output_directory): kick_path = Path(kick_directory) if output_directory: output_path = Path(output_directory) else: output_path = kick_path / RESULTS_DIR try: output_path.mkdir(exist_ok=True) except PermissionError: LOG.warn( f"You have no writing permission in '{str(output_path)}', " f"output data might not be created." ) LOG.info(f"All output will be written to {str(output_path)}") return kick_path, output_path # Intensity at Kicks ----------------------------------------------------------- def _add_intensity_and_losses_to_kicks(kick_df, intensity_df, time_before, time_after): LOG.debug("Calculating intensity and losses for the kicks.") col_list = [INTENSITY_BEFORE, INTENSITY_AFTER, INTENSITY_LOSSES] new_columns = [col for col in col_list + [err_col(c) for c in col_list]] kick_df = kick_df.reindex(columns=kick_df.columns.tolist() + new_columns) kick_df = _get_intensities_around_kicks(kick_df, intensity_df, time_before, time_after) kick_df = _calculate_intensity_losses_at_kicks(kick_df) return kick_df def _get_intensities_around_kicks(kick_df, intensity_df, time_before, time_after): LOG.debug("Calculating beam intensity before and after kicks.") # input signs and order does not matter time_before = sorted(-np.abs(t) for t in time_before) time_after = sorted(np.abs(t) for t in time_after) kick_df.headers[HEADER_TIME_BEFORE] = str(time_before) kick_df.headers[HEADER_TIME_AFTER] = str(time_after) for i, time in enumerate(kick_df.index): # calculate intensity before and after kicks (with error) for column, time_delta in ((INTENSITY_BEFORE, time_before), (INTENSITY_AFTER, time_after)): t_from, t_to = ( time + pd.Timedelta(seconds=time_delta[0]), time + pd.Timedelta(seconds=time_delta[1]), ) data = intensity_df.loc[ t_from:t_to, INTENSITY ] # awesome pandas can handle time intervals! kick_df.loc[time, [column, err_col(column)]] = data.mean(), data.std() return kick_df def _calculate_intensity_losses_at_kicks(kick_df): LOG.debug("Calculating intensity losses.") # absolute losses kick_df[INTENSITY_LOSSES] = kick_df[INTENSITY_BEFORE] - kick_df[INTENSITY_AFTER] kick_df[err_col(INTENSITY_LOSSES)] = np.sqrt( np.square(kick_df[err_col(INTENSITY_BEFORE)]) + np.square(kick_df[err_col(INTENSITY_AFTER)]) ) # relative losses, error from error-propagation formular for losses / I_before = 1 - I_after / I_before kick_df[rel_col(INTENSITY_LOSSES)] = kick_df[INTENSITY_LOSSES] / kick_df[INTENSITY_BEFORE] kick_df[rel_col(err_col(INTENSITY_LOSSES))] = np.sqrt( np.square(kick_df[INTENSITY_AFTER] / kick_df[INTENSITY_BEFORE]) * ( np.square(kick_df[err_col(INTENSITY_AFTER)] / kick_df[INTENSITY_AFTER]) + np.square(kick_df[err_col(INTENSITY_BEFORE)] / kick_df[INTENSITY_BEFORE]) ) ) return kick_df # Emittance at Kicks ----------------------------------------------------------- def _add_emittance_to_kicks(plane, energy, kick_df, emittance_df, nominal): LOG.debug("Retrieving normalized emittance at the kicks.") kick_df.headers[HEADER_ENERGY] = energy kick_df.headers[HEADER_BSRT_ROLLING_WINDOW] = ROLLING_AVERAGE_WINDOW col_nemittance = column_norm_emittance(plane) cols_emitt = [mean_col(col_nemittance), err_col(mean_col(col_nemittance))] cols_kick = [col_nemittance, err_col(col_nemittance)] kick_df = kick_df.reindex(columns=kick_df.columns.tolist() + cols_kick) idx_emitt = [emittance_df.columns.get_loc(c) for c in cols_emitt] for time in kick_df.index: idx_kick = get_approximate_index(emittance_df, time) kick_df.loc[time, cols_kick] = emittance_df.iloc[idx_kick, idx_emitt].values # add de-normalized emittance normalization = get_proton_gamma(energy) * get_proton_beta( energy ) # norm emittance to emittance col_emittance = column_emittance(plane) kick_df.headers[header_norm_nominal_emittance(plane)] = nominal kick_df.headers[header_nominal_emittance(plane)] = nominal / normalization kick_df[col_emittance] = kick_df[col_nemittance] / normalization kick_df[err_col(col_emittance)] = kick_df[err_col(col_nemittance)] / normalization return kick_df # Forced DA Fitting ------------------------------------------------------------
[docs] def fun_exp_decay(p, x): # fit and plot """sp = DA_J, x[0] = action (2J res), x[1] = emittance""" return np.exp(-(p - (0.5 * x[0])) / x[1])
[docs] def fun_exp_sigma(p, x): # only used for plotting """p = DA_sigma, x = action (J_sigma)""" return np.exp(-0.5 * (p ** 2 - x ** 2))
[docs] def fun_linear(p, x): # fit and plot """p = DA_J, x = action (2J res)""" return x * 0.5 - p
[docs] def swap_fun_parameters(fun): """Parameter swapped for Curvefit.""" return lambda x, p: fun(p, x)
def _do_fit(plane, kick_df, fit_type): LOG.debug("Fitting forced da to exponential. ") action, emittance, rel_losses = _get_fit_data(kick_df, plane) init_guess = [INITIAL_DA_FIT * kick_df.headers[header_nominal_emittance(plane)]] get_fit_param = {"linear": _linear_fit_parameters, "exponential": _exponential_fit_parameters}[ fit_type ] fit_fun, x, y, sx, sy = get_fit_param(action, emittance, rel_losses) # do prelim fit init_fit, _ = _fit_curve(swap_fun_parameters(fit_fun), x, y, init_guess) # do odr odr = _fit_odr(fit_fun, x, y, sx, sy, init_fit) # add DA to kick da = odr.beta[0], odr.sd_beta[0] kick_df.headers[header_da(plane)], kick_df.headers[header_da_error(plane)] = da LOG.info(f"Forced DA (wrt. J) in {plane} [m]: {da[0]} ± {da[1]}") return kick_df def _get_fit_data(kick_df, plane): """Extracts necessary data from ``kick-df``. Returns tri-tuple of tuples (data, std).""" col_action = column_action(plane) col_emittance = column_emittance(plane) col_losses = rel_col(INTENSITY_LOSSES) # get data action = kick_df[col_action], _no_nonzero_errors(kick_df[err_col(col_action)]) emittance = kick_df[col_emittance], _no_nonzero_errors(kick_df[err_col(col_emittance)]) rel_losses = kick_df[col_losses], _no_nonzero_errors(kick_df[err_col(col_losses)]) return action, emittance, rel_losses def _exponential_fit_parameters(action, emittance, rel_losses): """Returns exponential fit function and parameters. All inputs are tuples of (data, std).""" x = action[0], emittance[0] y = rel_losses[0] sx = [action[1], emittance[1]] sy = rel_losses[1] return fun_exp_decay, x, y, sx, sy def _linear_fit_parameters(action, emittance, rel_losses): """ Returns linear fit function and parameters. All inputs are tuples of (data, std).""" log_losses = np.log(rel_losses[0]) x = action[0] y = emittance[0] * log_losses sx = action[1] sy = np.sqrt( (log_losses * emittance[1]) ** 2 + ((emittance[0] * rel_losses[1]) / rel_losses[0]) ** 2 ) return fun_linear, x, y, sx, sy def _fit_curve(fun, x, y, init): """Initial curve fit, without errors.""" fit, cov = scipy.optimize.curve_fit(fun, x, y, p0=init, maxfev=MAX_CURVEFIT_FEV) LOG.info(f"Initial DA fit: {fit} with cov {cov}") return fit, np.sqrt(np.diag(cov)) def _fit_odr(fun, x, y, sx, sy, init): """ODR Fit (includes errors).""" # fill zero errors with the minimum error - otherwise fit will not work fit_model_sigma = scipy.odr.Model(fun) data_model_sigma = scipy.odr.RealData(x=x, y=y, sx=sx, sy=sy,) da_odr = scipy.odr.ODR(data_model_sigma, fit_model_sigma, beta0=init) # da_odr.set_job(fit_type=2) odr_output = da_odr.run() logging_tools.odr_pprint(LOG.info, odr_output) return odr_output def _no_nonzero_errors(series): """Removes all zero-erros and replaces them with minimum errors in set.""" series = series.copy() nonzero = series[series != 0] if len(nonzero) == 0: raise ValueError("All errors are exact zero. Can't do ODR fit.") series[series == 0] = np.abs(series[series != 0]).min() return series def _convert_to_sigmas(plane, kick_df): """Converts the DA and the Action into Sigma-Units.""" LOG.debug("Calculating action and da in sigmas.") nominal_emittance = kick_df.headers[header_nominal_emittance(plane)] emittance = kick_df[column_emittance(plane)] emittance_mean, emittance_std = emittance.mean(), emittance.std() emittance_sign, emittance_sign_std = significant_digits( emittance_mean * 1e12, emittance_std * 1e12 ) LOG.info( f"Measured Emittance {emittance_sign} ± {emittance_sign_std} pm" f" (Nominal {nominal_emittance*1e12: .2f} pm)" ) # DA (in units of J) to DA_sigma da, da_err = kick_df.headers[header_da(plane)], kick_df.headers[header_da_error(plane)] da_sigma, da_sigma_err = ( np.sqrt(2 * da / emittance_mean), da_err / np.sqrt(2 * da * emittance_mean), ) kick_df.headers[header_da(plane, unit="sigma")] = da_sigma kick_df.headers[header_da_error(plane, unit="sigma")] = da_sigma_err LOG.info(f"Forced DA {plane} in N-sigma: {da_sigma} ± {da_sigma_err}") # Action (in units of 2J) to J_sigma col_action = column_action(plane) # kick_df[sigma_col(col_action)] = np.sqrt(kick_df[col_action] / nominal_emittance) # kick_df[err_col(sigma_col(col_action))] = ( # 0.5 * kick_df[err_col(col_action)] / np.sqrt(kick_df[col_action] * nominal_emittance) # ) kick_df[sigma_col(col_action)] = np.sqrt(kick_df[col_action] / emittance) kick_df[err_col(sigma_col(col_action))] = ( 0.5 * kick_df[err_col(col_action)] / np.sqrt(kick_df[col_action] * emittance) ) return kick_df # Plotting --------------------------------------------------------------------- def _plot_intensity(directory, beam, plane, kick_df, intensity_df): """ Plots beam intensity. For losses, the absolute values are used and then normalized to the Intensity before the kicks, to get the percentage relative to that (global) value. """ LOG.debug("Plotting beam intensity") fig, ax = plt.subplots(ncols=1, nrows=1, figsize=(16.80, 7.68)) x_span = (kick_df.index.max() - kick_df.index.min()).seconds * np.array( [0.03, 0.09] ) # defines x-limits # convert to % relative to before first kick idx_before = get_approximate_index( intensity_df, kick_df.index.min() - pd.Timedelta(seconds=x_span[0]), method="ffill" ) idx_intensity = intensity_df.columns.get_loc(INTENSITY) # for iloc intensity_start = intensity_df.iloc[idx_before, idx_intensity] norm = intensity_start / 100.0 # plot intensity ax.plot( _date2num(intensity_df.index), intensity_df[INTENSITY] / norm, marker=".", markersize=mpl.rcParams["lines.markersize"] * 0.5, fillstyle="full", color=colors.get_mpl_color(0), label="Intensity", ) # plot losses per kick normalized_intensity = kick_df.loc[:, [INTENSITY_BEFORE, INTENSITY_AFTER]] / norm normalized_intensity_error = ( kick_df.loc[:, [err_col(INTENSITY_BEFORE), err_col(INTENSITY_AFTER)]] / norm ) normalized_losses = kick_df.loc[:, [INTENSITY_LOSSES, err_col(INTENSITY_LOSSES)]] / norm normalized_losses_kick = ( kick_df.loc[:, [rel_col(INTENSITY_LOSSES), err_col(rel_col(INTENSITY_LOSSES))]] * 100 ) for idx, kick in enumerate(kick_df.index): ax.errorbar( [_date2num(kick)] * 2, normalized_intensity.loc[kick, :], yerr=normalized_intensity_error.loc[kick, :], color=colors.get_mpl_color(1), marker=".", linestyle="-", label="__nolegend__" if idx > 0 else "Losses", ) ax.text( _date2num(kick), 0.5 * sum(normalized_intensity.loc[kick, :]), " -{:.1f}$\pm${:.1f} %\n".format(*normalized_losses.loc[kick, :]) + " (-{:.1f}$\pm${:.1f} %)".format(*normalized_losses_kick.loc[kick, :]), va="bottom", color=colors.get_mpl_color(1), fontdict=dict(fontsize=mpl.rcParams["font.size"] * 0.8), ) _plot_kicks_and_scale_x(ax, kick_df.index, pad=x_span) ylim = [normalized_intensity.min().min(), normalized_intensity.max().max()] ypad = 0.1 * (ylim[1] - ylim[0]) ax.set_ylim([ylim[0] - ypad, ylim[1] + ypad]) ax.set_ylabel(r"Beam Intensity [%]") annotations.make_top_legend(ax, ncol=3) plt.tight_layout() annotations.set_name(f"Intensity Beam {beam}, Plane {plane}", fig) annotations.set_annotation( f"Intensity at 100%: {intensity_start*1e-10:.3f}" "$\;\cdot\;10^{{10}}$ charges", ax=ax, position="left", ) _save_fig(directory, plane, fig, "intensity") return fig def _plot_emittances(directory, beam, plane, emittance_df, emittance_bws_df, kick_times): LOG.debug("Plotting normalized emittances") fig, ax = plt.subplots(ncols=1, nrows=1, figsize=(10.24, 7.68)) col_norm_emittance = column_norm_emittance(plane) bsrt_color = colors.get_mpl_color(0) bws_color = colors.get_mpl_color(1) ax.errorbar( _date2num(emittance_df.index), emittance_df[col_norm_emittance] * 1e6, # Actual BSRT measurement yerr=emittance_df[err_col(col_norm_emittance)] * 1e6, color=bsrt_color, marker="o", markeredgewidth=2, linestyle="None", label="From BSRT", ) ax.errorbar( _date2num(emittance_df.index), emittance_df[mean_col(col_norm_emittance)] * 1e6, yerr=emittance_df[err_col(mean_col(col_norm_emittance))] * 1e6, color=colors.change_color_brightness(bsrt_color, 0.7), marker="", label=f"Moving Average (window = {ROLLING_AVERAGE_WINDOW})", ) if emittance_bws_df is not None and len(emittance_bws_df.index): for d in BWS_DIRECTIONS: label = "__nolegend__" if d == BWS_DIRECTIONS[1] else "From BWS" color = ( bws_color if d == BWS_DIRECTIONS[1] else colors.change_color_brightness(bws_color, 0.5) ) col_bws_nemittance = column_bws_norm_emittance(plane, d) ax.errorbar( _date2num(emittance_bws_df.index), emittance_bws_df[col_bws_nemittance] * 1e6, yerr=emittance_bws_df[err_col(col_bws_nemittance)] * 1e6, linestyle="None", marker="o", color=color, label=label, markersize=mpl.rcParams["lines.markersize"] * 1.5, ) _plot_kicks_and_scale_x(ax, kick_times) ax.set_ylabel(r"$\epsilon_{n}$ $[\mu m]$") annotations.make_top_legend(ax, ncol=2) plt.tight_layout() annotations.set_name(f"Emittance Beam {beam}, Plane {plane}", fig) _save_fig(directory, plane, fig, "emittance") return fig def _plot_da_fit(directory, beam, plane, k_df, fit_type): """ Plot the Forced Dynamic Aperture fit. (I do not like the complexity of this function. jdilly). """ LOG.debug(f"Plotting Dynamic Aperture Fit for {fit_type}") col_action = column_action(plane) col_action_sigma = sigma_col(col_action) col_emittance = column_emittance(plane) col_intensity = rel_col(INTENSITY_LOSSES) kick_df = k_df.copy() kick_df = kick_df.sort_values(by=col_action) fig, ax = plt.subplots(ncols=1, nrows=1, figsize=(10.24, 7.68)) # Plot Measurement Data intensity = kick_df[col_intensity] intensity_err = kick_df[err_col(col_intensity)] if fit_type == "linear": intensity_err = np.abs(1 / intensity) * intensity_err intensity = np.log(intensity) else: intensity *= 100 intensity_err *= 100 if fit_type == "norm": action = kick_df[col_action_sigma] action_err = kick_df[err_col(col_action_sigma)] action_x = action action_xerr = action_err else: action = kick_df[col_action] action_err = kick_df[err_col(col_action)] action_x = action * 1e6 action_xerr = action_err * 1e6 ax.errorbar( action_x, intensity, xerr=action_xerr, yerr=intensity_err, marker=".", color=colors.get_mpl_color(0), label="Kicks", ) # Plot Fit emittance = kick_df[col_emittance] da, da_err = kick_df.headers[header_da(plane)], kick_df.headers[header_da_error(plane)] da_mu, da_err_mu = significant_digits(da * 1e6, da_err * 1e6) da_label = f"Fit: DA$_J$= ${da_mu} \pm {da_err_mu} \mu m$" if fit_type == "linear": fit_fun = fun_linear fit_data = action multiplier = 1 / emittance # DA-J/emittance = -ln(I/Io) elif fit_type == "exponential": fit_fun = fun_exp_decay fit_data = (action, emittance) multiplier = 100 # for percentages elif fit_type == "norm": da, da_err = ( kick_df.headers[header_da(plane, unit="sigma")], kick_df.headers[header_da_error(plane, unit="sigma")], ) da_round, da_err_round = significant_digits(da, da_err) da_label = f"Fit: DA= ${da_round} \pm {da_err_round} N_{{\sigma}}$" fit_fun = fun_exp_sigma fit_data = action multiplier = 100 # for percentages fit_mean = fit_fun(da, fit_data) * multiplier fit_min = fit_fun(da - da_err, fit_data) * multiplier fit_max = fit_fun(da + da_err, fit_data) * multiplier color = colors.get_mpl_color(1) ax.fill_between(action_x, fit_min, fit_max, facecolor=mcolors.to_rgba(color, 0.3)) ax.plot(action_x, fit_mean, ls="--", c=color, label=da_label) # extend fit to 100% losses color_ext = "#7f7f7f" action_max = action.max() emittance_at_max = emittance[action == action_max][0] if fit_type in ["linear", "exponential"]: da_x = da * 2 * 1e6 da_string = "2DA$_J$" elif fit_type == "norm": da_x = da da_string = "DA$_\sigma$" if action_max < da: if fit_type in ["linear", "exponential"]: action_ext = np.linspace(action_max, 2 * da, 10) action_x_ext = action_ext * 1e6 if fit_type == "exponential": fit_data_ext = (action_ext, emittance_at_max) elif fit_type == "linear": fit_data_ext = action_ext multiplier = 1 / emittance_at_max else: action_ext = np.linspace(action_max, da, 10) action_x_ext = action_ext fit_data_ext = action_ext fit_ext = fit_fun(da, fit_data_ext) * multiplier ax.plot( action_x_ext, fit_ext, ls="--", color=mcolors.to_rgba(color_ext, 0.3), label="__nolegend__", ) # DA Marker ax.axvline(da_x, ls="--", color=color_ext, marker="", label="__nolegend__") trans = mtrans.blended_transform_factory(ax.transData, ax.transAxes) # x is data, y is axes ax.text( x=da_x, y=1.0, s=da_string, va="bottom", ha="center", zorder=-1, color=color_ext, transform=trans, ) # Format figure if fit_type == "norm": nominal_emittance = kick_df.headers[header_nominal_emittance(plane)] emittance_mean, emittance_std = emittance.mean(), emittance.std() emittance_sign, emittance_sign_std = significant_digits( emittance_mean * 1e12, emittance_std * 1e12 ) ax.text( x=0, y=1.00, s=( f"$\epsilon_{{mean}}$ = {emittance_sign} $\pm$ {emittance_sign_std} pm " f"($\epsilon_{{nominal}}$ = {nominal_emittance*1e12: .2f} pm)" ), transform=ax.transAxes, va="bottom", ha="left", ) ax.set_xlabel( f"$N_{{\sigma}} = \sqrt{{2J_{{{plane if len(plane) == 1 else ''}}}/\epsilon}}$" ) else: ax.set_xlabel(f"$2J_{{{plane if len(plane) == 1 else ''}}} \; [\mu m]$") if fit_type == "linear": ax.set_ylabel(r"ln($I/I_0$)") else: ax.set_ylabel(r"Beam Losses [%]") ax.set_ylim([0, intensity.max() * (1 + YPAD)]) ax.set_xlim([0, None]) annotations.make_top_legend(ax, ncol=3) plt.tight_layout() annotations.set_name( f"DA {'' if fit_type == 'norm' else 'J'} {fit_type} Fit {beam}, Plane {plane}", fig ) _save_fig(directory, plane, fig, f"dafit_{fit_type}") return fig def _get_fit_plot_data(da, da_err, data, fit_type): fit_fun = {"exponential": fun_exp_decay, "linear": fun_linear}[fit_type] multiplier = 100 # for percentages if fit_type == "linear": multiplier = 1 / data[1] # DA-J/emittance = -ln(I/Io) data = data[0] fit_mean = fit_fun(da, data) * multiplier fit_min = fit_fun(da - da_err, data) * multiplier fit_max = fit_fun(da + da_err, data) * multiplier return fit_mean, fit_min, fit_max # Helper ---
[docs] def get_approximate_index(df, item, method="nearest"): """ Emulates the `get_loc` from pandas<2.0, i.e. single index input and output. """ return df.index.get_indexer([item], method=method)[0]
def _plot_kicks_and_scale_x(ax, kick_times, pad=20): lines.plot_vertical_lines_fast( ax, kick_times, color="grey", linestyle="--", alpha=0.8, marker="", label="Kicks" ) first_kick, last_kick = kick_times.min(), kick_times.max() try: time_delta = [pd.Timedelta(seconds=pad[i]) for i in range(2)] except TypeError: time_delta = [pd.Timedelta(seconds=pad) for _ in range(2)] # ax.set_xlim([(first_kick - time_delta[0]).timestamp, last_kick + time_delta[1]]) # worked in the past ax.set_xlim([_date2num(first_kick - time_delta[0]), _date2num(last_kick + time_delta[1])]) ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S")) ax.set_xlabel("Time") annotations.set_annotation(f"Date: {first_kick.strftime('%Y-%m-%d')}", ax, "left") def _merge_df_planes(df_dict, planes): """In case planes == 'XY' merge the ``df_dict`` into one dataframe..""" if len(planes) == 1: return df_dict[planes] return pd.merge(*df_dict.values(), how="inner", left_index=True, right_index=True) def _maybe_add_sum_for_planes(df, planes, col_fun, col_err_fun=None): """In case planes == 'XY' add the two plane columns and their errors.""" if len(planes) > 1: if col_err_fun is not None: cols = lambda p: [col_fun(p), col_err_fun(p)] x_cols, y_cols = [cols(p) for p in planes] df = df.reindex(columns=df.columns.to_list() + cols(planes)) df[cols(planes)] = np.array( toolbox.df_sum_with_err( df, a_col=x_cols[0], b_col=y_cols[0], a_err_col=x_cols[1], b_err_col=y_cols[1] ) ).T else: x_col, y_col = [col_fun(p) for p in planes] df[col_fun(planes)] = toolbox.df_sum(df, a_col=x_col, b_col=y_col) return df def _date2num(times): """ Convert CERNDatetime to mpl-number (days). Converts input times to plain-datetime first as date2num causes infinite loop with CernDateTimes in **Python 3.8**. """ try: times = [cdt.datetime for cdt in times] except AttributeError: pass # probably datetime already except TypeError: try: # not iterable times = times.datetime except AttributeError: pass # probably datetime already return mdates.date2num(times) def _save_fig(directory, plane, fig, ptype): try: for ftype in PLOT_FILETYPES: path = os.path.join(directory, outfile_plot(ptype, plane, ftype)) LOG.debug(f"Saving Figure to {path}") fig.savefig(path) except IOError: LOG.error(f"Couldn't create output files for {ptype} plots.") # Script Mode ------------------------------------------------------------------ if __name__ == "__main__": main()