Source code for correction.getdiff

"""
:module: correction.getdiff

Created on 24/02/18

:author: Lukas Malina

Calculates the difference between GetLLM output and correction plugged in the model.
Provide as first argument the path to the output files of GetLLM.

model inputs:
    twiss_cor.dat and twiss_no.dat

outputs in measurement directory:
    phasex.out and phasey.out
    bbx.out and bby.out
    dx.out, dy.out and ndx.out
    couple.out and chromatic_coupling.out

TODOs and Notes:
    OpticsMeasurement: possibly extend and use with measurement filters from global correction
                        to be used in sbs, new corrections, getdiff, plot_export

    Expected values after correction to be put in, little tricky with phase column names
    No coupling in twiss_no.dat? not used

Some hints:
    MEA, MODEL, EXPECT are usually the names for the differences between the values and the model.
    Apart from phase, where these differences are called DIFF and DIFF_MDL (EXPECT is still the
    same) while MEA and MODEL are the actual measurement and model values respectively.

    Don't look into the coupling and chromatic coupling namings.
"""
from __future__ import print_function

import re
import sys
from os.path import abspath, join, dirname, isdir, exists, split, pardir

import numpy as np
import pandas as pd

new_path = abspath(join(dirname(abspath(__file__)), pardir))
if new_path not in sys.path:
    sys.path.append(new_path)

from optics_measurements.io_filehandler import OpticsMeasurement
from twiss_optics.optics_class import TwissOptics
from tfs_files.tfs_pandas import read_tfs, write_tfs
from utils import logging_tools, beta_star_from_twiss as bsft

LOG = logging_tools.get_logger(__name__)

TWISS_CORRECTED = "twiss_cor.dat"
TWISS_NOT_CORRECTED = "twiss_no.dat"
TWISS_CORRECTED_PLUS = "twiss_cor_dpp.dat"  # positive dpp
TWISS_CORRECTED_MINUS = "twiss_cor_dpm.dat"  # negative dpp


# Main invocation ############################################################


def get_diff_filename(id):
    return "diff_{:s}.out".format(id)


