"""
This module is the actual implementation of the resonance search for
turn-by-turn data.
It uses a combination of the laskar method with SVD decomposition to speed up
the search of resonances.
"""
from __future__ import print_function
import multiprocessing
import logging
from functools import partial
from collections import OrderedDict
import numpy as np
import pandas as pd
from utils import outliers
try:
from scipy.fftpack import fft as _fft
except ImportError:
from numpy.fft import fft as _fft
NUMBA_AVAILABLE = True
try:
from numba import jit
except ImportError:
NUMBA_AVAILABLE = False
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler()) # avoid 'no handler' warn from _which_compute_coef() on import
PI2I = 2 * np.pi * complex(0, 1)  # 2*pi*i, used to build complex exponentials
# Resonance lines searched per plane, as (h, v, l) coefficients of
# (tunex, tuney, tunez); the line frequency is h*Qx + v*Qy + l*Qz.
RESONANCE_LISTS = {
    "X": ((0, 1, 0), (-2, 0, 0), (0, 2, 0), (-3, 0, 0), (-1, -1, 0),
          (2, -2, 0), (0, -2, 0), (1, -2, 0), (-1, 3, 0), (1, 2, 0),
          (-2, 1, 0), (1, 1, 0), (2, 0, 0), (-1, -2, 0), (3, 0, 0)),
    "Y": ((1, 0, 0), (-1, 1, 0), (-2, 0, 0), (1, -1, 0), (0, -2, 0),
          (0, -3, 0), (2, 1, 0), (-1, 3, 0), (1, 1, 0), (-1, 2, 0)),
    "Z": ((1, 0, 1), (0, 1, 1))
}
# Main (tune) line of each plane.
MAIN_LINES = {"X": (1, 0, 0), "Y": (0, 1, 0), "Z": (0, 0, 1)}
# Frequency window half-width used for the synchrotron (Z) tune line.
Z_TOLERANCE = 0.0001
# Number of harmonics extracted per signal ('bpm'/'fast' modes vs 'svd' mode).
NUM_HARMS = 300
NUM_HARMS_SVD = 100
# Size of the worker pool for the parallel Laskar analysis.
PROCESSES = multiprocessing.cpu_count()
def harpy(harpy_input, bpm_matrix_x, usv_x, bpm_matrix_y, usv_y):
    """
    Runs the full resonance search for both transverse planes.

    For each plane it computes the spectra, extracts the main tune line,
    optionally cleans BPMs whose tune is far from the average, recomputes
    amplitude and phase at the average tune, finds natural tunes and (if a
    synchrotron tune is given) the Z line, and finally searches all the
    resonance lines listed in RESONANCE_LISTS.

    Args:
        harpy_input: Settings object (harpy_mode, tunex/tuney/tunez,
            nattunex/nattuney, tolerance, no_tune_clean, tune_clean_limit,
            sequential, ...).
        bpm_matrix_x: Pandas DataFrame with the horizontal turn-by-turn
            data, one BPM per row.
        usv_x: Truncated SVD decomposition (U, S, V) of bpm_matrix_x, where
            U is a DataFrame indexed by BPM name.
        bpm_matrix_y: As bpm_matrix_x, for the vertical plane.
        usv_y: As usv_x, for the vertical plane.

    Yields:
        One (results DataFrame, spectrum dict, bad-BPM summary list)
        triplet per plane, X first, then Y.
    """
    all_frequencies = {}
    all_coefficients = {}
    spectr = {}
    bad_bpms_summaries = {}
    pandas_dfs = {}
    cases = (("X", bpm_matrix_x, usv_x),
             ("Y", bpm_matrix_y, usv_y))
    for plane, bpm_matrix, usv in cases:
        panda = pd.DataFrame(index=bpm_matrix.index, columns=OrderedDict())
        if harpy_input.harpy_mode == "window":
            frequencies, coefficients = windowed_padded_fft(bpm_matrix, usv, 21, harpy_input)
        else:
            frequencies, coefficients = harmonic_analysis(bpm_matrix, usv=usv,
                                                          mode=harpy_input.harpy_mode,
                                                          sequential=harpy_input.sequential)
        spectr[plane] = _get_bpms_spectr(bpm_matrix,
                                         coefficients,
                                         frequencies)
        panda, not_tune_bpms = _get_main_resonances(
            harpy_input, frequencies, coefficients,
            plane, harpy_input.tolerance, panda
        )
        cleaned_by_tune_bpms = []
        if not harpy_input.no_tune_clean:
            cleaned_by_tune_bpms = clean_by_tune(panda.loc[:, 'TUNE' + plane],
                                                 harpy_input.tune_clean_limit)
        # Drop the cleaned BPMs from every per-plane structure.
        panda = panda.loc[panda.index.difference(cleaned_by_tune_bpms)]
        bpm_matrix = bpm_matrix.loc[panda.index]
        coefficients = coefficients.loc[panda.index]
        frequencies = frequencies.loc[panda.index]
        bad_bpms_summaries[plane] = _get_bad_bpms_summary(
            not_tune_bpms,
            cleaned_by_tune_bpms
        )
        panda = _amp_and_mu_from_avg(bpm_matrix, plane, panda)
        panda = _get_natural_tunes(frequencies, coefficients, harpy_input, plane, panda)
        if harpy_input.tunez > 0.0:
            panda, _ = _get_main_resonances(
                harpy_input, frequencies, coefficients,
                "Z", Z_TOLERANCE, panda
            )
        pandas_dfs[plane] = panda
        all_frequencies[plane] = frequencies
        all_coefficients[plane] = coefficients
    tunez = 0.0
    if harpy_input.tunez > 0.0:
        tunez = pandas_dfs["X"]["TUNEZ"].mean()
    tunes = (pandas_dfs["X"]["TUNEX"].mean(),
             pandas_dfs["Y"]["TUNEY"].mean(),
             tunez)
    # Log as an error if tune == nattune.  The NATTUNE columns only exist
    # when natural tunes were given in the input (_get_natural_tunes is a
    # no-op otherwise), so guard the access to avoid a KeyError.
    if "NATTUNEX" in pandas_dfs["X"].columns and "NATTUNEY" in pandas_dfs["Y"].columns:
        nattunes = (pandas_dfs["X"]["NATTUNEX"].mean(),
                    pandas_dfs["Y"]["NATTUNEY"].mean())
        if tunes[0] == nattunes[0]:
            LOGGER.error('Tune and nattune for plane X are equal: {}'.format(tunes[0]))
        if tunes[1] == nattunes[1]:
            LOGGER.error('Tune and nattune for plane Y are equal: {}'.format(tunes[1]))
    n_turns = bpm_matrix_x.shape[1]
    for plane in ("X", "Y"):
        resonances_freqs = _compute_resonances_freqs(plane, tunes)
        if harpy_input.tunez > 0.0:
            resonances_freqs.update(_compute_resonances_freqs("Z", tunes))
        pandas_dfs[plane] = _resonance_search(all_frequencies[plane],
                                              all_coefficients[plane],
                                              resonances_freqs,
                                              pandas_dfs[plane],
                                              n_turns)
        yield pandas_dfs[plane], spectr[plane], bad_bpms_summaries[plane]
