"""
This module is the actual implementation of the resonance search for
turn-by-turn data.
It uses a combination of the laskar method with SVD decomposition to speed up
the search of resonances.
"""
from __future__ import print_function
import multiprocessing
import logging
from functools import partial
from collections import OrderedDict
import numpy as np
import pandas as pd
from utils import outliers
try:
    from scipy.fftpack import fft as _fft
except ImportError:
    from numpy.fft import fft as _fft
NUMBA_AVAILABLE = True
try:
    from numba import jit
except ImportError:
    NUMBA_AVAILABLE = False
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())  # avoid 'no handler' warn from _which_compute_coef() on import
PI2I = 2 * np.pi * complex(0, 1)
RESONANCE_LISTS = {
    "X": ((0, 1, 0), (-2, 0, 0), (0, 2, 0), (-3, 0, 0), (-1, -1, 0),
          (2, -2, 0), (0, -2, 0), (1, -2, 0), (-1, 3, 0), (1, 2, 0),
          (-2, 1, 0), (1, 1, 0), (2, 0, 0), (-1, -2, 0), (3, 0, 0)),
    "Y": ((1, 0, 0), (-1, 1, 0), (-2, 0, 0), (1, -1, 0), (0, -2, 0),
          (0, -3, 0), (2, 1, 0), (-1, 3, 0), (1, 1, 0), (-1, 2, 0)),
    "Z": ((1, 0, 1), (0, 1, 1))
}
MAIN_LINES = {"X": (1, 0, 0), "Y": (0, 1, 0), "Z": (0, 0, 1)}
Z_TOLERANCE = 0.0001
NUM_HARMS = 300
NUM_HARMS_SVD = 100
PROCESSES = multiprocessing.cpu_count()
[docs]def harpy(harpy_input, bpm_matrix_x, usv_x, bpm_matrix_y, usv_y):
    """
    """
    all_frequencies = {}
    all_coefficients = {}
    spectr = {}
    bad_bpms_summaries = {}
    pandas_dfs = {}
    cases = (("X", bpm_matrix_x, usv_x),
             ("Y", bpm_matrix_y, usv_y))
    for plane, bpm_matrix, usv in cases:
        panda = pd.DataFrame(index=bpm_matrix.index, columns=OrderedDict())
        if harpy_input.harpy_mode == "window":
            frequencies, coefficients = windowed_padded_fft(bpm_matrix, usv, 21, harpy_input)
        else:
            frequencies, coefficients = harmonic_analysis(bpm_matrix, usv=usv,
                                                          mode=harpy_input.harpy_mode,
                                                          sequential=harpy_input.sequential, )
        spectr[plane] = _get_bpms_spectr(bpm_matrix,
                                         coefficients,
                                         frequencies)
        panda, not_tune_bpms = _get_main_resonances(
            harpy_input, frequencies, coefficients,
            plane, harpy_input.tolerance, panda
        )
        cleaned_by_tune_bpms = []
        if not harpy_input.no_tune_clean:
            cleaned_by_tune_bpms = clean_by_tune(panda.loc[:, 'TUNE' + plane],
                                                 harpy_input.tune_clean_limit)
        panda = panda.loc[panda.index.difference(cleaned_by_tune_bpms)]
        bpm_matrix = bpm_matrix.loc[panda.index]
        coefficients = coefficients.loc[panda.index]
        frequencies = frequencies.loc[panda.index]
        bad_bpms_summaries[plane] = _get_bad_bpms_summary(
            not_tune_bpms,
            cleaned_by_tune_bpms
        )
        panda = _amp_and_mu_from_avg(bpm_matrix, plane, panda)
        panda = _get_natural_tunes(frequencies, coefficients, harpy_input, plane, panda)
        if harpy_input.tunez > 0.0:
            panda, _ = _get_main_resonances(
                harpy_input, frequencies, coefficients,
                "Z", Z_TOLERANCE, panda
            )
        pandas_dfs[plane] = panda
        all_frequencies[plane] = frequencies
        all_coefficients[plane] = coefficients
    tunez = 0.0
    if harpy_input.tunez > 0.0:
        tunez = pandas_dfs["X"]["TUNEZ"].mean()
    tunes = (pandas_dfs["X"]["TUNEX"].mean(),
             pandas_dfs["Y"]["TUNEY"].mean(),
             tunez)
    nattunes = (pandas_dfs["X"]["NATTUNEX"].mean(),
                pandas_dfs["Y"]["NATTUNEY"].mean())
    # Log as an error in tune == nattune
    if tunes[0] == nattunes[0]:
        LOGGER.error('Tune and nattune for plane X are equal: {}'.format(tunes[0]))
    if tunes[1] == nattunes[1]:
        LOGGER.error('Tune and nattune for plane Y are equal: {}'.format(tunes[1]))
    n_turns = bpm_matrix_x.shape[1]
    for plane in ("X", "Y"):
        resonances_freqs = _compute_resonances_freqs(plane, tunes)
        if harpy_input.tunez > 0.0:
            resonances_freqs.update(_compute_resonances_freqs("Z", tunes))
        pandas_dfs[plane] = _resonance_search(all_frequencies[plane],
                                              all_coefficients[plane],
                                              resonances_freqs,
                                              pandas_dfs[plane],
                                              n_turns)
        yield pandas_dfs[plane], spectr[plane], bad_bpms_summaries[plane] 
