Source code for omc3.tune_analysis.fitting_tools

"""
Fitting Tools
-------------

This module contains fitting functionality for ``tune_analysis``.
It provides tools for fitting functions, mainly via odr.

"""
from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np
from numpy.polynomial import Polynomial
from scipy.odr import ODR, Model, RealData
from scipy.optimize import curve_fit

from omc3.tune_analysis.constants import FakeOdrOutput
from omc3.utils import logging_tools

if TYPE_CHECKING:
    from collections.abc import Sequence

    import pandas as pd
    from numpy.typing import ArrayLike

LOG = logging_tools.get_logger(__name__)


# ODR ###################################################################


[docs] def get_poly_fun(order: int): """Returns the function of polynomial order. (is this pythonic enough?).""" def poly_func(beta, x): return sum(beta[i] * np.power(x, i) for i in range(order+1)) return poly_func
[docs] def do_odr(x: pd.Series, y: pd.Series, xerr: pd.Series, yerr: pd.Series, order: int): """ Returns the odr fit. Important Convention: The beta-parameter in the ODR models go upwards with order, i.e. | beta[0] = y-Axis offset | beta[1] = slope | beta[2] = quadratic term | etc. Args: x: `Series` of x data. y: `Series` of y data. xerr: `Series` of x data errors. yerr: `Series` of y data errors. order: fit order, ``1`` for linear, ``2`` for quadratic. Returns: Odr fit. Betas order is index = coefficient of same order. """ LOG.debug("Starting ODR fit.") # Poly-Fit for starting point --- fit_np = Polynomial.fit(x, y, deg=order).convert() LOG.debug(f"ODR fit input (from polynomial fit): {fit_np}") # Actual ODR --- xerr, yerr = _check_exact_zero_errors(xerr, yerr) odr = ODR(data=RealData(x=x, y=y, sx=xerr, sy=yerr), model=Model(get_poly_fun(order)), beta0=fit_np.coef) odr_fit = odr.run() logging_tools.odr_pprint(LOG.info, odr_fit) return odr_fit
# 2D-Kick ODR ################################################################## INPUT_ORDER = "qx0", "qy0", "dqx/dex", "dqy/dey", "dq(x,y)/de(y,x)"
[docs] def first_order_detuning_2d(beta: Sequence, x: ArrayLike) -> ArrayLike: """ Calculates the 2D tune array (Qx, Qy) Qx = qx0 + dqx/dex * ex + dqx/dey * ey Qy = qy0 + dqy/dex * ex + dqy/dey * ey Args: beta: length 5 tune coefficients in order `INPUT_ORDER` 0: qx0, 1: qy0, 2: xx, 3: yy, 4: xy/yx x: array size 2xN, [[ex1, ex2, ...],[ey1, ey2,...]] Returns: np.array: 2xN [[Qx1, Qx2, ...],[Qy1, Qy2, ...]] """ return np.array([beta[0] + beta[2] * x[0] + beta[4] * x[1], beta[1] + beta[4] * x[0] + beta[3] * x[1]])
[docs] def first_order_detuning_2d_jacobian(beta: Sequence, x: ArrayLike) -> ArrayLike: """ Jacobian of the 2D tune array: [[dqx/dex, dqx/dey ], [dqy/dex, dqy/dey ]] Args: beta: length 5 tune coefficients in order `INPUT_ORDER` 0: qx0, 1: qy0, 2: xx, 3: yy, 4: xy/yx x: length 2 Sequence, (ex, ey) Returns: np.array: size 2x2xlen(x) containing the Jacobian of the 2d-fit function. """ return np.dstack([np.array([[beta[2], beta[4]], [beta[4], beta[3]]])] * len(x[0]))
[docs] def map_odr_fit_to_planes(odr_fit) -> dict[str, dict[str, FakeOdrOutput]]: """ Maps the calculated odr fit to fake odr fits with `beta` and `sd_beta` attributes. These would be the results of first-order amplitude detuning odr-fits when done independently by tune and kick plane. Returns: Dict[str, Dict[str: odr_fit]] of ODR fits, where the inner string gives the kick-plane, the outer the tune-plane.. """ def get_fit(a: int, b: int): return FakeOdrOutput( beta=[odr_fit.beta[a], odr_fit.beta[b]], sd_beta=[odr_fit.sd_beta[a], odr_fit.sd_beta[b]] ) return { "X": { "X": get_fit(0, 2), "Y": get_fit(0, 4), }, "Y": { "X": get_fit(1, 4), "Y": get_fit(1, 3), }, }
[docs] def do_2d_kicks_odr(x: ArrayLike, y: ArrayLike, xerr: ArrayLike, yerr: ArrayLike): """ Returns the odr fit. Args: x: `Array` of x data (2xN). y: `Array` of y data (2xN). xerr: `Array` of x data errors (2xN). yerr: `Array` of y data errors (2xN). Returns: Dict[str, Dict[str: odr_fit]] of Odr fits, where the inner string gives the kick-plane, the outer the tune-plane.. """ LOG.debug("Starting ODR fit.") x, y, xerr, yerr = _filter_nans(x, y, xerr, yerr) # Curve-Fit for starting point --- def curve_fit_fun(v, *args): return first_order_detuning_2d(args, v).ravel() beta, beta_cov = curve_fit(f=curve_fit_fun, xdata=x, ydata=y.ravel(), p0=[0]*5) res_str = ",\n".join([f"{n:>16} = {b:9.3g}" for n, b in zip(INPUT_ORDER, beta)]) LOG.info(f"\nDetuning estimate without errors (curve fit):\n{res_str}\n") # Actual ODR --- xerr, yerr = _check_exact_zero_errors(xerr, yerr) odr = ODR(data=RealData(x=x, y=y, sx=xerr, sy=yerr), # model=Model(first_order_detuning_2d), model=Model(first_order_detuning_2d, fjacd=first_order_detuning_2d_jacobian), beta0=beta) odr_fit = odr.run() logging_tools.odr_pprint(LOG.debug, odr_fit) res_str = ",\n".join([f"{n:>16} = {b:9.3g} +- {e:8.3g}" for n, b, e in zip(INPUT_ORDER, odr_fit.beta, odr_fit.sd_beta)]) LOG.info(f"\nDetuning estimate with errors (odr):\n{res_str}\n") return map_odr_fit_to_planes(odr_fit)
def _filter_nans(*args: ArrayLike) -> list[ArrayLike]: """Remove all data points containing a NaN. Assumes input arrays are all of shape 2xN. TODO: As this is not done in plotting, points might be plotted, that have not been used for fitting. """ a = np.array(args) a = a[:, :, ~np.isnan(a).any(axis=0).any(axis=0)] return list(a) def _check_exact_zero_errors(xerr: ArrayLike, yerr: ArrayLike) -> tuple[np.ndarray, np.ndarray]: """Check if errors are exact zero and replace with minimum error, if so. Done because ODR crashes on exact zero error-bars. Beware that the output will always be np.arrays, even if the input is pd.Series. """ def check_exact_zero_per_plane(err, plane): if (err != 0).all(): # no problem return err # best way to work with array and series? minval = np.where(err == 0, np.inf, err).min() # assumes all values >=0 if np.isinf(minval): raise ValueError(f"All errors are exactly zero in the {plane} plane." f" ODR cannot be performed.") LOG.warning(f"Exact zeros in {plane} errors found." f" Replaced by {minval} (the minimum value) to be able to perform ODR.") return np.where(err == 0, minval, err) xerr = check_exact_zero_per_plane(xerr, "horizontal") yerr = check_exact_zero_per_plane(yerr, "vertical") return xerr, yerr