def _get_bpms_spectr(bpm_matrix, coefficients, frequencies):
all_bpms_coefs = pd.DataFrame(data=coefficients,
index=bpm_matrix.index)
all_bpms_freqs = pd.DataFrame(data=frequencies,
index=bpm_matrix.index)
all_bpms_spectr = {"COEFS": all_bpms_coefs, "FREQS": all_bpms_freqs}
return all_bpms_spectr
def harmonic_analysis(bpm_matrix=None, usv=None, mode="bpm", sequential=False):
    """
    Performs the Laskar method on each of the BPM signals contained in the
    rows of the bpm_matrix pandas DataFrame.

    Args:
        bpm_matrix: Pandas DataFrame containing the signal of each BPM in
            each row.  Mandatory in 'bpm' mode.
        usv: Truncated SVD decomposition of the bpm matrix.  It must be a
            tuple (U, S, V), where U must be a DataFrame with the BPM names
            as index.  Mandatory in 'svd' and 'fast' modes.
        mode: one of 'bpm', 'svd' or 'fast'.  Check 'harmonic_analysis_bpm'
            documentation for the 'bpm' mode and 'harmonic_analysis_svd'
            for 'svd' and 'fast'.
        sequential: If true, it will run all the computations in a single core.

    Returns:
        frequencies: The frequencies found per BPM.
        bpm_coefficients: The complex coefficients found per BPM.

    Raises:
        ValueError: on an unknown mode, or when the data required by the
            chosen mode is missing.
    """
    if mode == "bpm":
        if bpm_matrix is None:
            raise ValueError("bpm_matrix has to be provided "
                             "for the bpm mode")
        frequencies, bpm_coefficients = harmonic_analysis_bpm(
            bpm_matrix,
            sequential=sequential,
            num_harms=NUM_HARMS,
        )
    elif mode in ("svd", "fast"):
        if usv is None:
            raise ValueError("SVD decomposition has to be provided "
                             "for fast or svd modes")
        # The 'svd' mode analyses every mode individually, so it can afford
        # fewer harmonics per signal than the 'fast' mode.
        num_harms = NUM_HARMS if mode == "fast" else NUM_HARMS_SVD
        frequencies, bpm_coefficients = harmonic_analysis_svd(
            usv,
            fast=mode == "fast",
            sequential=sequential,
            num_harms=num_harms,
        )
    else:
        raise ValueError("Invalid harpy mode: {}".format(mode))
    return frequencies, bpm_coefficients