def _get_bpms_spectr(bpm_matrix, coefficients, frequencies):
    all_bpms_coefs = pd.DataFrame(data=coefficients,
                                  index=bpm_matrix.index)
    all_bpms_freqs = pd.DataFrame(data=frequencies,
                                  index=bpm_matrix.index)
    all_bpms_spectr = {"COEFS": all_bpms_coefs, "FREQS": all_bpms_freqs}
    return all_bpms_spectr
[docs]def harmonic_analysis(bpm_matrix=None, usv=None, mode="bpm", sequential=False):
    """
    Performs the laskar method on every of the BPMs signals contained in each
    row of the bpm_matrix pandas DataFame.
    Args:
        bpm_matrix: Pandas DataFrame containing the signals of each bpm in
                    each row. In bpm mode this parameter is obligatory.
        usv: Truncated SVD decomposition of the bpm matrix. It must contain a
             tuple (U, S, V), where U must be a DataFrame with the bpm names as index.
        mode: one of 'bpm', 'svd', or 'fast'. Check 'harmonic_analysis_bpm'
              documentation for 'bpm' mode and 'harmonic_analysis_svd" for 'svd' and 'fast'.
        sequential: If true, it will run all the computations in a single core.
    Returns:
        frequencies: A numpy array with the frequencies found per BPM.
        bpm_coefficients: A numpy array containing the complex coefficients found per BPM.
    """
    if mode == "bpm":
        if bpm_matrix is None:
            raise ValueError("bpm_matrix has to be provided "
                             "for the bpm mode")
        frequencies, bpm_coefficients = harmonic_analysis_bpm(
            bpm_matrix,
            sequential=sequential,
            num_harms=NUM_HARMS,
        )
    elif mode in ("svd", "fast"):
        if usv is None:
            raise ValueError("SVD decomposition has to be provided "
                             "for fast or svd modes")
        num_harms = NUM_HARMS if mode == "fast" else NUM_HARMS_SVD
        frequencies, bpm_coefficients = harmonic_analysis_svd(
            usv,
            fast=mode == "fast",
            sequential=sequential,
            num_harms=num_harms,
        )
    else:
        raise ValueError("Invalid harpy mode: {}".format(mode))
    return frequencies, bpm_coefficients 