[docs]def getdiff(meas_path=None, beta_file_name="getbeta"): """ Calculates the differences between measurement, corrected and uncorrected model. After running madx and creating the model with (twiss_cor) and without (twiss_no) corrections, run this functions to get tfs-files with the differences between measurements and models. Args: meas_path (str): Path to the measurement folder. Needs to contain twiss_cor.dat and twiss_no.dat. """ if meas_path is None: meas_path = sys.argv[1] LOG.debug("Started 'getdiff' for measurment dir '{:s}'".format(meas_path)) if not isdir(meas_path): raise IOError("No valid measurement directory:" + meas_path) corrected_model_path = join(meas_path, TWISS_CORRECTED) uncorrected_model_path = join(meas_path, TWISS_NOT_CORRECTED) meas = OpticsMeasurement(meas_path) twiss_cor = read_tfs(corrected_model_path).set_index('NAME', drop=False) twiss_no = read_tfs(uncorrected_model_path).set_index('NAME', drop=False) coup_cor = TwissOptics(twiss_cor, quick_init=True).get_coupling(method='cmatrix') coup_no = TwissOptics(twiss_no, quick_init=True).get_coupling(method='cmatrix') model = pd.merge(twiss_cor, twiss_no, how='inner', on='NAME', suffixes=('_c', '_n')) coupling_model = pd.merge(coup_cor, coup_no, how='inner', left_index=True, right_index=True, suffixes=('_c', '_n')) coupling_model['NAME'] = coupling_model.index.values for plane in ['x', 'y']: _write_betabeat_diff_file(meas_path, meas, model, plane, beta_file_name) _write_phase_diff_file(meas_path, meas, model, plane) _write_disp_diff_file(meas_path, meas, model, plane) _write_closed_orbit_diff_file(meas_path, meas, model, plane) _write_coupling_diff_file(meas_path, meas, coupling_model) _write_norm_disp_diff_file(meas_path, meas, model) _write_chromatic_coupling_files(meas_path, corrected_model_path) _write_betastar_diff_file(meas_path, meas, twiss_cor, twiss_no) LOG.debug("Finished 'getdiff'.")
# Writing Functions ########################################################## def _write_betabeat_diff_file(meas_path, meas, model, plane, betafile): LOG.debug("Calculating beta diff.") if betafile == "getbeta": meas_beta = meas.beta[plane] elif betafile == "getampbeta": meas_beta = meas.amp_beta[plane] elif betafile == "getkmodbeta": meas_beta = meas.kmod_beta[plane] else: raise KeyError("Unknown beta file name '{}'.".format(betafile)) up = plane.upper() tw = pd.merge(meas_beta, model, how='inner', on='NAME') tw['MEA'] = ((tw.loc[:, 'BET' + up] - tw.loc[:, 'BET' + up + 'MDL']) / tw.loc[:, 'BET' + up + 'MDL']) tw['ERROR'] = tw.loc[:, 'ERRBET' + up] / tw.loc[:, 'BET' + up + 'MDL'] tw['MODEL'] = ((tw.loc[:, 'BET' + up + '_c'] - tw.loc[:, 'BET' + up + '_n']) / tw.loc[:, 'BET' + up + '_n']) tw['EXPECT'] = tw['MEA'] - tw['MODEL'] write_tfs(join(meas_path, get_diff_filename('bb' + plane)), tw.loc[:, ['NAME', 'S', 'MEA', 'ERROR', 'MODEL', 'EXPECT']]) def _write_phase_diff_file(meas_path, meas, model, plane): LOG.debug("Calculating phase diff.") up = plane.upper() tw = pd.merge(meas.phase[plane], model, how='inner', on='NAME') tw['MEA'] = tw.loc[:, 'PHASE' + up] tw['ERROR'] = tw.loc[:, 'STDPH' + up] tw['MODEL'] = np.concatenate((np.diff(tw.loc[:, 'MU' + up + '_c']), np.array([0.0]))) tw['DIFF'] = tw.loc[:, 'PHASE' + up] - tw.loc[:, 'PH' + up + 'MDL'] tw['DIFF_MDL'] = tw.loc[:, 'MODEL'] - tw.loc[:, 'PH' + up + 'MDL'] tw['EXPECT'] = tw['DIFF'] - tw['DIFF_MDL'] write_tfs(join(meas_path, get_diff_filename('phase' + plane)), tw.loc[tw.index[:-1], ['NAME', 'S', 'MEA', 'ERROR', 'MODEL', 'DIFF', 'DIFF_MDL', 'EXPECT']]) def _write_disp_diff_file(meas_path, meas, model, plane): LOG.debug("Calculating dispersion diff.") up = plane.upper() try: tw = pd.merge(meas.disp[plane], model, how='inner', on='NAME') except IOError: LOG.debug("Dispersion measurements not found. Skipped.") else: tw['MEA'] = tw.loc[:, 'D' + up] - tw.loc[:, 'D' + up + 'MDL'] tw['ERROR'] = tw.loc[:, 'STDD' + up] tw['MODEL'] = tw.loc[:, 'D' + up + '_c'] - tw.loc[:, 'D' + up + '_n'] tw['EXPECT'] = tw['MEA'] - tw['MODEL'] write_tfs(join(meas_path, get_diff_filename('d' + plane)), tw.loc[:, ['NAME', 'S', 'MEA', 'ERROR', 'MODEL', 'EXPECT']]) def _write_closed_orbit_diff_file(meas_path, meas, model, plane): LOG.debug("Calculating orbit diff.") up = plane.upper() try: tw = pd.merge(meas.orbit[plane], model, how='inner', on='NAME') except IOError: LOG.debug("Orbit measurements not found. Skipped.") else: tw['MEA'] = tw.loc[:, up] - tw.loc[:, up + 'MDL'] tw['ERROR'] = tw.loc[:, 'STD' + up] tw['MODEL'] = (tw.loc[:, up + '_c'] - tw.loc[:, up + '_n']) * 1000 tw['EXPECT'] = tw['MEA'] - tw['MODEL'] * 1000 write_tfs(join(meas_path, get_diff_filename('co' + plane)), tw.loc[:, ['NAME', 'S', 'MEA', 'ERROR', 'MODEL', 'EXPECT']]) def _write_norm_disp_diff_file(meas_path, meas, model): LOG.debug("Calculating normalized dispersion diff.") try: tw = pd.merge(meas.norm_disp, model, how='inner', on='NAME') except IOError: LOG.debug("Normalized dispersion measurements not found. Skipped.") else: tw['MEA'] = tw.loc[:, 'NDX'] - tw.loc[:, 'NDXMDL'] tw['ERROR'] = tw.loc[:, 'STDNDX'] tw['MODEL'] = (tw.loc[:, 'DX_c'] / np.sqrt(tw.loc[:, 'BETX_c']) - tw.loc[:, 'DX_n'] / np.sqrt(tw.loc[:, 'BETX_n'])) tw['EXPECT'] = tw['MEA'] - tw['MODEL'] write_tfs(join(meas_path, get_diff_filename('ndx')), tw.loc[:, ['NAME', 'S', 'MEA', 'ERROR', 'MODEL', 'EXPECT']]) def _write_coupling_diff_file(meas_path, meas, model): LOG.debug("Calculating coupling diff.") tw = pd.merge(meas.coupling, model, how='inner', on='NAME') out_columns = ['NAME', 'S'] for idx, rdt in enumerate(['F1001', 'F1010']): tw[rdt+'re'] = tw.loc[:, rdt+'R'] tw[rdt+'im'] = tw.loc[:, rdt+'I'] tw[rdt+'e'] = tw.loc[:, 'FWSTD{:d}'.format(idx+1)] tw[rdt+'re_m'] = np.real(tw.loc[:, rdt+'_c']) tw[rdt+'im_m'] = np.imag(tw.loc[:, rdt+'_c']) tw[rdt+'re_prediction'] = tw.loc[:, rdt+'re'] - tw.loc[:, rdt+'re_m'] tw[rdt+'im_prediction'] = tw.loc[:, rdt+'im'] - tw.loc[:, rdt+'im_m'] tw[rdt+'W_prediction'] = np.sqrt(np.square(tw[rdt+'re_prediction']) + np.square(tw[rdt+'im_prediction'])) out_columns += [rdt+'re', rdt+'im', rdt+'e', rdt+'re_m', rdt+'im_m', rdt+'W', rdt+'W_prediction', rdt+'re_prediction', rdt+'im_prediction'] tw['in_use'] = 1 out_columns += ['in_use'] write_tfs(join(meas_path, get_diff_filename('couple')), tw.loc[:, out_columns]) def _write_chromatic_coupling_files(meas_path, cor_path): LOG.debug("Calculating chromatic coupling diff.") # TODO: Add Cf1010 try: twiss_plus = read_tfs(join(split(cor_path)[0], TWISS_CORRECTED_PLUS), index='NAME') twiss_min = read_tfs(join(split(cor_path)[0], TWISS_CORRECTED_MINUS), index='NAME') except IOError: LOG.debug("Chromatic coupling measurements not found. Skipped.") else: deltap = np.abs(twiss_plus.DELTAP - twiss_min.DELTAP) plus = TwissOptics(twiss_plus, quick_init=True).get_coupling(method='cmatrix') minus = TwissOptics(twiss_min, quick_init=True).get_coupling(method='cmatrix') model = pd.merge(plus, minus, how='inner', left_index=True, right_index=True, suffixes=('_p', '_m')) model['NAME'] = model.index.values if exists(join(meas_path, "chromcoupling_free.out")): meas = read_tfs(join(meas_path, "chromcoupling_free.out")) else: meas = read_tfs(join(meas_path, "chromcoupling.out")) tw = pd.merge(meas, model, how='inner', on='NAME') cf1001 = (tw.loc[:, 'F1001_p'] - tw.loc[:, 'F1001_m']) / deltap tw['Cf1001r_model'] = np.real(cf1001) tw['Cf1001i_model'] = np.imag(cf1001) tw['Cf1001r_prediction'] = tw.loc[:, 'Cf1001r'] - tw.loc[:, 'Cf1001r_model'] tw['Cf1001i_prediction'] = tw.loc[:, 'Cf1001i'] - tw.loc[:, 'Cf1001i_model'] write_tfs(join(meas_path, get_diff_filename('chromatic_coupling')), tw.loc[:, ['NAME', 'S', 'Cf1001r', 'Cf1001rERR', 'Cf1001i', 'Cf1001iERR', 'Cf1001r_model', 'Cf1001i_model', 'Cf1001r_prediction', 'Cf1001i_prediction']]) def _write_betastar_diff_file(meas_path, meas, twiss_cor, twiss_no): LOG.debug("Calculating betastar diff at the IPs.") try: meas = meas.kmod_betastar.set_index(bsft.RES_COLUMNS[0]) except IOError: LOG.debug("Beta* measurements not found. Skipped.") else: # get all IPs ip_map = {} beam = '' for label in meas.index.values: ip, beam = re.findall(r'\d', label)[-2:] # beam should be the same for all if ip not in "1258": raise NotImplementedError( "Beta-Star comparison is not yet implemented for measurements in IP" + ip) ip_label = "IP" + ip ip_map[label] = ip_label beam = int(beam) all_ips = set(ip_map.values()) try: # calculate waist and so on model = bsft.get_beta_star_and_waist_from_ip(twiss_cor, beam, all_ips) design = bsft.get_beta_star_and_waist_from_ip(twiss_no, beam, all_ips) except KeyError: LOG.warn("Can't find all IPs in twiss files. Skipped beta* calculations.") else: # extract data tw = pd.DataFrame() for label in meas.index: plane = label[-1] ip_name = bsft.get_full_label(ip_map[label], beam, plane) tw.loc[label, "S"] = model.loc[ip_name, "S"] # calculate alpha* but with s-oriented waist definition meas["ALPHASTAR"] = meas["WAIST"] / meas["BETAWAIST"] meas["ALPHASTAR_ERR"] = ((meas["WAIST_ERR"] / meas["WAIST"] + meas["BETAWAIST_ERR"] / meas["BETAWAIST"]) * meas["ALPHASTAR"] ) for attr in bsft.RES_COLUMNS[2:]: # default diff parameter tw.loc[label, attr + "_MEA"] = (meas.loc[label, attr] - design.loc[ip_name, attr]) tw.loc[label, attr + "_ERROR"] = meas.loc[label, attr + "_ERR"] tw.loc[label, attr + "_MODEL"] = (model.loc[ip_name, attr] - design.loc[ip_name, attr]) # additional for checks (e.g. for betastar* panel) tw.loc[label, attr + "_MEAVAL"] = meas.loc[label, attr] tw.loc[label, attr + "_DESIGNVAL"] = design.loc[ip_name, attr] tw.loc[label, attr + "_MODELVAL"] = model.loc[ip_name, attr] # and the beatings tw.loc[label, "B{}_MEA".format(attr)] = (tw.loc[label, attr + "_MEA"] / design.loc[ip_name, attr]) tw.loc[label, "B{}_MODEL".format(attr)] = (tw.loc[label, attr + "_MODEL"] / design.loc[ip_name, attr]) # special handling for the expectation values, as waist and betawaist # should be derived directly from alpha* and beta* if attr in bsft.RES_COLUMNS[2:4]: # beta* and alpha*: as usual tw.loc[label, attr + "_EXPECT"] = (tw.loc[label, attr + "_MEA"] - tw.loc[label, attr + "_MODEL"]) tw.loc[label, attr + "_EXPECTVAL"] = (design.loc[ip_name, attr] + tw.loc[label, attr + "_EXPECT"]) tw.loc[label, "B{}_EXPECT".format(attr)] = ( tw.loc[label, "B{}_MEA".format(attr)] - tw.loc[label, "B{}_MODEL".format(attr)]) else: # waist and betawaist: calculate expected value directly and go from there tw.loc[label, attr + "_EXPECTVAL"] = ( bsft.get_waist_wrapper(attr, tw.loc[label, "BETASTAR_EXPECTVAL"], tw.loc[label, "ALPHASTAR_EXPECTVAL"], ) ) tw.loc[label, attr + "_EXPECT"] = ( tw.loc[label, attr + "_EXPECTVAL"] - design.loc[ip_name, attr]) tw.loc[label, "B{}_EXPECT".format(attr)] = ( tw.loc[label, attr + "_EXPECTVAL"] / design.loc[ip_name, attr]) write_tfs(join(meas_path, get_diff_filename('betastar')), tw, save_index=bsft.RES_COLUMNS[0]) # Script Mode ################################################################ if __name__ == "__main__": getdiff()