def harmonic_analysis_bpm(bpm_matrix,
                          sequential=False, num_harms=NUM_HARMS):
    """
    Performs the Laskar method on each of the BPM signals contained in the
    rows of the bpm_matrix pandas DataFrame.  The full Laskar analysis is
    run per BPM, distributed over a process pool (slow but precise).

    Args:
        bpm_matrix: Pandas DataFrame containing the signal of each BPM in
            each row.
        sequential: If true, it will run all the computations in a single core.
        num_harms: Number of harmonics to compute per BPM.

    Returns:
        frequencies: DataFrame with the frequencies found per BPM.
        bpm_coefficients: DataFrame with the complex coefficients found per
            BPM.
    """
    return _parallel_laskar(bpm_matrix, sequential, num_harms)
def harmonic_analysis_svd(usv, fast=False,
                          sequential=False, num_harms=NUM_HARMS):
    """
    Performs the Laskar method on the modes of the truncated SVD
    decomposition of the BPM matrix and maps the found lines back to the
    individual BPMs through U.  The truncation of V speeds up the
    calculation.  'svd' mode analyses every mode (slower but more precise);
    'fast' mode analyses only the average of the modes.

    Args:
        usv: Truncated SVD decomposition of the bpm matrix.  It must be a
            tuple (U, S, V), where U must be a DataFrame with the bpm names
            as index.
        fast: If true, it will use the 'fast' calculation mode ('svd'
            otherwise).
        sequential: If true, it will run all the computations in a single core.
        num_harms: Number of harmonics to compute per BPM.

    Returns:
        frequencies: DataFrame with the frequencies found per BPM.
        bpm_coefficients: DataFrame with the complex coefficients found per
            BPM.
    """
    # Work on S.V: each row is one SVD mode in turn-by-turn space.
    sv = np.dot(np.diag(usv[1]), usv[2])
    fake_bpms = ["MODE{}".format(i) for i in range(sv.shape[0])]
    sv = pd.DataFrame(index=fake_bpms, data=sv)
    if fast:
        # Single Laskar pass on the average of all modes.
        frequencies, _ = _laskar_per_mode(np.mean(sv, axis=0), num_harms)
    else:
        frequencies, _ = _parallel_laskar(sv, sequential, num_harms)
    frequencies = np.ravel(frequencies)
    # Recompute the coefficients of every mode at the found frequencies and
    # project back to BPM space: bpm_coefficients = U . coefs(S.V).
    svd_coefficients = _compute_coefs_for_freqs(sv, frequencies)
    bpm_coefficients = usv[0].dot(svd_coefficients.values)
    # Every BPM shares the same list of found frequencies.
    frequencies = pd.DataFrame(
        index=bpm_coefficients.index,
        data=np.outer(np.ones(bpm_coefficients.shape[0]), frequencies)
    )
    return frequencies, bpm_coefficients