[docs]def harmonic_analysis_bpm(bpm_matrix,
                          sequential=False, num_harms=NUM_HARMS):
    """
    Performs the laskar method on every of the BPMs signals contained in each
    row of the bpm_matrix pandas DataFame. This method will run the full
    laskar analysis in each BPM in parallel (slow).
    Args:
        bpm_matrix: Pandas DataFrame containing the signals of each bpm in
                    each row. In bpm mode this parameter is obligatory.
        sequential: If true, it will run all the computations in a single core.
        num_harms: Number of harmonics to compute per BPM.
    Returns:
        frequencies: A numpy array with the frequencies found per BPM.
        bpm_coefficients: A numpy array containing the complex coefficients found per BPM.
    """
    frequencies, bpm_coefficients = _parallel_laskar(
        bpm_matrix, sequential, num_harms,
    )
    return frequencies, bpm_coefficients 
[docs]def harmonic_analysis_svd(usv, fast=False,
                          sequential=False, num_harms=NUM_HARMS):
    """
    Performs the laskar method on every of the BPMs signals contained in each
    row of the bpm_matrix pandas DataFame. It takes advantage of the
    truncation of the V matrix in the SVD decomposition to speed up the
    calculations. Mode 'svd' is slower but more precise, 'bpm' mode is the
    fastest but makes more assumptions.
    Args:
        usv: Truncated SVD decomposition of the bpm matrix. It must contain a
             tuple (U, S, V), where U must be a DataFrame with the bpm names
             as index.
        fast: If true, it will use the 'fast' calculation mode ('svd'
              otherwise)
        sequential: If true, it will run all the computations in a single core.
        num_harms: Number of harmonics to compute per BPM.
    Returns:
        frequencies: A numpy array with the frequencies found per BPM.
        bpm_coefficients: A numpy array containing the complex coefficients found per BPM.
    """
    sv = np.dot(np.diag(usv[1]), usv[2])
    fake_bpms = ["MODE{}".format(i) for i in range(sv.shape[0])]
    sv = pd.DataFrame(index=fake_bpms, data=sv)
    if fast:
        frequencies, _ = _laskar_per_mode(np.mean(sv, axis=0), num_harms)
    else:
        frequencies, _ = _parallel_laskar(
            sv, sequential, num_harms,
        )
        frequencies = np.ravel(frequencies)
    svd_coefficients = _compute_coefs_for_freqs(sv, frequencies)
    bpm_coefficients = usv[0].dot(svd_coefficients.values)
    frequencies = pd.DataFrame(
        index=bpm_coefficients.index,
        data=np.outer(np.ones(bpm_coefficients.shape[0]), frequencies)
    )
    return frequencies, bpm_coefficients 
[docs]def clean_by_tune(tunes, tune_clean_limit):
    """
    This function looks for outliers in the tunes pandas Series and returns
    their indices.
    Args:
        tunes: Pandas series with the tunes per BPM and the BPM names as
            index.
        tune_clean_limit: No BPM will find as oulier if its distance to the
            average is lower than this limit.
    """
    bad_bpms_mask = outliers.get_filter_mask(
        tunes,
        limit=tune_clean_limit,
    )
    bad_bpms_names = tunes[~bad_bpms_mask].index
    return bad_bpms_names 
def _get_bad_bpms_summary(not_tune_bpms, cleaned_by_tune_bpms):
    bad_bpms_summary = []
    for bpm_name in not_tune_bpms:
        bad_bpms_summary.append(bpm_name +
                                " The main resonance has not been found.")
    for bpm_name in cleaned_by_tune_bpms:
        bad_bpms_summary.append(
            "{} tune is too far from average"
            .format(bpm_name)
        )
    return bad_bpms_summary
def _parallel_laskar(samples, sequential, num_harms):
    freqs = pd.DataFrame(
        index=samples.index,
        data=np.zeros((samples.shape[0], num_harms), dtype=np.float)
    )
    coefs = pd.DataFrame(
        index=samples.index,
        data=np.zeros((samples.shape[0], num_harms), dtype=np.complex128)
    )
    def _collect_results(bpm_name, freq_and_coef):
        freq, coef = freq_and_coef
        freqs.loc[bpm_name] = freq
        coefs.loc[bpm_name] = coef
    pool = multiprocessing.Pool(np.min([PROCESSES, samples.shape[0]]))
    for bpm_name in samples.index:
        args = (samples.loc[bpm_name, :], num_harms)
        callback = partial(_collect_results, bpm_name)
        if sequential:
            callback(_laskar_per_mode(*args))
        else:
            pool.apply_async(_laskar_per_mode, args,
                             callback=callback)
    pool.close()
    pool.join()
    return freqs, coefs
