Source code for omc3.tune_analysis.timber_extract

"""
Timber Extraction
-----------------

Tools to extract data from ``Timber``. It is a bit heavy on the LHC side at the moment.

**Please note**: this module requires the ``pytimber`` package to access ``Timber`` functionality.
Both installing ``pytimber`` and querying ``Timber`` are only possible from inside the CERN network.

To install ``pytimber`` along ``omc3``, please do so from inside the CERN network by using the [cern] extra
dependency and installing from the ``acc-py`` package index (by specifying ``--index-url
https://acc-py-repo.cern.ch/repository/vr-py-releases/simple`` and
``--trusted-host acc-py-repo.cern.ch`` to your ``pip`` installation command).
"""
import datetime
import re
from contextlib import suppress
from typing import Dict, List, NewType, Sequence, Tuple, Union

import numpy as np
import tfs

# from jpype import JException, java
from omc3.tune_analysis import constants as const
from omc3.utils import logging_tools
from omc3.utils.mock import cern_network_import
from omc3.utils.time_tools import CERNDatetime

# Column and header names are taken from the tune_analysis constants module.
# In this file TIME_COL names the (time) index column of the extracted frames,
# START_TIME / END_TIME are the header keys holding the extraction window.
TIME_COL = const.get_time_col()
START_TIME = const.get_tstart_head()
END_TIME = const.get_tend_head()

LOG = logging_tools.get_logger(__name__)
# NOTE(review): ``cern_network_import`` presumably returns a placeholder when the
# package cannot be imported outside the CERN network -- confirm in omc3.utils.mock.
pytimber = cern_network_import("pytimber")
jpype = cern_network_import("jpype")

MAX_RETRIES = 10  # upper bound on the attempts made when Timber raises a retryable exception
# Time inputs accepted by the extraction functions: a CERNDatetime or a numeric timestamp.
AcceptableTimeStamp = NewType("AcceptableTimeStamp", Union[CERNDatetime, int, float])


def lhc_fill_to_tfs(
    fill_number: int, keys: Sequence[str] = None, names: Dict[str, str] = None
) -> tfs.TfsDataFrame:
    """
    Extracts data for the given keys over the duration of an LHC fill from ``Timber``.

    The start and end times of the fill are looked up via ``get_fill_times`` and the
    actual extraction is delegated to ``extract_between_times``.

    Args:
        fill_number (int): Number of the fill to extract from.
        keys (Sequence[str]): the different variables names to extract data for.
        names (Dict[str, str]): dict mapping keys to column names.

    Returns:
        The extracted data as a ``TfsDataFrame``.
    """
    timber_db = pytimber.LoggingDB(source="nxcals")
    fill_start, fill_end = get_fill_times(timber_db, fill_number)
    return extract_between_times(fill_start, fill_end, keys=keys, names=names)