def clean_by_tune(tunes, tune_clean_limit):
    """
    Looks for outliers in the tunes pandas Series and returns their
    indices (the names of the BPMs to discard).

    Args:
        tunes: Pandas Series with the tunes per BPM and the BPM names as
            index.
        tune_clean_limit: No BPM will be flagged as outlier if its distance
            to the average is lower than this limit.

    Returns:
        Index with the names of the BPMs considered outliers.
    """
    bad_bpms_mask = outliers.get_filter_mask(
        tunes,
        limit=tune_clean_limit,
    )
    # The mask marks the BPMs to keep; invert it to get the bad ones.
    return tunes[~bad_bpms_mask].index
def _get_bad_bpms_summary(not_tune_bpms, cleaned_by_tune_bpms):
bad_bpms_summary = []
for bpm_name in not_tune_bpms:
bad_bpms_summary.append(bpm_name +
" The main resonance has not been found.")
for bpm_name in cleaned_by_tune_bpms:
bad_bpms_summary.append(
"{} tune is too far from average"
.format(bpm_name)
)
return bad_bpms_summary
def _parallel_laskar(samples, sequential, num_harms):
    """
    Runs the Laskar analysis on every row of samples, distributing the rows
    over a multiprocessing pool (or running in-process when sequential).

    Args:
        samples: DataFrame with one signal per row.
        sequential: If true, compute everything in the calling process.
        num_harms: Number of harmonics to extract per row.

    Returns:
        (freqs, coefs) DataFrames of shape (rows, num_harms), indexed like
        samples.
    """
    # np.float was removed in NumPy 1.24; the builtin float is equivalent.
    freqs = pd.DataFrame(
        index=samples.index,
        data=np.zeros((samples.shape[0], num_harms), dtype=float)
    )
    coefs = pd.DataFrame(
        index=samples.index,
        data=np.zeros((samples.shape[0], num_harms), dtype=np.complex128)
    )

    def _collect_results(bpm_name, freq_and_coef):
        # Callback, run in the parent process: store this row's results.
        freq, coef = freq_and_coef
        freqs.loc[bpm_name] = freq
        coefs.loc[bpm_name] = coef

    pool = multiprocessing.Pool(min(PROCESSES, samples.shape[0]))
    for bpm_name in samples.index:
        args = (samples.loc[bpm_name, :], num_harms)
        callback = partial(_collect_results, bpm_name)
        if sequential:
            callback(_laskar_per_mode(*args))
        else:
            pool.apply_async(_laskar_per_mode, args,
                             callback=callback)
    pool.close()
    pool.join()
    return freqs, coefs
def _laskar_per_mode(samples, number_of_harmonics):
    """Runs the Laskar method on a single pandas Series and returns the
    result as plain numpy arrays (picklable for multiprocessing)."""
    found_freqs, found_coefs = _laskar_method(samples.values, number_of_harmonics)
    return np.array(found_freqs), np.array(found_coefs)
def _compute_coefs_for_freqs(samples, freqs):
    """
    Computes the DTFT coefficients of every row of samples (a DataFrame of
    signals, one per row) at the given frequencies, normalized by the
    number of turns.
    """
    n_turns = samples.shape[1]
    # exp(-2*pi*i * t * f) for every (turn, frequency) pair.
    exponents = np.exp(-PI2I * np.outer(np.arange(n_turns), freqs))
    return samples.dot(exponents) / n_turns
def _get_main_resonances(harpy_input, frequencies, coefficients,
                         plane, tolerance, panda):
    """
    Finds the main line of the given plane around its expected tune and
    fills the TUNE/AMP/MU columns of panda.  BPMs with no line inside the
    tolerance window are dropped (except for the Z plane) and returned as
    the second element of the result.
    """
    coef_h, coef_v, coef_l = MAIN_LINES[plane]
    freq = ((coef_h * harpy_input.tunex) +
            (coef_v * harpy_input.tuney) +
            (coef_l * harpy_input.tunez))
    max_coefs, max_freqs = _search_highest_coefs(
        freq, tolerance, frequencies, coefficients
    )
    if plane != "Z" and not np.any(max_coefs):
        raise ValueError(
            "No main " + plane + " resonances found, "
            "try to increase the tolerance or adjust the tunes"
        )
    bad_bpms_by_tune = coefficients.loc[max_coefs == 0.].index
    panda['TUNE' + plane] = max_freqs
    panda['AMP' + plane] = max_coefs.abs()
    panda['MU' + plane] = np.angle(max_coefs) / (2 * np.pi)
    if plane != "Z":
        panda = panda.loc[panda.index.difference(bad_bpms_by_tune)]
    return panda, bad_bpms_by_tune