def _laskar_per_mode(samples, number_of_harmonics):
    freqs, coefs = _laskar_method(samples.values, number_of_harmonics)
    return np.array(freqs), np.array(coefs)
def _compute_coefs_for_freqs(samples, freqs):  # Samples is a matrix
    n = samples.shape[1]
    coefficients = samples.dot(
        np.exp(-PI2I * np.outer(np.arange(n), freqs))
    ) / n
    return coefficients
def _get_main_resonances(harpy_input, frequencies, coefficients,
                         plane, tolerance, panda):
    h, v, l = MAIN_LINES[plane]
    freq = ((h * harpy_input.tunex) +
            (v * harpy_input.tuney) +
            (l * harpy_input.tunez))
    max_coefs, max_freqs = _search_highest_coefs(
        freq, tolerance, frequencies, coefficients
    )
    if not np.any(max_coefs) and plane != "Z":
        raise ValueError(
            "No main " + plane + " resonances found, "
            "try to increase the tolerance or adjust the tunes"
        )
    bad_bpms_by_tune = coefficients.loc[max_coefs == 0.].index
    panda['TUNE'+plane] = max_freqs
    panda['AMP'+plane] = max_coefs.abs()
    panda['MU'+plane] = np.angle(max_coefs) / (2 * np.pi)
    if plane != "Z":
        panda = panda.loc[panda.index.difference(bad_bpms_by_tune)]
    return panda, bad_bpms_by_tune
def _search_highest_coefs(freq, tolerance, frequencies, coefficients):
    min_val = freq - tolerance
    max_val = freq + tolerance
    freq_vals = frequencies.values
    coefs_vals = coefficients.values
    on_window_mask = (freq_vals >= min_val) & (freq_vals <= max_val)
    filtered_coefs = np.where(on_window_mask, coefs_vals, 0)
    filtered_amps = np.abs(filtered_coefs)
    max_indices = np.argmax(filtered_amps, axis=1)
    max_coefs = filtered_coefs[np.arange(coefs_vals.shape[0]), max_indices]
    max_coefs = pd.Series(index=coefficients.index, data=max_coefs)
    max_freqs = freq_vals[np.arange(freq_vals.shape[0]), max_indices]
    max_freqs = pd.Series(index=coefficients.index, data=np.where(max_coefs != 0, max_freqs, 0))
    return max_coefs, max_freqs
def _amp_and_mu_from_avg(bpm_matrix, plane, panda):
    avg_coefs = _compute_coefs_for_freqs(
        bpm_matrix,
        np.mean(panda.loc[:, 'TUNE' + plane])
    )
    panda['AMP'+plane] = np.abs(avg_coefs)
    panda['MU'+plane] = np.angle(avg_coefs) / (2 * np.pi)
    return panda
def _get_natural_tunes(frequencies, coefficients, harpy_input, plane, panda):
    if harpy_input.nattunex is None or harpy_input.nattuney is None:
        return panda
    if plane == "X":
        freq = harpy_input.nattunex
    if plane == "Y":
        freq = harpy_input.nattuney
    max_coefs, max_freqs = _search_highest_coefs(
        freq, harpy_input.tolerance, frequencies, coefficients
    )
    panda['NATTUNE' + plane] = max_freqs
    panda['NATMU' + plane] = np.angle(max_coefs) / (2 * np.pi)
    panda['NATAMP' + plane] = np.abs(max_coefs)
    return panda
