Source code for harmonic_analysis.io_handlers.input_handler

import sys
import os
from os.path import abspath, dirname
import argparse
from optics_measurements.optics_input import _get_optics_parser, OpticsInput
import logging
from model import manager

LOGGER = logging.getLogger(__name__)


def parse_args(args=None):
    main_parser = _get_main_parser()
    main_options, rest = main_parser.parse_known_args(args)
    if "optics" in rest:
        rest.append('--files={}'.format(main_options.file))
        rest.append('--outputdir={}'.format(main_options.outputdir))
        rest.append('--model_dir={}'.format(dirname(abspath(main_options.model))))
    subparsers = {
        "clean": _get_clean_parser,
        "harpy": _get_harpy_parser,
        "optics": _get_optics_parser,
    }
    suboptions = {}
    for subparser_name in subparsers.keys():
        if subparser_name in rest:
            rest.remove(subparser_name)
            suboption, rest = subparsers[subparser_name]().parse_known_args(rest)
            suboptions[subparser_name] = suboption
    main_input = MainInput.init_from_options(main_options)
    clean_input = None
    harpy_input = None
    optics_input = None

    if "clean" in suboptions:
        clean_input = CleanInput.init_from_options(suboptions["clean"])

    if "harpy" in suboptions:
        harpy_input = HarpyInput.init_from_options(suboptions["harpy"])

    if "optics" in suboptions:
        accelerator = manager.get_accel_instance(rest)
        suboptions["optics"].accelerator = accelerator
        optics_input = OpticsInput.init_from_options(suboptions["optics"])



    python_path = "/afs/cern.ch/work/o/omc/anaconda/bin/python "
    return main_input, clean_input, harpy_input, optics_input, (python_path + " ".join(sys.argv))