def _search_highest_coefs(freq, tolerance, frequencies, coefficients):
min_val = freq - tolerance
max_val = freq + tolerance
freq_vals = frequencies.values
coefs_vals = coefficients.values
on_window_mask = (freq_vals >= min_val) & (freq_vals <= max_val)
filtered_coefs = np.where(on_window_mask, coefs_vals, 0)
filtered_amps = np.abs(filtered_coefs)
max_indices = np.argmax(filtered_amps, axis=1)
max_coefs = filtered_coefs[np.arange(coefs_vals.shape[0]), max_indices]
max_coefs = pd.Series(index=coefficients.index, data=max_coefs)
max_freqs = freq_vals[np.arange(freq_vals.shape[0]), max_indices]
max_freqs = pd.Series(index=coefficients.index, data=np.where(max_coefs != 0, max_freqs, 0))
return max_coefs, max_freqs
def _amp_and_mu_from_avg(bpm_matrix, plane, panda):
    """Recomputes the AMP/MU columns at the average tune of the plane, so
    that every BPM uses one common frequency."""
    mean_tune = np.mean(panda.loc[:, 'TUNE' + plane])
    avg_coefs = _compute_coefs_for_freqs(bpm_matrix, mean_tune)
    panda['AMP' + plane] = np.abs(avg_coefs)
    panda['MU' + plane] = np.angle(avg_coefs) / (2 * np.pi)
    return panda
def _get_natural_tunes(frequencies, coefficients, harpy_input, plane, panda):
    """
    Searches the highest line around the given natural tune and stores it
    in the NATTUNE/NATMU/NATAMP columns.  No-op when natural tunes were
    not provided in the input.
    """
    if harpy_input.nattunex is None or harpy_input.nattuney is None:
        return panda
    if plane == "X":
        freq = harpy_input.nattunex
    elif plane == "Y":
        freq = harpy_input.nattuney
    max_coefs, max_freqs = _search_highest_coefs(
        freq, harpy_input.tolerance, frequencies, coefficients
    )
    panda['NATTUNE' + plane] = max_freqs
    panda['NATMU' + plane] = np.angle(max_coefs) / (2 * np.pi)
    panda['NATAMP' + plane] = np.abs(max_coefs)
    return panda
def _resonance_search(frequencies, coefficients,
                      resonances_freqs, panda, n_turns):
    """
    Searches the spectra for every resonance line (vectorized: the
    coefficients form a matrix) and adds AMP/PHASE/FREQ columns per line.
    """
    results = panda
    for resonance, resonance_freq in resonances_freqs.items():
        tolerance = _get_resonance_tolerance(resonance, n_turns)
        max_coefs, max_freqs = _search_highest_coefs(
            resonance_freq, tolerance, frequencies, coefficients)
        suffix = _get_resonance_suffix(resonance)
        results['AMP' + suffix] = np.abs(max_coefs)
        results['PHASE' + suffix] = np.angle(max_coefs) / (2 * np.pi)
        results['FREQ' + suffix] = max_freqs
    return results
def _get_resonance_suffix(resonance):
x, y, z = resonance
if z == 0:
resstr = (str(x) + str(y)).replace("-", "_")
else:
resstr = (str(x) + str(y) + str(z)).replace("-", "_")
return resstr
def _compute_resonances_freqs(plane, tunes):
    """
    Computes the frequency of every resonance listed for the given plane
    in the constant RESONANCE_LISTS, mapped into the [0, 1] domain.
    Returns a dict {resonance tuple: frequency}.
    """
    tunex, tuney, tunez = tunes
    resonances_freqs = {}
    for resonance in RESONANCE_LISTS[plane]:
        coef_h, coef_v, coef_l = resonance
        freq = (coef_h * tunex) + (coef_v * tuney) + (coef_l * tunez)
        # Negative combinations alias into the positive domain.
        resonances_freqs[resonance] = freq + 1. if freq < 0. else freq
    return resonances_freqs
