Source code for model.accelerators.lhc

from __future__ import print_function

import json
import os
from collections import OrderedDict

import pandas as pd

from model.accelerators.accelerator import Accelerator, AcceleratorDefinitionError, Element, AccExcitationMode, \
    get_commonbpm
from utils import logging_tools
from tfs_files import tfs_pandas
from utils.entrypoint import EntryPoint, EntryPointParameters, split_arguments

LOGGER = logging_tools.get_logger(__name__)
CURRENT_DIR = os.path.dirname(__file__)
LHC_DIR = os.path.join(CURRENT_DIR, "lhc")


def get_lhc_modes():
    return {
        "lhc_runI": LhcRunI,
        "lhc_runII": LhcRunII2015,
        "lhc_runII_2016": LhcRunII2016,
        "lhc_runII_2016_ats": LhcRunII2016Ats,
        "lhc_runII_2017": LhcRunII2017,
        "lhc_runII_2018": LhcRunII2018,
        "hllhc10": HlLhc10,
        "hllhc12": HlLhc12,
        "hllhc13": HlLhc13,
    }


[docs]class Lhc(Accelerator): """ Parent Class for Lhc-Types. Keyword Args: Required nat_tune_x (float): Natural tune X without integer part. **Flags**: ['--nattunex'] nat_tune_y (float): Natural tune Y without integer part. **Flags**: ['--nattuney'] optics (str): Path to the optics file to use (modifiers file). **Flags**: ['--optics'] Optional acd (bool): Activate excitation with ACD. **Flags**: ['--acd'] **Default**: ``False`` adt (bool): Activate excitation with ADT. **Flags**: ['--adt'] **Default**: ``False`` dpp (float or list): Delta p/p to use. **Flags**: ['--dpp'] **Default**: ``0.0`` drv_tune_x (float): Driven tune X without integer part. **Flags**: ['--drvtunex'] drv_tune_y (float): Driven tune Y without integer part. **Flags**: ['--drvtuney'] energy (float): Energy in Tev. **Flags**: ['--energy'] fullresponse (bool): If True, fullresponse template will be filled and put in the output directory. **Flags**: ['--fullresponse'] **Default**: ``False`` xing (bool): If True, x-ing angles will be applied to model **Flags**: ['--xing'] **Default**: ``False`` """ NAME = "lhc" MACROS_NAME = "lhc" @staticmethod def get_class_parameters(): params = EntryPointParameters() params.add_parameter( flags=["--lhcmode"], help=("LHC mode to use. Should be one of: " + str(get_lhc_modes().keys())), name="lhc_mode", type=str, choices=get_lhc_modes().keys() ) params.add_parameter( flags=["--beam"], help="Beam to use.", name="beam", type=int, ) return params @staticmethod def get_instance_parameters(): params = EntryPointParameters() params.add_parameter( flags=["--model_dir", "-m"], help="Path to model directory (loads tunes and excitation from model!).", name="model_dir", type=str, ) params.add_parameter( flags=["--nattunex"], help="Natural tune X without integer part.", name="nat_tune_x", type=float, ) params.add_parameter( flags=["--nattuney"], help="Natural tune Y without integer part.", name="nat_tune_y", type=float, ) params.add_parameter( flags=["--acd"], help="Activate excitation with ACD.", name="acd", action="store_true" ) params.add_parameter( flags=["--adt"], help="Activate excitation with ADT.", name="adt", action="store_true", ) params.add_parameter( flags=["--drvtunex"], help="Driven tune X without integer part.", name="drv_tune_x", type=float, ) params.add_parameter( flags=["--drvtuney"], help="Driven tune Y without integer part.", name="drv_tune_y", type=float, ) params.add_parameter( flags=["--dpp"], help="Delta p/p to use.", name="dpp", default=0.0, type=float, ) params.add_parameter( flags=["--energy"], help="Energy in Tev.", name="energy", type=float, ) params.add_parameter( flags=["--optics"], help="Path to the optics file to use (modifiers file).", name="optics", type=str, ) params.add_parameter( flags=["--fullresponse"], help=("If True, fullresponse template will " "be filled and put in the output directory."), name="fullresponse", action="store_true", ) params.add_parameter( flags=["--xing"], help="If True, x-ing angles will be applied to model", name="xing", action="store_true", ) return params # Entry-Point Wrappers ##################################################### def __init__(self, *args, **kwargs): # for reasons of import-order and class creation, decoration was not possible parser = EntryPoint(self.get_instance_parameters(), strict=True) opt = parser.parse(*args, **kwargs) if opt.model_dir: self.init_from_model_dir(opt.model_dir) self.energy = None self.dpp = 0.0 self.xing = None if opt.nat_tune_x is not None: raise AcceleratorDefinitionError( "Argument 'nat_tune_x' not allowed when loading from model directory." ) if opt.nat_tune_y is not None: raise AcceleratorDefinitionError( "Argument 'nat_tune_y' not allowed when loading from model directory." ) if opt.drv_tune_x is not None: raise AcceleratorDefinitionError( "Argument 'drv_tune_x' not allowed when loading from model directory." ) if opt.drv_tune_y is not None: raise AcceleratorDefinitionError( "Argument 'drv_tune_y' not allowed when loading from model directory." ) else: if opt.nat_tune_x is None: raise AcceleratorDefinitionError("Argument 'nat_tune_x' is required.") if opt.nat_tune_y is None: raise AcceleratorDefinitionError("Argument 'nat_tune_y' is required.") self.nat_tune_x = opt.nat_tune_x self.nat_tune_y = opt.nat_tune_y self.drv_tune_x = None self.drv_tune_y = None self._excitation = AccExcitationMode.FREE if opt.acd or opt.adt: if opt.acd and opt.adt: raise AcceleratorDefinitionError( "Select only one excitation type." ) if opt.drv_tune_x is None: raise AcceleratorDefinitionError("Argument 'drv_tune_x' is required.") if opt.drv_tune_y is None: raise AcceleratorDefinitionError("Argument 'drv_tune_x' is required.") self.drv_tune_x = opt.drv_tune_x self.drv_tune_y = opt.drv_tune_y if opt.acd: self._excitation = AccExcitationMode.ACD elif opt.adt: self._excitation = AccExcitationMode.ADT # optional with default self.dpp = opt.dpp self.fullresponse = opt.fullresponse # optional no default self.energy = opt.get("energy", None) self.xing = opt.get("xing", None) self.optics_file = opt.get("optics", None) # for GetLLM self.model_dir = None self._model = None self._model_driven = None self._model_best_knowledge = None self._elements = None self._elements_centre = None self._errordefspath = None self.verify_object() def init_from_model_dir(self, model_dir): LOGGER.debug("Creating accelerator instance from model dir") self.model_dir = model_dir LOGGER.debug(" model path = " + os.path.join(model_dir, "twiss.dat")) try: self._model = tfs_pandas.read_tfs( os.path.join(model_dir, "twiss.dat"), index="NAME") except IOError: self._model = tfs_pandas.read_tfs( os.path.join(model_dir, "twiss_elements.dat"), index="NAME") bpm_index = [idx for idx in self._model.index.values if idx.startswith("B")] self._model = self._model.loc[bpm_index, :] self.nat_tune_x = float(self._model.headers["Q1"]) self.nat_tune_y = float(self._model.headers["Q2"]) # Excitations ##################################### self._model_driven = None self.drv_tune_x = None self.drv_tune_y = None self._excitation = AccExcitationMode.FREE ac_filename = os.path.join(model_dir, "twiss_ac.dat") adt_filename = os.path.join(model_dir, "twiss_adt.dat") if os.path.isfile(ac_filename): self._model_driven = tfs_pandas.read_tfs(ac_filename, index="NAME") self._excitation = AccExcitationMode.ACD if os.path.isfile(adt_filename): if self._excitation == AccExcitationMode.ACD: raise AcceleratorDefinitionError("ADT as well as ACD models provided." "Please choose only one.") self._model_driven = tfs_pandas.read_tfs(adt_filename, index="NAME") self._excitation = AccExcitationMode.ADT if not self._excitation == AccExcitationMode.FREE: self.drv_tune_x = float(self.get_driven_tfs().headers["Q1"]) self.drv_tune_y = float(self.get_driven_tfs().headers["Q2"]) # Best Knowledge ##################################### self._model_best_knowledge = None best_knowledge_path = os.path.join(model_dir, "twiss_best_knowledge.dat") if os.path.isfile(best_knowledge_path): self._model_best_knowledge = tfs_pandas.read_tfs(best_knowledge_path, index="NAME") # Elements ##################################### elements_path = os.path.join(model_dir, "twiss_elements.dat") if os.path.isfile(elements_path): self._elements = tfs_pandas.read_tfs(elements_path, index="NAME") else: raise AcceleratorDefinitionError("Elements twiss not found") center_path = os.path.join(model_dir, "twiss_elements_centre.dat") if os.path.isfile(center_path): self._elements_centre = tfs_pandas.read_tfs(center_path, index="NAME") else: self._elements_centre = self._elements # Optics File ######################################### self.optics_file = None opticsfilepath = os.path.join(self.model_dir, "modifiers.madx") if os.path.exists(opticsfilepath): self.optics_file = opticsfilepath # Error Def ##################################### self._errordefspath = None errordefspath = os.path.join(self.model_dir, "error_deff.txt") if os.path.exists(errordefspath): self._errordefspath = errordefspath else: # until we have a proper file name convention errordefspath = os.path.join(self.model_dir, "error_deffs.txt") if os.path.exists(errordefspath): self._errordefspath = errordefspath
[docs] @classmethod def init_and_get_unknowns(cls, args=None): """ Initializes but also returns unknowns. For the desired philosophy of returning parameters all the time, try to avoid this function, e.g. parse outside parameters first. """ opt, rest_args = split_arguments(args, cls.get_instance_parameters()) return cls(opt), rest_args
[docs] @classmethod def get_class(cls, *args, **kwargs): """ Returns LHC subclass . Keyword Args: Optional beam (int): Beam to use. **Flags**: ['--beam'] lhc_mode (str): LHC mode to use. **Flags**: ['--lhcmode'] **Choices**: ['lhc_runII_2016_ats', 'hllhc12', 'hllhc10', 'lhc_runI', 'lhc_runII', 'lhc_runII_2016', 'lhc_runII_2017'] Returns: Lhc subclass. """ parser = EntryPoint(cls.get_class_parameters(), strict=True) opt = parser.parse(*args, **kwargs) return cls._get_class(opt)
[docs] @classmethod def get_class_and_unknown(cls, *args, **kwargs): """ Returns LHC subclass and unkown args . For the desired philosophy of returning parameters all the time, try to avoid this function, e.g. parse outside parameters first. """ parser = EntryPoint(cls.get_class_parameters(), strict=False) opt, unknown_opt = parser.parse(*args, **kwargs) return cls._get_class(opt), unknown_opt
@classmethod def _get_class(cls, opt): """ Actual get_class function """ new_class = cls if opt.lhc_mode is not None: new_class = get_lhc_modes()[opt.lhc_mode] if opt.beam is not None: new_class = cls._get_beamed_class(new_class, opt.beam) return new_class # Public Methods ########################################################## @classmethod def get_segment(cls, label, first_elem, last_elem, optics_file, twiss_file): segment_cls = type(cls.__name__ + "Segment", (_LhcSegmentMixin, cls), {}) segment_inst = segment_cls() beam = cls.get_beam() bpms_file_name = "beam1bpms.tfs" if beam == 1 else "beam2bpms.tfs" bpms_file = _get_file_for_year(cls.YEAR, bpms_file_name) bpms_file_data = tfs_pandas.read_tfs(bpms_file).set_index("NAME") first_elem_s = bpms_file_data.loc[first_elem, "S"] last_elem_s = bpms_file_data.loc[last_elem, "S"] segment_inst.label = label segment_inst.start = Element(first_elem, first_elem_s) segment_inst.end = Element(last_elem, last_elem_s) segment_inst.optics_file = optics_file segment_inst.xing = False segment_inst.fullresponse = False segment_inst.kind = '' # '' means beta from phase, can be 'betaamp', in the future 'betakmod' segment_inst.verify_object() return segment_inst @classmethod def _get_beamed_class(cls, new_class, beam): beam_mixin = _LhcB1Mixin if beam == 1 else _LhcB2Mixin beamed_class = type(new_class.__name__ + "B" + str(beam), (new_class, beam_mixin), {}) return beamed_class
[docs] def verify_object(self): # TODO: Maybe more checks? """Verifies if everything is defined which should be defined """ LOGGER.debug("Accelerator class verification") try: self.get_beam() except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete, beam " "has to be specified (--beam option missing?)." ) if self.model_dir is None: # is the class is used to create full response? if self.optics_file is None: raise AcceleratorDefinitionError( "The accelerator definition is incomplete, optics " "file or model directory has not been specified." ) if self.xing is None: raise AcceleratorDefinitionError("Crossing on or off not set.") if self.excitation is None: raise AcceleratorDefinitionError("Excitation mode not set.") if (self.excitation == AccExcitationMode.ACD or self.excitation == AccExcitationMode.ADT): if self.drv_tune_x is None or self.drv_tune_y is None: raise AcceleratorDefinitionError("Driven tunes not set.") if self.optics_file is not None and not os.path.exists(self.optics_file): raise AcceleratorDefinitionError( "Optics file '{:s}' does not exist.".format(self.optics_file)) # print info about the accelerator # TODO: write more output prints LOGGER.debug( "... verification passed. Will now print some information about the accelerator") LOGGER.debug("{:32s} {}".format("class name", self.__class__.__name__)) LOGGER.debug("{:32s} {}".format("beam", self.get_beam())) LOGGER.debug("{:32s} {}".format("beam direction", self.get_beam_direction())) LOGGER.debug("")
[docs] @classmethod def get_nominal_tmpl(cls): return cls.get_file("nominal.madx")
@classmethod def get_nominal_multidpp_tmpl(cls): return cls.get_file("nominal_multidpp.madx") @classmethod def get_coupling_tmpl(cls): return cls.get_file("coupling_correct.madx") @classmethod def get_best_knowledge_tmpl(cls): return cls.get_file("best_knowledge.madx") @classmethod def get_segment_tmpl(cls): return cls.get_file("segment.madx")
[docs] @classmethod def get_iteration_tmpl(cls): return cls.get_file("template.iterate.madx")
@classmethod def get_basic_seq_tmpl(cls): return cls.get_file("template.basic_seq.madx") @classmethod def get_update_correction_tmpl(cls): return cls.get_file("template.update_correction.madx") @classmethod def get_file(cls, filename): return os.path.join(CURRENT_DIR, "lhc", filename) @classmethod def get_sequence_file(cls): try: return _get_file_for_year(cls.YEAR, "main.seq") except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete, mode " + "has to be specified (--lhcmode option missing?)." )
[docs] @classmethod def get_variables(cls, frm=None, to=None, classes=None): correctors_dir = os.path.join(LHC_DIR, "2012", "correctors") all_corrs = _merge_jsons( os.path.join(correctors_dir, "correctors_b" + str(cls.get_beam()), "beta_correctors.json"), os.path.join(correctors_dir, "correctors_b" + str(cls.get_beam()), "coupling_correctors.json"), cls._get_triplet_correctors_file(), ) my_classes = classes if my_classes is None: my_classes = all_corrs.keys() vars_by_class = set(_flatten_list( [all_corrs[corr_cls] for corr_cls in my_classes if corr_cls in all_corrs]) ) if frm is None and to is None: return list(vars_by_class) elems_matrix = tfs_pandas.read_tfs( cls._get_corrector_elems() ).sort_values("S") if frm is not None and to is not None: if frm > to: elems_matrix = elems_matrix[(elems_matrix.S >= frm) | (elems_matrix.S <= to)] else: elems_matrix = elems_matrix[(elems_matrix.S >= frm) & (elems_matrix.S <= to)] elif frm is not None: elems_matrix = elems_matrix[elems_matrix.S >= frm] elif to is not None: elems_matrix = elems_matrix[elems_matrix.S <= to] vars_by_position = _remove_dups_keep_order(_flatten_list( [raw_vars.split(",") for raw_vars in elems_matrix.loc[:, "VARS"]] )) return _list_intersect_keep_order(vars_by_position, vars_by_class)
[docs] def get_update_correction_job(self, tiwss_out_path, corrections_file_path): """ Return string for madx job of correcting model """ with open(self.get_update_correction_tmpl(), "r") as template: madx_template = template.read() try: replace_dict = { "LIB": self.MACROS_NAME, "MAIN_SEQ": self.load_main_seq_madx(), "OPTICS_PATH": self.optics_file, "CROSSING_ON": "1" if self.xing else "0", "NUM_BEAM": self.get_beam(), "DPP": self.dpp, "QMX": self.nat_tune_x, "QMY": self.nat_tune_y, "PATH_TWISS": tiwss_out_path, "CORRECTIONS": corrections_file_path, } except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete. " + "Needs to be an accelator instance. Also: --lhcmode or --beam option missing?" ) return madx_template % replace_dict
[docs] def get_basic_seq_job(self): """ Return string for madx job of correting model """ with open(self.get_basic_seq_tmpl(), "r") as template: madx_template = template.read() try: replace_dict = { "LIB": self.MACROS_NAME, "MAIN_SEQ": self.load_main_seq_madx(), "OPTICS_PATH": self.optics_file, "CROSSING_ON": "1" if self.xing else "0", "NUM_BEAM": self.get_beam(), "DPP": self.dpp, "QMX": self.nat_tune_x, "QMY": self.nat_tune_y, } except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete. " + "Needs to be an accelator instance. Also: --lhcmode or --beam option missing?" ) return madx_template % replace_dict
[docs] def get_multi_dpp_job(self, dpp_list): """ Return madx job to create twisses (models) with dpps from dpp_list """ with open(self.get_nominal_multidpp_tmpl()) as textfile: madx_template = textfile.read() try: output_path = self.model_dir use_acd = "1" if (self.excitation == AccExcitationMode.ACD) else "0" use_adt = "1" if (self.excitation == AccExcitationMode.ADT) else "0" crossing_on = "1" if self.xing else "0" beam = self.get_beam() replace_dict = { "LIB": self.MACROS_NAME, "MAIN_SEQ": self.load_main_seq_madx(), "OPTICS_PATH": self.optics_file, "NUM_BEAM": beam, "PATH": output_path, "QMX": self.nat_tune_x, "QMY": self.nat_tune_y, "USE_ACD": use_acd, "USE_ADT": use_adt, "CROSSING_ON": crossing_on, "QX": "", "QY": "", "QDX": "", "QDY": "", "DPP": "", "DPP_ELEMS": "", "DPP_AC": "", "DPP_ADT": "", } if (self.excitation in (AccExcitationMode.ACD, AccExcitationMode.ADT)): replace_dict["QX"] = self.nat_tune_x replace_dict["QY"] = self.nat_tune_y replace_dict["QDX"] = self.drv_tune_x replace_dict["QDY"] = self.drv_tune_y except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete. " + "Needs to be an accelator instance. Also: --lhcmode or --beam option missing?" ) # add different dpp twiss-command lines twisses_tmpl = "twiss, chrom, sequence=LHCB{beam:d}, deltap={dpp:f}, file='{twiss:s}';\n" for dpp in dpp_list: replace_dict["DPP"] += twisses_tmpl.format( beam=beam, dpp=dpp, twiss=os.path.join(output_path, "twiss_{:f}.dat".format(dpp)) ) replace_dict["DPP_ELEMS"] += twisses_tmpl.format( beam=beam, dpp=dpp, twiss=os.path.join(output_path, "twiss_{:f}_elements.dat".format(dpp)) ) replace_dict["DPP_AC"] += twisses_tmpl.format( beam=beam, dpp=dpp, twiss=os.path.join(output_path, "twiss_{:f}_ac.dat".format(dpp)) ) replace_dict["DPP_ADT"] += twisses_tmpl.format( beam=beam, dpp=dpp, twiss=os.path.join(output_path, "twiss_{:f}_adt.dat".format(dpp)) ) return madx_template % replace_dict
LHC_IPS = ("1", "2", "5", "8") NORMAL_IP_BPMS = "BPMSW.1{side}{ip}.B{beam}" DOROS_IP_BPMS = "LHC.BPM.1{side}{ip}.B{beam}_DOROS"
[docs] @classmethod def get_ips(cls): """ Returns an iterable with this accelerator IPs. Returns: An iterator returning tuples with: ("ip name", "left BPM name", "right BPM name") """ beam = cls.get_beam() for ip in Lhc.LHC_IPS: yield ("IP{}".format(ip), Lhc.NORMAL_IP_BPMS.format(side="L", ip=ip, beam=beam), Lhc.NORMAL_IP_BPMS.format(side="R", ip=ip, beam=beam)) yield ("IP{}_DOROS".format(ip), Lhc.DOROS_IP_BPMS.format(side="L", ip=ip, beam=beam), Lhc.DOROS_IP_BPMS.format(side="R", ip=ip, beam=beam))
def log_status(self): LOGGER.info(" model dir = " + self.model_dir) LOGGER.info("{:20s} [{:10.3f}]".format("Natural Tune X", self.nat_tune_x)) LOGGER.info("{:20s} [{:10.3f}]".format("Natural Tune Y", self.nat_tune_y)) if self._model_best_knowledge is None: LOGGER.info("{:20s} [{:>10s}]".format("Best Knowledge Model", "NO")) else: LOGGER.info("{:20s} [{:>10s}]".format("Best Knowledge Model", "OK")) if self._excitation == AccExcitationMode.FREE: LOGGER.info("{:20s} [{:>10s}]".format("Excitation", "NO")) else: if self._excitation == AccExcitationMode.ACD: LOGGER.info("{:20s} [{:>10s}]".format("Excitation", "ACD")) elif self._excitation == AccExcitationMode.ADT: LOGGER.info("{:20s} [{:>10s}]".format("Excitation", "ADT")) LOGGER.info("{:20s} [{:10.3f}]".format("> Driven Tune X", self.drv_tune_x)) LOGGER.info("{:20s} [{:10.3f}]".format("> Driven Tune Y", self.drv_tune_y)) @classmethod def load_main_seq_madx(cls): try: return _get_call_main_for_year(cls.YEAR) except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete, mode " + "has to be specified (--lhcmode option missing?)." ) # Private Methods ########################################################## @classmethod def _get_triplet_correctors_file(cls): correctors_dir = os.path.join(LHC_DIR, "2012", "correctors") return os.path.join(correctors_dir, "triplet_correctors.json") @classmethod def _get_corrector_elems(cls): correctors_dir = os.path.join(LHC_DIR, "2012", "correctors") return os.path.join(correctors_dir, "corrector_elems_b" + str(cls.get_beam()) + ".tfs") @property def excitation(self): return self._excitation @excitation.setter def excitation(self, excitation_mode): if excitation_mode not in (AccExcitationMode.FREE, AccExcitationMode.ACD, AccExcitationMode.ADT): raise ValueError("Wrong excitation mode.") self._excitation = excitation_mode # For GetLLM --------------------------------------------------------------
[docs] def get_exciter_bpm(self, plane, commonbpms): if self.get_beam() == 1: if self.excitation == AccExcitationMode.ACD: return get_commonbpm( "BPMYB.6L4.B1", "BPM.7L4.B1", commonbpms), "MKQA.6L4.B1" elif self.excitation == AccExcitationMode.ADT: if plane == "X": return get_commonbpm( "BPMWA.B5L4.B1", "BPMWA.A5L4.B1", commonbpms), "ADTKH.C5L4.B1" elif plane == "Y": return get_commonbpm( "BPMWA.B5R4.B1", "BPMWA.A5R4.B1", commonbpms), "ADTKV.B5R4.B1" elif self.get_beam() == 2: if self.excitation == AccExcitationMode.ACD: return get_commonbpm( "BPMYA.6L4.B2", "BPM.7L4.B2", commonbpms), "MKQA.6L4.B2" elif self.excitation == AccExcitationMode.ADT: if plane == "X": return get_commonbpm( "BPMWA.B5R4.B2", "BPMWA.A5R4.B2", commonbpms), "ADTKH.C5R4.B2" elif plane == "Y": return get_commonbpm( "BPMWA.B5L4.B2", "BPMWA.A5L4.B2", commonbpms), "ADTKV.B5L4.B2" return None
def get_important_phase_advances(self): if self.get_beam() == 2: return[["MKD.O5R6.B2", "TCTPH.4R1.B2"], ["MKD.O5R6.B2", "TCTPH.4R5.B2"]] if self.get_beam() == 1: return [["MKD.O5L6.B1", "TCTPH.4L1.B1"], ["MKD.O5L6.B1", "TCTPH.4L5.B1"]]
[docs] def get_exciter_name(self, plane): if self.get_beam() == 1: if self.excitation == AccExcitationMode.ACD: if plane == "H": return 'MKQA.6L4.B1' elif plane == "V": return 'MKQA.6L4.B1' elif self.excitation == AccExcitationMode.ADT: if plane == "H": return "ADTKH.C5L4.B1" elif plane == "V": return "ADTKV.B5R4.B1" elif self.get_beam() == 2: if self.excitation == AccExcitationMode.ACD: if plane == "H": return 'MKQA.6L4.B2' elif plane == "V": return 'MKQA.6L4.B2' elif self.excitation == AccExcitationMode.ADT: if plane == "H": return "ADTKH.B5R4.B2" elif plane == "V": return "ADTKV.C5L4.B2" return None
[docs] def get_s_first_BPM(self): if self.get_beam() == 1: return self._model.loc["BPMSW.1L2.B1", "S"] elif self.get_beam() == 2: return self._model.loc["BPMSW.1L8.B2", "S"] return None
[docs] def get_errordefspath(self): """Returns the path to the uncertainty definitions file (formerly called error definitions file. """ if self._errordefspath is None: raise AttributeError("No error definitions file given in this accelerator instance.") return self._errordefspath
def set_errordefspath(self, path): self._errordefspath = path
[docs] def get_k_first_BPM(self, index): if self.get_beam() == 1: model_k = self._model.index.get_loc("BPMSW.1L2.B1") while model_k < len(self._model.index): kname = self._model.index[model_k] if kname in index: return index.get_loc(kname) model_k = model_k + 1 elif self.get_beam() == 2: model_k = self._model.index.get_loc("BPMSW.1L8.B2") while model_k < len(self._model.index): kname = self._model.index[model_k] if kname in index: return index.get_loc(kname) model_k = model_k + 1 return None
def get_synch_BPMs(self, index): # expect passing index.values if self.get_beam() == 1: return [i in index for i in self.model_tfs.loc["BPMSW.33L2.B1":].index] elif self.get_beam() == 2: return [i in index for i in self.model_tfs.loc["BPMSW.33R8.B2":].index]
[docs] def get_model_tfs(self): return self._model
[docs] def get_driven_tfs(self): if self._model_driven is None: raise AttributeError("No driven model given in this accelerator instance.") return self._model_driven
[docs] def get_best_knowledge_model_tfs(self): if self._model_best_knowledge is None: raise AttributeError("No best knowledge model given in this accelerator instance.") return self._model_best_knowledge
[docs] def get_elements_tfs(self): return self._elements
def get_elements_centre_tfs(self): return self._elements_centre
[docs] @classmethod def get_element_types_mask(cls, list_of_elements, types): """ Return boolean mask for elements in list_of_elements that belong to any of the specified types. Needs to handle: "bpm", "magnet", "arc_bpm" Args: list_of_elements: List of elements types: Kinds of elements to look for Returns: Boolean array of elements of specified kinds. """ re_dict = { "bpm": r"BPM", "magnet": r"M", "arc_bpm": r"BPM.*\.0*(1[5-9]|[2-9]\d|[1-9]\d{2,})[RL]", # bpms > 14 L or R of IP } unknown_elements = [ty for ty in types if ty not in re_dict] if len(unknown_elements): raise TypeError("Unknown element(s): '{:s}'".format(str(unknown_elements))) series = pd.Series(list_of_elements) mask = series.str.match(re_dict[types[0]], case=False) for ty in types[1:]: mask = mask | series.str.match(re_dict[ty], case=False) return mask.values
class _LhcSegmentMixin(object): def __init__(self): self._start = None self._end = None def get_segment_vars(self, classes=None): return self.get_variables(frm=self.start.s, to=self.end.s, classes=classes) def verify_object(self): try: self.get_beam() except AttributeError: raise AcceleratorDefinitionError( "The accelerator definition is incomplete, beam " "has to be specified (--beam option missing?)." ) if self.optics_file is None: raise AcceleratorDefinitionError( "The accelerator definition is incomplete, optics " "file has not been specified." ) if self.xing is None: raise AcceleratorDefinitionError("Crossing on or off not set.") if self.label is None: raise AcceleratorDefinitionError("Segment label not set.") if self.start is None: raise AcceleratorDefinitionError("Segment start not set.") if self.end is None: raise AcceleratorDefinitionError("Segment end not set.") class _LhcB1Mixin(object): @classmethod def get_beam(cls): return 1 @classmethod def get_beam_direction(cls): return 1 class _LhcB2Mixin(object): @classmethod def get_beam(cls): return 2 @classmethod def get_beam_direction(cls): return -1
[docs]class LhcAts(Lhc): MACROS_NAME = "lhc_runII_ats"
# Specific accelerator definitions ###########################################
[docs]class LhcRunI(Lhc): YEAR = "2012" @classmethod def load_main_seq_madx(cls): load_main_seq = _get_call_main_for_year("2012") load_main_seq += _get_madx_call_command( os.path.join(LHC_DIR, "2012", "install_additional_elements.madx") ) return load_main_seq
[docs]class LhcRunII2015(Lhc): YEAR = "2015"
[docs]class LhcRunII2016(Lhc): YEAR = "2016"
[docs]class LhcRunII2016Ats(LhcAts, LhcRunII2016): pass
[docs]class LhcRunII2017(LhcAts): YEAR = "2017"
[docs]class LhcRunII2018(LhcAts): YEAR = "2018"
[docs]class HlLhc10(LhcAts): MACROS_NAME = "hllhc" YEAR = "hllhc1.0" @classmethod def load_main_seq_madx(cls): load_main_seq = _get_call_main_for_year("2015") load_main_seq += _get_call_main_for_year("hllhc1.0") return load_main_seq
[docs]class HlLhc12(LhcAts): MACROS_NAME = "hllhc" YEAR = "hllhc1.2" @classmethod def load_main_seq_madx(cls): load_main_seq = _get_call_main_for_year("2015") load_main_seq += _get_call_main_for_year("hllhc1.2") return load_main_seq @classmethod def _get_triplet_correctors_file(cls): correctors_dir = os.path.join(LHC_DIR, "hllhc1.2", "correctors") return os.path.join(correctors_dir, "triplet_correctors.json") @classmethod def _get_corrector_elems(cls): correctors_dir = os.path.join(LHC_DIR, "hllhc1.2", "correctors") return os.path.join(correctors_dir, "corrector_elems_b" + str(cls.get_beam()) + ".tfs")
[docs]class HlLhc12NewCircuit(LhcAts): MACROS_NAME = "hllhc" YEAR = "hllhc12"
[docs]class HlLhc12NoQ2Trim(HlLhc12): MACROS_NAME = "hllhc" YEAR = "hllhc12"
[docs]class HlLhc13(LhcAts): MACROS_NAME = "hllhc" YEAR = "hllhc1.3" @classmethod def load_main_seq_madx(cls): load_main_seq = _get_madx_call_command( os.path.join(LHC_DIR, "hllhc1.3", "lhcrunIII.seq") ) load_main_seq += _get_call_main_for_year("hllhc1.3") return load_main_seq @classmethod def _get_triplet_correctors_file(cls): correctors_dir = os.path.join(LHC_DIR, "hllhc1.3", "correctors") return os.path.join(correctors_dir, "triplet_correctors.json") @classmethod def _get_corrector_elems(cls): correctors_dir = os.path.join(LHC_DIR, "hllhc1.3", "correctors") return os.path.join(correctors_dir, "corrector_elems_b" + str(cls.get_beam()) + ".tfs")
# General functions ########################################################## def _get_call_main_for_year(year): call_main = _get_madx_call_command( _get_file_for_year(year, "main.seq") ) return call_main def _get_madx_call_command(path_to_call): command = "call, file = \"" command += path_to_call command += "\";\n" return command def _get_file_for_year(year, filename): return os.path.join(LHC_DIR, year, filename) def _merge_jsons(*files): full_dict = {} for json_file in files: with open(json_file, "r") as json_data: json_dict = json.load(json_data) for key in json_dict.keys(): full_dict[key] = json_dict[key] return full_dict def _flatten_list(my_list): return [item for sublist in my_list for item in sublist] def _remove_dups_keep_order(my_list): return list(OrderedDict.fromkeys(my_list)) def _list_intersect_keep_order(primary_list, secondary_list): return [elem for elem in primary_list if elem in secondary_list] # Script Mode ################################################################## if __name__ == '__main__': raise EnvironmentError("{:s} is not supposed to run as main.".format(__file__))