[docs]class ArgumentError(Exception): pass
class MainInput(object): DEFAULTS = { "write_raw": False, "startturn": 0, "endturn": 50000, "skip_files": False, } def __init__(self): self.file = None self.model = None self.outputdir = None self.write_raw = MainInput.DEFAULTS["write_raw"] self.startturn = MainInput.DEFAULTS["startturn"] self.endturn = MainInput.DEFAULTS["endturn"] self.skip_files = MainInput.DEFAULTS["skip_files"] @staticmethod def init_from_options(options): self = MainInput() self.file = options.file#[file_name.strip() for file_name in options.file.strip("\"").split(",")] self.model = options.model self.outputdir = options.outputdir self.write_raw = options.write_raw self.skip_files = options.skip_files if self.outputdir is None: outdir = os.path.dirname(self.file) self.outputdir = outdir self.startturn = options.startturn self.endturn = options.endturn return self def _get_main_parser(): parser = argparse.ArgumentParser() # Obligatory arguments ########### parser.add_argument( "--file", "--files", help="Comma separated binary files to clean", required=True, dest="file" ) parser.add_argument( "--model", help="Model for BPM locations", required=True, dest="model" ) ################################ # Optional arguments ########### parser.add_argument( "--outputdir", help="""Output directory. If not present it will use the input file directory.""", dest="outputdir" ) parser.add_argument( "--write_raw", help=("If present, it will write the raw" + "ASCII file in the --outputdir."), action="store_true", ) parser.add_argument( "--skip_files", help=("If present, it will not write the .amps and the .freqs files" + "ASCII file in the --outputdir."), action="store_true", ) parser.add_argument( "--startturn", help="Turn index to start. Default is first turn: %(default)s", default=MainInput.DEFAULTS["startturn"], dest="startturn", type=int ) parser.add_argument( "--endturn", help="""First turn index to be ignored. Default is a number that is lower than the maximum which can be handled by drive: %(default)s""", default=MainInput.DEFAULTS["endturn"], dest="endturn", type=int ) return parser ################################ class CleanInput(object): DEFAULTS = { "svd_mode": "numpy", "sing_val": 12, "peak_to_peak": 0.00001, "max_peak": 20., "single_svd_bpm_threshold": 0.925, "bad_bpms": [], "wrong_polarity_bpms": [], "noresync": False, "write_clean": False, "no_exact_zeros": False, } def __init__(self): self.svd_mode = CleanInput.DEFAULTS["svd_mode"] self.sing_val = CleanInput.DEFAULTS["sing_val"] self.peak_to_peak = CleanInput.DEFAULTS["peak_to_peak"] self.max_peak = CleanInput.DEFAULTS["max_peak"] self.single_svd_bpm_threshold = CleanInput.DEFAULTS[ "single_svd_bpm_threshold" ] self.bad_bpms = CleanInput.DEFAULTS["bad_bpms"] self.wrong_polarity_bpms = CleanInput.DEFAULTS["wrong_polarity_bpms"] self.noresync = CleanInput.DEFAULTS["noresync"] self.write_clean = CleanInput.DEFAULTS["write_clean"] self.no_exact_zeros = CleanInput.DEFAULTS["no_exact_zeros"] @staticmethod def init_from_options(options): self = CleanInput() self.svd_mode = options.svd_mode self.sing_val = options.sing_val self.peak_to_peak = options.peak_to_peak self.max_peak = options.max_peak self.single_svd_bpm_threshold = options.single_svd_bpm_threshold if not options.bad_bpms: self.bad_bpms = [] else: self.bad_bpms = [bad_bpm.strip() for bad_bpm in options.bad_bpms.strip("\"").split(",")] if not options.wrong_polarity_bpms: self.wrong_polarity_bpms = [] else: self.wrong_polarity_bpms = [wrong_polarity_bpm.strip() for wrong_polarity_bpm in options.wrong_polarity_bpms.strip("\"").split(",")] self.noresync = options.noresync self.write_clean = options.write_clean self.no_exact_zeros = options.no_exact_zeros return self def _get_clean_parser(): parser = argparse.ArgumentParser() # Optional arguments ########### parser.add_argument( "--svd_mode", help="""Defines modes of the SVD funtion itself. Should be one of: - numpy: numpy.linalg.svd - sparse: scipy.sparse.linalg.svds - random: Randomized SVD """, default=CleanInput.DEFAULTS["svd_mode"], dest="svd_mode", type=str, choices=("numpy", "sparse", "random"), ) parser.add_argument( "--sing_val", help="""Keep this amount of singular values in decreasing order, rest will be cut (set to 0). Default is a large number: %(default)s""", default=CleanInput.DEFAULTS["sing_val"], dest="sing_val", type=int ) parser.add_argument( "--pk-2-pk", help="""Peak to peak amplitude cut. This removes BPMs where abs(max(turn values) - min(turn values)) <= threshold. Default: %(default)s""", default=CleanInput.DEFAULTS["peak_to_peak"], dest="peak_to_peak", type=float) parser.add_argument( "--max-peak-cut", help="""Maximum peak tolerance in mm. This removes BPMs where the maximum measured oscillation > threshold. Default: %(default)s""", default=CleanInput.DEFAULTS["max_peak"], dest="max_peak", type=float) parser.add_argument( "--single_svd_bpm_threshold", help="""Threshold for single BPM dominating a mode. Should be > 0.9 for LHC. Default: %(default)s""", default=CleanInput.DEFAULTS["single_svd_bpm_threshold"], dest="single_svd_bpm_threshold", type=float) parser.add_argument( "--bad_bpms", help=""""Comma separated bad BPMs to clean. Default: %(default)s""", default=CleanInput.DEFAULTS["bad_bpms"], dest="bad_bpms", type=str) parser.add_argument( "--wrong_polarity_bpms", help=""""Comma separated BPMs with swapped polarity in both planes. Default: %(default)s""", default=CleanInput.DEFAULTS["wrong_polarity_bpms"], dest="wrong_polarity_bpms", type=str) parser.add_argument( "--noresync", help="""Ignore the synchronization error of the BPMs.""", dest="noresync", action="store_true" ) parser.add_argument( "--write_clean", help="""If present, it will write the cleaned ASCII file in the --outputdir.""", dest="write_clean", action="store_true" ) parser.add_argument( "--no_exact_zeros", help="""If present, will not remove files with a single zero .""", dest="no_exact_zeros", action="store_true" ) ################################ return parser class HarpyInput(object): DEFAULTS = { "tunez": 0.0, "tolerance": 0.01, "harpy_mode": "svd", "sequential": False, "no_tune_clean": False, "tune_clean_limit": 1e-5, "is_free_kick": False, } def __init__(self): self.tunex = None self.tuney = None self.tunez = HarpyInput.DEFAULTS["tunez"] self.nattunex = None self.nattuney = None self.nattunez = None self.tolerance = HarpyInput.DEFAULTS["tolerance"] self.harpy_mode = HarpyInput.DEFAULTS["harpy_mode"] self.sequential = HarpyInput.DEFAULTS["sequential"] self.no_tune_clean = HarpyInput.DEFAULTS["no_tune_clean"] self.tune_clean_limit = HarpyInput.DEFAULTS["tune_clean_limit"] self.is_free_kick = HarpyInput.DEFAULTS["is_free_kick"] @staticmethod def init_from_options(options): def check_tune_overlap(tune, nattune, plane, tolerance): if abs(tune - nattune) < 2 * tolerance: msg = ("Windows for tune and natural tune in plane {} overlap " "for tolerance {}. This could lead to both tunes being " "found at the same frequency").format(plane, tolerance) LOGGER.warning(msg) self = HarpyInput() self.tunex = options.tunex self.tuney = options.tuney self.nattunex = options.nattunex self.nattuney = options.nattuney self.tunez = options.tunez self.nattunez = options.nattunez self.tolerance = options.tolerance self.harpy_mode = options.harpy_mode self.sequential = options.sequential self.no_tune_clean = options.no_tune_clean self.tune_clean_limit = options.tune_clean_limit self.is_free_kick = options.is_free_kick # Check there's no overlap for the tune and nattune check_tune_overlap(self.tunex, self.nattunex, 'x', self.tolerance) check_tune_overlap(self.tuney, self.nattuney, 'y', self.tolerance) return self def _get_harpy_parser(): parser = argparse.ArgumentParser() # Obligatory arguments ########### parser.add_argument( "--tunex", help="Guess for the main horizontal tune.", dest="tunex", type=float, required=True, ) parser.add_argument( "--tuney", help="Guess for the main vertical tune.", dest="tuney", type=float, required=True, ) ################################ # Optional arguments ########### parser.add_argument( "--nattunex", help="Guess for the natural horizontal tune.", dest="nattunex", type=float, ) parser.add_argument( "--nattuney", help="Guess for the natural vertical tune.", dest="nattuney", type=float, ) parser.add_argument( "--tunez", help="Guess for the main synchrotron tune.", default=HarpyInput.DEFAULTS["tunez"], dest="tunez", type=float, ) parser.add_argument( "--nattunez", help="Guess for the natural synchrotron tune.", default=None, dest="nattunez", type=float, ) parser.add_argument( "--tolerance", help="Tolerance on the guess for the tunes.", default=HarpyInput.DEFAULTS["tolerance"], dest="tolerance", type=float, ) parser.add_argument( "--harpy_mode", help="Harpy resonance computation mode.", dest="harpy_mode", type=str, choices=("bpm", "svd", "fast", "window"), default=HarpyInput.DEFAULTS["harpy_mode"], ) parser.add_argument( "--sequential", help="If set, it will run in only one process.", dest="sequential", action="store_true", ) parser.add_argument( "--no_tune_clean", help="If present, the automatic tune cleaning will be deactivated.", dest="no_tune_clean", action="store_true", ) parser.add_argument( "--tune_clean_limit", help="The autoclean wont remove tune deviation lower than this limit.", dest="tune_clean_limit", type=float, default=HarpyInput.DEFAULTS["tune_clean_limit"] ) parser.add_argument( "--free_kick", help="If present, it will perform the free kick phase correction", dest="is_free_kick", action="store_true", ) ################################ return parser