def _laskar_method(tbt, num_harmonics):
    """
    Iteratively extracts num_harmonics (frequency, coefficient) pairs from
    the turn-by-turn signal: at every step the strongest FFT line is
    refined by Jacobsen interpolation, its DTFT coefficient is computed
    and the corresponding pure exponential is subtracted from the signal
    before the next iteration.

    Returns (frequencies, coefficients) as tuples sorted by decreasing
    coefficient magnitude.
    """
    samples = tbt[:]  # Copy the samples array.
    n = len(samples)
    int_range = np.arange(n)
    coefficients = []
    frequencies = []
    for _ in range(num_harmonics):
        # Compute this harmonic frequency and coefficient.
        dft_data = _fft(samples)
        frequency = _jacobsen(dft_data, n)
        # _compute_coef expects the frequency in bin units (kprime).
        coefficient = _compute_coef(samples, frequency * n) / n
        # Store frequency and amplitude
        coefficients.append(coefficient)
        frequencies.append(frequency)
        # Subtract the found pure tune from the signal
        new_signal = coefficient * np.exp(PI2I * frequency * int_range)
        samples = samples - new_signal
    # Sort both lists together by coefficient magnitude, strongest first.
    coefficients, frequencies = zip(*sorted(zip(coefficients, frequencies),
                                            key=lambda tuple: np.abs(tuple[0]),
                                            reverse=True))
    return frequencies, coefficients
def _jacobsen(dft_values, n):
"""
This method interpolates the real frequency of the
signal using the three highest peaks in the FFT.
"""
k = np.argmax(np.abs(dft_values))
r = dft_values
delta = np.tan(np.pi / n) / (np.pi / n)
kp = (k + 1) % n
km = (k - 1) % n
delta = delta * np.real((r[km] - r[kp]) / (2 * r[k] - r[km] - r[kp]))
return (k + delta) / n
def _fft_method(tbt, num_harmonics):
    """
    Plain FFT line pick: returns the num_harmonics strongest FFT bins of
    the signal as (frequencies, coefficients), without any interpolation
    or signal subtraction.
    """
    samples = tbt[:]  # Work on a copy of the input signal.
    n = float(len(samples))
    spectrum = _fft(samples)
    strongest = np.argsort(np.abs(spectrum))[::-1][:num_harmonics]
    frequencies = strongest / n
    coefficients = spectrum[strongest] / n
    return frequencies, coefficients
def _compute_coef_simple(samples, kprime):
    """
    Computes the coefficient of the Discrete Time Fourier Transform
    corresponding to the given frequency (kprime, in bin units) as the
    direct sum over turns of samples[t] * exp(-2*pi*i * freq * t).
    """
    n = len(samples)
    freq = kprime / n
    turns = np.arange(n)
    return np.sum(np.exp(-PI2I * freq * turns) * samples)
def _compute_coef_goertzel(samples, kprime):
"""
Computes the coefficient of the Discrete Time Fourier
Transform corresponding to the given frequency (kprime).
This function is faster than the previous one if compiled
with Numba.
"""
n = len(samples)
a = 2 * np.pi * (kprime / n)
b = 2 * np.cos(a)
c = np.exp(-complex(0, 1) * a)
d = np.exp(-complex(0, 1) * ((2 * np.pi * kprime) / n) * (n - 1))
s1 = 0.
s2 = 0.
for i in range(n - 1):
s0 = samples[i] + b * s1 - s2
s2 = s1
s1 = s0
s0 = samples[n - 1] + b * s1 - s2
y = s0 - s1 * c
return y * d
def _get_resonance_tolerance(resonance, n_turns):
tolerance = max(1e-4, 1 / n_turns)
x, y, z = resonance
if z == 0:
return (abs(x) + abs(y)) * tolerance
return (abs(x) + abs(y) + abs(z) - 1) * tolerance
def _which_compute_coef():
    """Selects the DTFT-coefficient implementation: the Numba-compiled
    Goertzel version when numba is importable, the plain numpy sum
    otherwise."""
    if not NUMBA_AVAILABLE:
        # Logger.warn is deprecated since Python 3.3; use warning instead.
        LOGGER.warning("Cannot import numba, using numpy functions.")
        return _compute_coef_simple
    LOGGER.debug("Using compiled Numba functions.")
    return jit(_compute_coef_goertzel, nopython=True, nogil=True)