# Vectorized - coefficiens are matrix:
def _resonance_search(frequencies, coefficients,
                      resonances_freqs, panda, n_turns):
    results = panda
    for resonance in resonances_freqs.keys():
        tolerance = _get_resonance_tolerance(resonance, n_turns)
        max_coefs, max_freqs = _search_highest_coefs(resonances_freqs[resonance], tolerance,
                                                     frequencies, coefficients)
        resstr = _get_resonance_suffix(resonance)
        results['AMP' + resstr] = np.abs(max_coefs)
        results['PHASE' + resstr] = np.angle(max_coefs) / (2 * np.pi)
        results['FREQ' + resstr] = max_freqs
    return results
def _get_resonance_suffix(resonance):
    x, y, z = resonance
    if z == 0:
        resstr = (str(x) + str(y)).replace("-", "_")
    else:
        resstr = (str(x) + str(y) + str(z)).replace("-", "_")
    return resstr
def _compute_resonances_freqs(plane, tunes):
    """
    Computes the frequencies for all the resonances listed in the
    constante RESONANCE_LISTS, together with the natural tunes
    frequencies if given.
    """
    tunex, tuney, tunez = tunes
    freqs = [(resonance_h * tunex) +
             (resonance_v * tuney) +
             (resonance_l * tunez)
             for (resonance_h, resonance_v, resonance_l) in RESONANCE_LISTS[plane]]
    # Move to [0, 1] domain.
    freqs = [freq + 1. if freq < 0. else freq for freq in freqs]
    resonances_freqs = dict(zip(RESONANCE_LISTS[plane], freqs))
    return resonances_freqs
def _laskar_method(tbt, num_harmonics):
    samples = tbt[:]  # Copy the samples array.
    n = len(samples)
    int_range = np.arange(n)
    coefficients = []
    frequencies = []
    for _ in range(num_harmonics):
        # Compute this harmonic frequency and coefficient.
        dft_data = _fft(samples)
        frequency = _jacobsen(dft_data, n)
        coefficient = _compute_coef(samples, frequency * n) / n
        # Store frequency and amplitude
        coefficients.append(coefficient)
        frequencies.append(frequency)
        # Subtract the found pure tune from the signal
        new_signal = coefficient * np.exp(PI2I * frequency * int_range)
        samples = samples - new_signal
    coefficients, frequencies = zip(*sorted(zip(coefficients, frequencies),
                                            key=lambda tuple: np.abs(tuple[0]),
                                            reverse=True))
    return frequencies, coefficients
def _jacobsen(dft_values, n):
    """
    This method interpolates the real frequency of the
    signal using the three highest peaks in the FFT.
    """
    k = np.argmax(np.abs(dft_values))
    r = dft_values
    delta = np.tan(np.pi / n) / (np.pi / n)
    kp = (k + 1) % n
    km = (k - 1) % n
    delta = delta * np.real((r[km] - r[kp]) / (2 * r[k] - r[km] - r[kp]))
    return (k + delta) / n
def _fft_method(tbt, num_harmonics):
    samples = tbt[:]  # Copy the samples array.
    n = float(len(samples))
    dft_data = _fft(samples)
    indices = np.argsort(np.abs(dft_data))[::-1][:num_harmonics]
    frequencies = indices / n
    coefficients = dft_data[indices] / n
    return frequencies, coefficients
def _compute_coef_simple(samples, kprime):
    """
    Computes the coefficient of the Discrete Time Fourier
    Transform corresponding to the given frequency (kprime).
    """
    n = len(samples)
    freq = kprime / n
    exponents = np.exp(-PI2I * freq * np.arange(n))
    coef = np.sum(exponents * samples)
    return coef
def _compute_coef_goertzel(samples, kprime):
    """
    Computes the coefficient of the Discrete Time Fourier
    Transform corresponding to the given frequency (kprime).
    This function is faster than the previous one if compiled
    with Numba.
    """
    n = len(samples)
    a = 2 * np.pi * (kprime / n)
    b = 2 * np.cos(a)
    c = np.exp(-complex(0, 1) * a)
    d = np.exp(-complex(0, 1) * ((2 * np.pi * kprime) / n) * (n - 1))
    s1 = 0.
    s2 = 0.
    for i in range(n - 1):
        s0 = samples[i] + b * s1 - s2
        s2 = s1
        s1 = s0
    s0 = samples[n - 1] + b * s1 - s2
    y = s0 - s1 * c
    return y * d
