"""
Kickgroups
----------
Functions to list KickGroups and show their Kicks.
.. code-block:: none
usage: kickgroups.py [-h] {list,info} ...
KickGroups Functions
optional arguments:
-h, --help show this help message and exit
Functionality:
{list,info}
list List all KickGroups
info Show the info of a given KickGroup
Function ``list``:
.. code-block:: none
usage: kickgroups.py list [-h] [--root ROOT]
[--sort {TIMESTAMP,KICKGROUP}]
List KickGroups
optional arguments:
-h, --help show this help message and exit
--root ROOT KickGroups Root-Directory
--sort {TIMESTAMP,KICKGROUP}
Sort KickGroups
Function ``info``:
.. code-block:: none
usage: kickgroups.py info [-h] [--root ROOT] [--files FILES] group
KickGroup Info
positional arguments:
group KickGroup name
optional arguments:
-h, --help show this help message and exit
--root ROOT KickGroups Root-Directory
--files FILES, -f FILES
Optional integer. If a value is given, only show the path
to *files* SDDS files from the group. Use negative values
to show the last files (most recent kicks), positive
values for the first ones. A value of zero means showing
all files in the group.
"""
import argparse
import json
from datetime import datetime
from pathlib import Path
from typing import List, Union
import numpy as np
import pandas as pd
from dateutil import tz
from omc3.utils import logging_tools
from pandas import DataFrame
from tfs import TfsDataFrame
# fmt: off
from pylhc.constants.kickgroups import (
AMPX,
AMPY,
AMPZ,
BEAM,
BEAMPROCESS,
BUNCH,
COLUMNS_TO_HEADERS,
DRIVEN_TUNEX,
DRIVEN_TUNEY,
DRIVEN_TUNEZ,
KICK_COLUMNS,
KICK_GROUP_COLUMNS,
KICKGROUP,
KICKGROUPS_ROOT,
LOCALTIME,
OPTICS,
OPTICS_URI,
SDDS,
TIMESTAMP,
TUNEX,
TUNEY,
TURNS,
UTCTIME,
)
# fmt: on
LOG = logging_tools.get_logger(__name__)
# List Kickgroups --------------------------------------------------------------
[docs]
def list_available_kickgroups(by: str = TIMESTAMP, root: Union[Path, str] = KICKGROUPS_ROOT, printout: bool = True) -> DataFrame:
"""
List all available KickGroups in `root` with optional sorting..
Args:
by (str): Column to sort the KickGroups by. Should be either the
``TIMESTAMP`` or ``KICKGROUP`` variable.
root (pathlib.Path): Alternative `~pathlib.Path` to the KickGroup folder. (Defaults
to the ``NFS`` path of our kickgroups).
printout (bool): whether to print out the dataframe, defaults to `True`.
Returns:
A `~pandas.DataFrame` with the KickGroups loaded, sorted by the provided
*by* parameter.
"""
LOG.debug(f"Listing KickGroups in '{Path(root).absolute()}'")
kickgroup_paths = get_folder_json_files(root)
df_info = DataFrame(index=range(len(kickgroup_paths)), columns=KICK_GROUP_COLUMNS)
for idx, kick_group in enumerate(kickgroup_paths):
LOG.debug(f"Loading kickgroup info from '{kick_group.absolute()}'")
data = _load_json(kick_group)
df_info.loc[idx, KICKGROUP] = data["groupName"]
df_info.loc[idx, TIMESTAMP] = data["groupCreationTime"]
df_info.loc[idx, UTCTIME] = _ts_to_datetime(df_info.loc[idx, TIMESTAMP])
df_info.loc[idx, LOCALTIME] = _utc_to_local(df_info.loc[idx, UTCTIME])
df_info = df_info.sort_values(by=by).set_index(TIMESTAMP)
if printout:
LOG.debug("Here is information about the loaded KickGroups")
print(df_info.to_string(index=False, formatters=_time_formatters(), justify="center"))
return df_info
[docs]
def get_folder_json_files(root: Union[Path, str] = KICKGROUPS_ROOT) -> List[Path]:
"""Returns a list of all **.json** files in the folder.
Args:
root (Union[Path, str])): the path to the folder. (Defaults
to the ``NFS`` path of our kickgroups).
Returns:
A `list` of `~pathlib.Path` objects to all **json** files within the provided
*root* parameter.
"""
LOG.debug(f"Globing for JSON files in {Path(root).absolute()}''")
return list(Path(root).glob("*.json"))
# Kickgroup Info ---------------------------------------------------------------
[docs]
def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT) -> TfsDataFrame:
"""
Gather all important info about the KickGroup into a `~tfs.TfsDataFrame`.
Args:
kick_group (str): the KickGroup name, corresponds to the kickgroup file name without
the ``.json`` extension.
root (pathlib.Path): Alternative `~pathlib.Path` to the KickGroup folder. (Defaults
to the ``NFS`` path of our kickgroups).
Returns:
A `~tfs.TfsDataFrame` with the KickGroup information loaded.
"""
LOG.debug(f"Loading info from all KickFiles in KickGroup '{kick_group}'")
kick_group_data = _load_json(Path(root) / f"{kick_group}.json")
kicks_files = kick_group_data["jsonFiles"]
df_info = TfsDataFrame(index=range(len(kicks_files)), columns=KICK_COLUMNS, headers={KICKGROUP: kick_group})
if not len(kicks_files):
raise FileNotFoundError(f"KickGroup {kick_group} contains no kicks.")
for idx, kf in enumerate(kicks_files):
df_info.loc[idx, :] = load_kickfile(kf)
for column in COLUMNS_TO_HEADERS:
df_info.headers[column] = df_info[column][0]
return df_info
[docs]
def load_kickfile(kickfile: Union[Path, str]) -> pd.Series:
"""
Load the important data from a **json** kickfile into a `~pandas.Series`.
Args:
kickfile (Union[Path, str]): the path to the kickfile to load data from.
Returns:
A `~pandas.Series` with the relevant information loaded from the provided
*kickfile*. The various entries in the Series are defined in `pylhc.constants.kickgroups`
as ``KICK_COLUMNS``.
"""
LOG.debug(f"Loading kick information from Kickfile at '{Path(kickfile).absolute()}'")
kick = _load_json(kickfile)
data = pd.Series(index=KICK_COLUMNS, dtype=object)
data[LOCALTIME] = _jsontime_to_datetime(kick["acquisitionTime"])
data[UTCTIME] = _local_to_utc(data[LOCALTIME])
data[SDDS] = kick["sddsFile"]
data[BEAM] = kick["measurementEnvironment"]["lhcBeam"]["beamName"]
data[BEAMPROCESS] = kick["measurementEnvironment"]["environmentContext"]["name"]
data[TURNS] = kick["acqSettings"]["capturedTurns"]
data[BUNCH] = kick["acqSettings"]["bunchSelection"]
data[OPTICS] = kick["measurementEnvironment"]["opticsModel"]["opticName"]
data[OPTICS_URI] = kick["measurementEnvironment"]["opticsModel"]["opticModelURI"]
three_d = "3D" in kick["excitationSettings"][0]["type"]
if three_d:
LOG.debug("Kick is 3D Excitation, loading longitudinal kick settings")
idx = _get_plane_index(kick["excitationSettings"][0]["acDipoleSettings"], "X")
idy = _get_plane_index(kick["excitationSettings"][0]["acDipoleSettings"], "Y")
data[TUNEX] = kick["excitationSettings"][0]["acDipoleSettings"][idx]["measuredTune"]
data[TUNEY] = kick["excitationSettings"][0]["acDipoleSettings"][idy]["measuredTune"]
data[DRIVEN_TUNEX] = data[TUNEX] + kick["excitationSettings"][0]["acDipoleSettings"][idx]["deltaTuneStart"]
data[DRIVEN_TUNEY] = data[TUNEY] + kick["excitationSettings"][0]["acDipoleSettings"][idy]["deltaTuneStart"]
data[DRIVEN_TUNEZ] = kick["excitationData"][0]["rfdata"]["excitationFrequency"]
data[AMPX] = kick["excitationSettings"][0]["acDipoleSettings"][idx]["amplitude"]
data[AMPY] = kick["excitationSettings"][0]["acDipoleSettings"][idy]["amplitude"]
data[AMPZ] = kick["excitationSettings"][0]["longitudinalRfSettings"]["excitationAmplitude"]
else:
LOG.debug("Kick is 2D Excitation, longitudinal settings will be set as NaNs")
idx = _get_plane_index(kick["excitationSettings"], "X")
idy = _get_plane_index(kick["excitationSettings"], "Y")
data[TUNEX] = kick["excitationSettings"][idx]["measuredTune"]
data[TUNEY] = kick["excitationSettings"][idy]["measuredTune"]
data[DRIVEN_TUNEX] = data[TUNEX] + kick["excitationSettings"][idx]["deltaTuneStart"]
data[DRIVEN_TUNEY] = data[TUNEY] + kick["excitationSettings"][idy]["deltaTuneStart"]
data[DRIVEN_TUNEZ] = np.NaN
data[AMPX] = kick["excitationSettings"][idx]["amplitude"]
data[AMPY] = kick["excitationSettings"][idy]["amplitude"]
data[AMPZ] = np.NaN
return data
# Functions with console output ---
# Full Info -
[docs]
def show_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT) -> None:
"""
Wrapper around `~pylhc.kickgroups.get_kickgroup_info`, gathering the relevant
information from the kick files in the group and printing it to console.
Args:
kick_group (str): the KickGroup name, corresponds to the kickgroup file name without
the ``.json`` extension.
root (pathlib.Path): Alternative `~pathlib.Path` to the KickGroup folder. (Defaults
to the ``NFS`` path of our kickgroups).
"""
kicks_info = get_kickgroup_info(kick_group, root)
_print_kickgroup_info(kicks_info)
def _print_kickgroup_info(kicks_info: TfsDataFrame) -> None:
"""
Print the full info about the kickgroup.
Args:
kicks_info (TfsDataFrame): Gathered Kickgroup data.
"""
for header, value in kicks_info.headers.items():
print(f"{header}: {value}")
print()
print(
kicks_info.drop(columns=COLUMNS_TO_HEADERS).to_string(
index=False, na_rep=" - ", justify="center", formatters=_time_formatters()
)
)
# Files only -
[docs]
def show_kickgroup_files(kick_group: str, nfiles: int = None, root: Union[Path, str] = KICKGROUPS_ROOT) -> None:
"""
Wrapper around `pylhc.kickgroups.get_kickgroup_info`, gathering the relevant
information from all kickfiles in the KickGroup and printing only the sdds-filepaths
to console.
Args:
kick_group (str): the KickGroup name, corresponds to the kickgroup file name without
the ``.json`` extension.
nfiles (int): Number of files to show. Use negative values for the last nfiles.
A value of zero or None means all files in the group.
root (pathlib.Path): Alternative `~pathlib.Path` to the KickGroup folder. (Defaults
to the ``NFS`` path of our kickgroups).
"""
kicks_info = get_kickgroup_info(kick_group, root)
_print_kickgroup_files(kicks_info, nfiles=nfiles)
def _print_kickgroup_files(kicks_info: TfsDataFrame, nfiles: int = None) -> None:
"""
Print *nfiles* from the KickGroup as space-separated quoted strings, which can
then be directly copy-pasted into the GUI to load them at once.
Args:
kicks_info (TfsDataFrame): A `~tfs.TfsDataFrame` with the gathered KickGroup data.
nfiles (int): Number of files to show. Use negative values for the last nfiles.
A value of zero or `None` means all files in the group.
"""
kickgroup = kicks_info.headers[KICKGROUP]
nfiles_total = len(kicks_info.index)
nfiles_str = "all files"
if nfiles is None or nfiles == 0:
element_slice = slice(None, None)
else:
nfiles_str = f"{'last ' if nfiles < 0 else ''}{abs(nfiles)} file(s)"
if abs(nfiles) > nfiles_total:
LOG.warning(
f"You requested a total of {abs(nfiles)} files to print"
f" but there are only {nfiles_total} kicks in {kickgroup}."
)
nfiles = nfiles_total
element_slice = slice(nfiles) if nfiles > 0 else slice(nfiles, None)
print(f"Kickgroup {kicks_info.headers[KICKGROUP]}, {nfiles_str}:")
print(" ".join([f'"{s}"' for s in kicks_info[SDDS][element_slice]]))
# Helper -----------------------------------------------------------------------
# IO ---
def _load_json(jsonfile: Union[Path, str]) -> dict:
return json.loads(Path(jsonfile).read_text())
# Time ---
def _ts_to_datetime(ts: int) -> datetime:
return datetime.utcfromtimestamp(ts / 1000)
def _jsontime_to_datetime(time_str: str) -> datetime:
return datetime.strptime(time_str, "%d-%m-%y_%H-%M-%S")
def _datetime_to_string(dt: datetime):
return dt.strftime(" %Y-%m-%d %H:%M:%S")
def _time_formatters():
return {UTCTIME: _datetime_to_string, LOCALTIME: _datetime_to_string}
def _utc_to_local(dt: datetime):
return dt.replace(tzinfo=tz.gettz("UTC")).astimezone(tz.gettz("Europe/Paris"))
def _local_to_utc(dt: datetime):
return dt.replace(tzinfo=tz.gettz("Europe/Paris")).astimezone(tz.gettz("UTC"))
# Other ---
def _get_plane_index(data: List[dict], plane: str) -> str:
"""
Find the index for the given plane in the data list.
This is necessary as they are not always in X,Y order.
"""
name = {"X": "HORIZONTAL", "Y": "VERTICAL"}[plane]
for idx, entry in enumerate(data):
if entry["plane"] == name:
return idx
else:
raise ValueError(f"Plane '{plane}' not found in data.")
# Script Mode ------------------------------------------------------------------
def _get_args():
"""Parse Commandline Arguments."""
# argparse is a bit weird: you need to create the normal parser
# AND a parent parser if you want to use the same arg in both subparsers.
# Using the main parser also as parent will result in `function` being
# always `None`.
parser = argparse.ArgumentParser(description="KickGroups Functions")
parent_parser = argparse.ArgumentParser()
parent_parser.add_argument(
"--root",
type=str,
required=False,
default=KICKGROUPS_ROOT,
dest="root",
help="KickGroups Root-Directory",
)
subparsers = parser.add_subparsers(
title="Functionality",
dest="function",
required=True,
)
# ----- Full KickGroup Parser ----- #
parser_kickgroups = subparsers.add_parser(
"list",
parents=[parent_parser],
add_help=False,
description="List KickGroups",
help="List all KickGroups",
)
parser_kickgroups.add_argument(
"--sort",
type=str,
dest="sort",
help="Sort KickGroups",
choices=[TIMESTAMP, KICKGROUP],
default=TIMESTAMP,
)
# ---- KickGroup Info Parser ---- #
parser_info = subparsers.add_parser(
"info",
parents=[parent_parser],
add_help=False,
description="KickGroup Info",
help="Show the info of a given KickGroup",
)
parser_info.add_argument(
"group",
type=str,
help="KickGroup name",
)
parser_info.add_argument(
"--files",
"-f",
dest="files",
type=int,
nargs="?",
help="Optional integer. If a value is given, only show the path to *files* SDDS files from the group. "
"Use negative values to show the last files (most recent kicks), positive values for the first ones. "
"A value of zero means showing all files in the group.",
)
return parser.parse_args()
if __name__ == "__main__":
options = _get_args()
if options.function == "list":
list_available_kickgroups(by=options.sort, root=options.root)
if options.function == "info":
if options.files is None:
show_kickgroup_info(kick_group=options.group, root=options.root)
else:
show_kickgroup_files(kick_group=options.group, root=options.root, nfiles=options.files)