# Resolved once at import time: jitted Goertzel if numba is available,
# plain numpy DTFT sum otherwise.
_compute_coef = _which_compute_coef()
def windowed_padded_fft(matrix, svd, turn_bits, harpy_input):
    """
    Resonance search via a single windowed, zero-padded FFT of the
    truncated SVD modes.

    The S.V part of the decomposition is multiplied by a 4-term Nuttall
    window, zero-padded to 2**turn_bits samples and Fourier transformed.
    Only bins close to the expected resonance frequencies (see _get_mask)
    are kept; those are projected back to BPM space through U and the
    locally strongest line of each chunk of bins is returned.

    Args:
        matrix: DataFrame of turn-by-turn data; only its index (the BPM
            names) is used here.
        svd: Truncated SVD (U, S, V) of the matrix, U indexed by BPM name.
        turn_bits: log2 of the padded FFT length.
        harpy_input: analysis settings (tunes, nattunes and tolerance).

    Returns:
        (frequencies, coefficients) DataFrames indexed by BPM name.
    """
    # TODO fft is used just once on real data -> rfft can be used and together with np.conj() when needed
    for_freqs = np.dot(np.diag(svd[1]), svd[2])
    length = for_freqs.shape[1]
    padded_length = np.power(2, turn_bits)
    ints2pi = 2. * np.pi * np.arange(length) / float(length)
    # 4-term Nuttall window; norm compensates the amplitude loss it causes.
    nuttal4 = (0.3125 - 0.46875 * np.cos(ints2pi) + 0.1875 * np.cos(2. * ints2pi)
               - 0.03125 * np.cos(3. * ints2pi))
    norm = np.sum(nuttal4)
    s_vt_freq = np.fft.fft(for_freqs * nuttal4, n=padded_length)
    # Chunk size of the local-maximum search over the masked bins.
    samples = np.power(2, turn_bits - 13)
    mask = _get_mask(harpy_input.tolerance, s_vt_freq.shape[1], harpy_input)
    n_masked = int(np.sum(mask))
    # Pad the masked region up to a whole number of chunks.
    extended = int(np.ceil(n_masked / float(samples)) * samples)
    coefficients = np.dot(svd[0], s_vt_freq[:, mask])
    new_coeffs = np.zeros((coefficients.shape[0], extended), dtype=np.complex128)
    new_freqs = np.zeros((coefficients.shape[0], extended))
    new_coeffs[:, :n_masked] = coefficients
    n_chunks = int(extended / samples)
    coef = new_coeffs.reshape(new_coeffs.shape[0], n_chunks, samples)
    # Per BPM, the index of the strongest bin inside every chunk.
    # np.int was removed in NumPy 1.24; the builtin int is equivalent here.
    argsmax = (np.outer(np.ones(new_coeffs.shape[0], dtype=int),
                        np.arange(n_chunks) * samples)
               + np.argmax(np.abs(coef), axis=2))
    freqs = np.arange(padded_length, dtype=np.float64)[mask] / float(padded_length)
    new_freqs[:, :n_masked] = freqs
    rows = np.arange(new_coeffs.shape[0])[:, None]
    coeffs = pd.DataFrame(index=matrix.index, data=new_coeffs[rows, argsmax] / norm)
    frequencies = pd.DataFrame(index=coeffs.index, data=new_freqs[rows, argsmax])
    return frequencies, coeffs
def _get_mask(tol, length, harpy_input):
    """
    Builds a boolean mask over the padded FFT bins, keeping only bins
    within tol (in tune units) of any expected line: DC, every listed
    resonance of the three planes, the natural tunes and the synchrotron
    tune.
    """
    lines = [0.0]
    for dim in ["X", "Y", "Z"]:
        for res_h, res_v, res_l in RESONANCE_LISTS[dim]:
            lines.append((res_h * harpy_input.tunex) +
                         (res_v * harpy_input.tuney) +
                         (res_l * harpy_input.tunez))
    # NOTE(review): assumes the natural tunes are numeric here -- a None
    # value would fail on the comparison below; confirm against callers.
    lines.extend([harpy_input.nattunex, harpy_input.nattuney, harpy_input.tunez])
    # Move to [0, 1] domain.
    lines = [line + 1. if line < 0. else line for line in lines]
    bin_centers = np.array(lines) * length
    half_width = tol * length
    mask = np.zeros(length, dtype=bool)
    for center in bin_centers:
        low = max(0, int(center - half_width))
        high = min(length, int(center + half_width))
        mask[low:high] = True
    return mask