def extract_between_times(
    t_start: AcceptableTimeStamp,
    t_end: AcceptableTimeStamp,
    keys: Sequence[str] = None,
    names: Dict[str, str] = None,
) -> tfs.TfsDataFrame:
    """
    Extracts data for keys between ``t_start`` and ``t_end`` from ``Timber``.

    Args:
        t_start (AcceptableTimeStamp): starting time in CERNDateTime or timestamp.
        t_end (AcceptableTimeStamp): end time in local CERNDateTime or timestamp.
        keys (Sequence[str]): the different variables names to extract data for.
            If ``None``, the names returned by ``get_tune_and_coupling_variables``
            are used.
        names (Dict[str, str]): dict mapping keys to column names. Keys without
            an entry (or all keys, if ``None``) are used as column names directly.

    Returns:
        Extracted data in a ``TfsDataFrame``, indexed by ``CERNDatetime`` and with
        the extraction window stored in the ``START_TIME`` / ``END_TIME`` headers.
        A variable without any entry in the window yields an all-NaN column.

    Raises:
        IOError: if the data cannot be retrieved from ``Timber`` (no access,
            non-retryable error, or retries exhausted), or if no data at all
            was extracted for the requested variables.
        NotImplementedError: if a variable holds multidimensional entries.
    """
    # A TypeError raised by the conversion is suppressed, so values that are
    # not numeric timestamps are passed through unchanged.
    with suppress(TypeError):
        t_start = CERNDatetime.from_timestamp(t_start)

    with suppress(TypeError):
        t_end = CERNDatetime.from_timestamp(t_end)

    db = pytimber.LoggingDB(source="nxcals")
    if keys is None:
        keys = get_tune_and_coupling_variables(db)

    # Attempt getting data from NXCALS, which can sometimes need a few retries.
    # If Java gives a feign.RetryableException, try again, for at most
    # MAX_RETRIES attempts in total (but always at least once).
    extract_dict = {}
    total_attempts = max(MAX_RETRIES, 1)
    for attempt in range(1, total_attempts + 1):
        try:
            # We use timestamps to avoid any confusion with local time
            extract_dict = db.get(keys, t_start.timestamp(), t_end.timestamp())
        except jpype.java.lang.IllegalStateException as java_state_error:
            raise IOError(
                "Could not get data from Timber, user probably has no access to NXCALS"
            ) from java_state_error
        except jpype.JException as java_exception:  # Might be a case for retries
            if "RetryableException" in str(java_exception) and attempt < MAX_RETRIES:
                LOG.warning(f"Could not get data from Timber! Trial no {attempt} / {MAX_RETRIES}")
                continue  # will go to the next iteration of the loop, so retry
            raise IOError("Could not get data from timber!") from java_exception
        else:
            break

    # ``len`` is used on purpose instead of truthiness: the values may be numpy
    # arrays, whose truth value is ambiguous.
    if (not len(extract_dict)  # dict is empty
            or all(not len(v) for v in extract_dict.values())  # values are empty
            or all(len(v) == 2 and not len(v[0]) for v in extract_dict.values())  # arrays are empty (size 2 for time/data)
    ):
        raise IOError(f"Variables {keys} found but no data extracted in time {t_start.utc_string} - {t_end.utc_string} (UTC).\n"
                      f"Possible reasons:\n"
                      f" - Too small time window.\n"
                      f" - Old pytimber version.\n"
                      f" - Variable outdated (i.e. no longer logged).")

    out_df = tfs.TfsDataFrame()
    for key in keys:
        values = extract_dict[key][1]
        if not len(values):
            # Only *some* variables may be empty at this point (all-empty was
            # rejected above). Indexing the first entry would raise IndexError,
            # so skip the dimensionality check; the outer merge below then
            # simply produces an all-NaN column for this variable.
            LOG.warning(f"No data found for variable {key} in the given time window.")
        elif np.size(values[0]) > 1:
            raise NotImplementedError("Multidimensional variables are not implemented yet")

        # (time, data) pair -> two-column array with one row per entry
        data = np.asarray(extract_dict[key]).transpose()
        column = key if names is None else names.get(key, key)
        key_df = tfs.TfsDataFrame(data, columns=[TIME_COL, column]).set_index(TIME_COL)
        # Outer merge on the time index keeps every timestamp of every variable
        out_df = out_df.merge(key_df, how="outer", left_index=True, right_index=True)

    out_df.index = [CERNDatetime.from_timestamp(i) for i in out_df.index]
    out_df.headers[START_TIME] = t_start.cern_utc_string()
    out_df.headers[END_TIME] = t_end.cern_utc_string()
    return out_df
def get_tune_and_coupling_variables(db) -> List[str]:
    """
    Returns the tune and coupling variable names.

    The database is searched for ``%EIGEN%FREQ%`` and ``%COUPL%ABS%`` (in this
    order) and only results whose name starts with ``LHC.BOFSU`` or
    ``LHC.BQBBQ.CONTINUOUS`` are kept.

    Args:
        db (pytimber.LoggingDB): pytimber database connexion.

    Returns:
        List of variable names as strings.
    """
    wanted_prefix = re.compile(r"LHC\.B(OFSU|QBBQ\.CONTINUOUS)")
    variable_names = []
    for pattern in ("%EIGEN%FREQ%", "%COUPL%ABS%"):
        variable_names.extend(
            name for name in db.search(pattern) if wanted_prefix.match(name)
        )
    return variable_names
def get_fill_times(
    db, fill_number: int
) -> Tuple[Union[datetime.datetime, float], Union[datetime.datetime, float]]:
    """
    Returns start and end time of the fill with the given fill number.

    The values are the ``"startTime"`` and ``"endTime"`` entries of the fill
    data returned by ``db.getLHCFillData``.

    Args:
        db (pytimber.LoggingDB): pytimber database.
        fill_number (int): fill number.

    Returns:
        `Tuple` of start and end time.
    """
    fill_data = db.getLHCFillData(fill_number)
    start_time = fill_data["startTime"]
    end_time = fill_data["endTime"]
    return start_time, end_time