def _get_resonance_tolerance(resonance, n_turns):
    tolerance = max(1e-4, 1 / n_turns)
    x, y, z = resonance
    if z == 0:
        return (abs(x) + abs(y)) * tolerance
    return (abs(x) + abs(y) + abs(z) - 1) * tolerance
def _which_compute_coef():
    if not NUMBA_AVAILABLE:
        LOGGER.warn("Cannot import numba, using numpy functions.")
        return _compute_coef_simple
    LOGGER.debug("Using compiled Numba functions.")
    return jit(_compute_coef_goertzel, nopython=True, nogil=True)
_compute_coef = _which_compute_coef()
def windowed_padded_fft(matrix, svd, turn_bits, harpy_input):
    # TODO fft is used just once on real data -> rfft can be used and together with np.conj() when needed
    for_freqs = np.dot(np.diag(svd[1]), svd[2])
    length = for_freqs.shape[1]
    padded_length = np.power(2, turn_bits)
    ints2pi = 2. * np.pi * np.arange(length) / float(length)
    nuttal4 = 0.3125 - 0.46875 * np.cos(ints2pi) + 0.1875 * np.cos(2. * ints2pi) - 0.03125 * np.cos(3. * ints2pi)
    norm = np.sum(nuttal4)
    #nuttal3 = 0.375 - 0.5 * np.cos(ints2pi) + 0.125 * np.cos(2. * ints2pi)
    #norm = np.sum(nuttal3)
    s_vt_freq = np.fft.fft(for_freqs * nuttal4, n=padded_length)
    samples = np.power(2, turn_bits - 13)
    mask = _get_mask(harpy_input.tolerance, s_vt_freq.shape[1], harpy_input)
    extended = int(np.ceil(np.sum(mask) / float(samples)) * samples)
    coefficients = np.dot(svd[0], s_vt_freq[:, mask])
    new_coeffs = np.zeros((coefficients.shape[0], extended), dtype=np.complex128)
    new_freqs = np.zeros((coefficients.shape[0], extended))
    new_coeffs[:,:int(np.sum(mask))] = coefficients
    coef = new_coeffs.reshape(new_coeffs.shape[0], int(extended/samples), samples)
    argsmax = np.outer(np.ones(new_coeffs.shape[0], dtype=np.int), np.arange(int(extended/samples))*samples) + np.argmax(np.abs(coef), axis=2)
    freqs = np.arange(np.power(2, turn_bits), dtype=np.float64)[mask] / float(np.power(2, turn_bits))
    new_freqs[:, :int(np.sum(mask))] = freqs
    coeffs = pd.DataFrame(index=matrix.index, data=new_coeffs[np.arange(new_coeffs.shape[0])[:, None], argsmax] / norm)
    frequencies = pd.DataFrame(index=coeffs.index, data=new_freqs[np.arange(new_freqs.shape[0])[:, None], argsmax])
    return frequencies, coeffs
def _get_mask(tol, length, harpy_input):
    freqs = [0.0]
    for dim in ["X", "Y", "Z"]:
        freqs.extend([(resonance_h * harpy_input.tunex) + (resonance_v * harpy_input.tuney) + (resonance_l * harpy_input.tunez)
             for (resonance_h, resonance_v, resonance_l) in RESONANCE_LISTS[dim]])
    freqs.extend([harpy_input.nattunex, harpy_input.nattuney, harpy_input.tunez])
    # Move to [0, 1] domain.
    freqs = [freq + 1. if freq < 0. else freq for freq in freqs]
    freqs = np.array(freqs) * length
    tolerance = tol * length
    mask = np.zeros(length, dtype=bool)
    for res in freqs:
        mask[max(0, int(res - tolerance)):min(length, int(res + tolerance))] = True
    return mask