"""
Module online_model.data_converter
---------------------------------------
Functions to convert the online model output files.
"""
import re
import os
import pytz
from datetime import datetime
import numpy as np
import online_model.constants as const
from tfs_files import tfs_pandas as tfs
from collections import OrderedDict
from utils.contexts import suppress_exception
# Globals for finding data in output file ######################################
PRE_BEAMPROCESS_STRING = "beamprocess" # word before beamprocess name (line 0)
PRE_KNOB_STRING = "knob" # word before knobname (line 1)
PRE_TIME_STRING = "time" # word before time (line 1)
TRIM_VALUE_NAME = r"bbmext\.knob\d*" # matcher for trim value name
OPTIC_NAME = "OPTIC_NAME" # ID for the optic name
FILLNUMBER = "Fillnumber" # Header in orbit file
BEAMPROCESS_HEADER = const.get_beamprocess_header()
TRIM_HEADER = const.get_trim_header()
TRIMHINT_HEADER = const.get_trimhint_header()
TIME_HEADER = const.get_time_header()
KNOB_HEADER = const.get_knob_header()
NAME_COLUMN = const.get_name_column(with_type=True)
CIRCUIT_COLUMN = const.get_circuit_column(with_type=True)
VALUE_COLUMN = const.get_value_column(with_type=True)
DELTA_COLUMN = const.get_delta_column(with_type=True)
TRIM_COLUMN = const.get_trim_column(with_type=True)
COLUMNS_K = OrderedDict([NAME_COLUMN, CIRCUIT_COLUMN, VALUE_COLUMN, DELTA_COLUMN])
RE_K_LINE = r"\s*(?P<NAME>\S+)\s*=\s*(?P<VALUE>[^;]+)[^:]+:\s*(?P<CIRCUIT>[^/]+/[^.]+)\.[^\d+\-]+(?P<DELTA>[^.]+\.[^.]+)\."
RE_TRIM = r"\s*(?P<trimname>" + TRIM_VALUE_NAME + r")\s*=\s*(?P<trimvalue>[^;]+)\s*;"
RE_LOG_ALL_KNOBS = r".+\s*-\s*Knobs\s*\{(.*)\}" # matcher for knobs line in logfile
[docs]def knobs_k_to_tfs(knob_file=const.get_extractor_knobs_filename(), trim=None):
""" Convert from knobs.madx output file to a proper tfs file.
Assumes no one changes the output order. See global variables is this file.
Args:
knob_file: path to online model extractor K-Type output file.
trim: trim value if available.
Returns:
List of TfsDataFrames, one for each knob.
"""
with open(knob_file, "r") as f:
content = f.readlines()
ls = content[0].split()
beamprocess = ls[ls.index(PRE_BEAMPROCESS_STRING) + 1]
del content[0]
df_list = []
while len(content) > 0:
df = tfs.TfsDataFrame(columns=COLUMNS_K.keys())
# headers
df.headers[BEAMPROCESS_HEADER] = beamprocess
ls = content[0].split()
df.headers[KNOB_HEADER] = ls[ls.index(PRE_KNOB_STRING) + 1]
time_idx = ls.index(PRE_TIME_STRING)
df.headers[TIME_HEADER] = convert_local_time_to_utc(
" ".join(ls[time_idx + 1:time_idx+3])[:-1]
)
if trim is not None:
df.headers[TRIM_HEADER] = trim
df.headers[TRIMHINT_HEADER] = "VALUE = BEFORE_VALUE + DELTA * TRIM"
# get data
pattern = re.compile(RE_K_LINE)
idx = 0
for idx, line in enumerate(content[1:]):
if line.startswith("!"):
break # next knob, keep last line
match = re.match(pattern, line)
for col in COLUMNS_K.keys():
df.loc[idx, col] = match.group(col)
else:
idx += 1 # no further knob, delete last line
for col in COLUMNS_K.keys():
with suppress_exception(KeyError):
df[col] = df[col].astype(COLUMNS_K[col])
df_list.append(df)
del content[:idx+1]
return df_list
[docs]def knobs_kvalues_to_tfs(knob_file=const.get_extractor_knobs_filename()):
""" Convert data from KNOBVALUES extraction to tfs file.
Args:
knob_file: path to online model extractor KNOBVALUE-Type output file.
Returns:
TfsDataFrame with trimvalues and additional time an beamprocess in headers.
"""
with open(knob_file, "r") as f:
content = f.readlines()
df = tfs.TfsDataFrame(columns=[NAME_COLUMN[0], TRIM_COLUMN[0]])
ls = content[0].split()
df.headers[BEAMPROCESS_HEADER] = ls[ls.index(PRE_BEAMPROCESS_STRING) + 1]
del content[0]
while len(content) > 0:
# get time
ls = content[0].split()
if TIME_HEADER not in df.headers:
time_idx = ls.index(PRE_TIME_STRING)
df.headers[TIME_HEADER] = convert_local_time_to_utc(
" ".join(ls[time_idx + 1:time_idx+3])[:-1]
)
# get knob name
knob = ls[ls.index(PRE_KNOB_STRING) + 1]
if len(content) == 1:
raise RuntimeError("Error in knob '{:s}':".format(knob) +
"No data found. See log for details.")
# get trim
match = re.match(RE_TRIM, content[1])
if not match:
raise RuntimeError("Error in knob '{:s}':".format(knob) +
"No trim found. See log for details.")
trim = np.float64(match.group("trimvalue"))
# add to dataframe
df.loc[len(df), :] = (knob, trim) # do not use append, it deletes headers
for col, typ in (NAME_COLUMN, TRIM_COLUMN):
df[col] = df[col].astype(typ)
# clear lines
idx = 0
for idx, line in enumerate(content[1:]):
if line.startswith("!"):
break # next knob, keep last line
else:
idx += 1 # no further knob, delete last line
del content[:idx+1]
return df
[docs]def get_optics(optics_file):
""" Extracts Optics from optics file (-e switch in online model extractor). """
with open(optics_file, "r") as f:
content = f.readlines()
for line in content:
ls = line.split()
if ls[1] == OPTIC_NAME:
return ls[2]
raise RuntimeError("No optics found in '{:s}'".format(optics_file))
[docs]def get_fill_from_orbitfile(orbit_file):
""" Extracts the fill number from orbit file (-oe switch in online model extractor).
Args:
orbit_file: Path to orbit file.
"""
return int(tfs.read_tfs(orbit_file).headers[FILLNUMBER])
[docs]def get_knobs_and_trims_from_log(log_file):
""" Extract the knobs line from the log file.
Args:
log_file: Path to log file.
"""
knobs_str = _get_knobs_string_from_log(log_file)
return _get_knobs_and_trims_from_knobs_string(knobs_str)
# Time Functions ###############################################################
[docs]def get_utc_now():
""" Returns current utc time as timeformat """
return datetime.now(tz=pytz.utc).strftime(const.get_time_format())
[docs]def convert_utc_time_to_local(time):
""" Converts UTC time in isoformat to local time """
local = const.get_experiment_timezone()
time = pytz.utc.localize(datetime.strptime(time, const.get_time_format()))
return time.astimezone(local).strftime(const.get_time_format())
[docs]def convert_local_time_to_utc(time):
""" Converts local time in isoformat to UTC time """
local = const.get_experiment_timezone()
time = local.localize(datetime.strptime(time, const.get_time_format()))
return time.astimezone(pytz.utc).strftime(const.get_time_format())
# Private Functions ############################################################
def _get_trim_from_changefile(change_file):
""" Extract the trim value from the change_file """
pattern = re.compile(RE_TRIM)
with open(change_file, "r") as f:
for line in f.readlines():
match = re.match(pattern, line)
if match:
return np.float64(match.group("trimvalue"))
raise StandardError("Trim Value could not be extracted from '{:s}'".format(change_file))
def _get_knobs_string_from_log(log_file):
""" Return string of knobs and values from logfile """
pattern = re.compile(RE_LOG_ALL_KNOBS)
with open(log_file, "r") as log:
content = log.readlines()
for line in reversed(content): # should be one of the latter lines
match = re.match(pattern, line)
if match is not None:
return match.group(1)
raise RuntimeError("No knobs found in log '{:s}'".format(log_file))
def _get_knobs_and_trims_from_knobs_string(knobs_string):
""" Split the knobs string into pieces and get trim values"""
trims = {}
for pair_str in knobs_string.split(","):
knob, value_str = pair_str.strip().split("=")
trims[knob] = eval(value_str.replace(" ", ","))[1]
return trims