"""Source code for harmonic_analysis.clean."""

from __future__ import print_function
import logging
from datetime import datetime
import numpy as np
import pandas as pd

SPARSE_AVAILABLE = True
try:
    from scipy.sparse.linalg import svds
except ImportError:
    SPARSE_AVAILABLE = False

PI2I = 2 * np.pi * complex(0, 1)
LOGGER = logging.getLogger(__name__)

# piotr: for data 20160408 01:45+ (40cm) B1
LIST_OF_KNOWN_BAD_BPMS = ["BPM.23L6.B1", "BPM.16R3.B1", "BPM.22R8.B1", "BPM.15R8.B1"]
LIST_OF_WRONG_POLARITY_BPMS = []

# Noise to signal limit
NTS_LIMIT = 8.


def clean(bpm_data, clean_input, file_date):
    """ Detects and removes bad BPMs, fixes wrong polarities and,
    unless disabled, resynchronizes BPMs. """
    LOGGER.debug("clean: number of BPMs in the input: %s", bpm_data.index.size)
    known_bad_bpm_names = clean_input.bad_bpms + LIST_OF_KNOWN_BAD_BPMS
    known_bad_bpms = detect_known_bad_bpms(bpm_data, known_bad_bpm_names)
    bpm_flatness = detect_flat_bpms(bpm_data, clean_input.peak_to_peak)
    bpm_spikes = detect_bpms_with_spikes(bpm_data, clean_input.max_peak)
    if not clean_input.no_exact_zeros:
        exact_zeros = detect_bpms_with_exact_zeros(bpm_data)
    else:
        # Keep the type consistent with the detector functions (an empty Index)
        exact_zeros = pd.Index([])
        LOGGER.debug("clean: Skipped exact zero check")

    original_bpms = bpm_data.index

    all_bad_bpms = _index_union(known_bad_bpms, bpm_flatness,
                                bpm_spikes, exact_zeros)
    bpm_data = bpm_data.loc[bpm_data.index.difference(all_bad_bpms)]

    bad_bpms_with_reasons = _get_bad_bpms_summary(
        clean_input, known_bad_bpms, bpm_flatness, bpm_spikes, exact_zeros
    )
    _report_clean_stats(original_bpms.size, bpm_data.index.size)

    wrong_plrty_names = (LIST_OF_WRONG_POLARITY_BPMS +
                         clean_input.wrong_polarity_bpms)
    bpm_data = fix_polarity(wrong_plrty_names, bpm_data)

    if not clean_input.noresync:
        bpm_data = resync_bpms(bpm_data, file_date)

    return bpm_data, bad_bpms_with_reasons
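

# Illustrative sketch (not part of the original module): how `clean` might be
# invoked. SimpleNamespace stands in for the real clean_input object, whose
# class is not shown here; the attribute names are taken from the usages in
# this module, but all values below are assumptions for the example.
def _example_clean_usage():
    from types import SimpleNamespace
    turns = np.arange(2000)
    # Three fake BPMs observing a common betatron-like oscillation plus noise
    bpm_data = pd.DataFrame(
        index=["BPM.A", "BPM.B", "BPM.C"],
        data=np.sin(2 * np.pi * 0.27 * turns)[None, :] +
             0.01 * np.random.randn(3, turns.size),
    )
    clean_input = SimpleNamespace(
        bad_bpms=[], wrong_polarity_bpms=[], peak_to_peak=1e-5,
        max_peak=20.0, no_exact_zeros=False, noresync=True,
    )
    return clean(bpm_data, clean_input, datetime(2016, 1, 1))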


def fix_polarity(wrong_polarity_names, bpm_data):
    """ Fixes wrong polarity """
    bpm_data.loc[wrong_polarity_names] *= -1
    return bpm_data


def resync_bpms(bpm_data, file_date):
    """ Resynchronizes BPMs between the injection point and the start of
    the lattice if the acquisition date is after 2016-04-01.
    """
    if file_date > datetime(2016, 4, 1):
        LOGGER.debug("Will resynchronize BPMs")
        b1_leftover = ["BPMYB.5L2.B1", "BPMYB.4L2.B1", "BPMWI.4L2.B1",
                       "BPMSX.4L2.B1", "BPMS.2L2.B1", "BPMSW.1L2.B1"]
        mask1 = (bpm_data.index.str.endswith("L2.B1") &
                 np.array([x not in b1_leftover for x in bpm_data.index]))
        bpm_data.loc[mask1] = np.roll(bpm_data.loc[mask1], -1, axis=1)
        b2_leftover = ["BPMYB.4R8.B2", "BPMWI.4R8.B2", "BPMSX.4R8.B2",
                       "BPMS.2R8.B2", "BPMSW.1R8.B2"]
        mask2 = (bpm_data.index.str.endswith("R8.B2") &
                 np.array([x not in b2_leftover for x in bpm_data.index]))
        bpm_data.loc[mask2] = np.roll(bpm_data.loc[mask2], -1, axis=1)
        bpm_data = bpm_data.iloc[:, :-1]
    return bpm_data
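

# Illustrative sketch (not part of the original module): the effect of the
# one-turn shift used in resync_bpms on a tiny made-up matrix. The BPM names
# and values below are assumptions chosen only for this example.
def _example_resync_shift():
    data = pd.DataFrame(
        index=["BPM.10L2.B1", "BPMYB.5L2.B1"],
        data=[[0.0, 1.0, 2.0, 3.0], [10.0, 11.0, 12.0, 13.0]],
    )
    mask = (data.index.str.endswith("L2.B1") &
            np.array([x not in ["BPMYB.5L2.B1"] for x in data.index]))
    # The first row is shifted one turn earlier; np.roll wraps its first
    # sample around to the last column ...
    data.loc[mask] = np.roll(data.loc[mask], -1, axis=1)
    # ... so the last column is dropped for all BPMs to keep turns aligned.
    return data.iloc[:, :-1]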


def detect_known_bad_bpms(bpm_data, list_of_bad_bpms):
    """ Searches for known bad BPMs """
    known_bad_bpms = bpm_data.index.intersection(list_of_bad_bpms)
    return known_bad_bpms


def detect_flat_bpms(bpm_data, min_peak_to_peak):
    """ Detects BPMs with the same values for all turns """
    cond = ((bpm_data.max(axis=1) - bpm_data.min(axis=1)).abs() <
            min_peak_to_peak)
    bpm_flatness = bpm_data[cond].index
    if bpm_flatness.size:
        LOGGER.debug(
            "Flat BPMs detected (max - min < %s). BPMs removed: %s",
            min_peak_to_peak,
            bpm_flatness.size,
        )
    return bpm_flatness


def detect_bpms_with_spikes(bpm_data, max_peak_cut):
    """ Detects BPMs with spikes > max_peak_cut """
    too_high = bpm_data[bpm_data.max(axis=1) > max_peak_cut].index
    too_low = bpm_data[bpm_data.min(axis=1) < -max_peak_cut].index
    bpm_spikes = too_high.union(too_low)
    if bpm_spikes.size:
        LOGGER.debug("Spikes > %s detected. BPMs removed: %s",
                     max_peak_cut, bpm_spikes.size)
    return bpm_spikes


def detect_bpms_with_exact_zeros(bpm_data):
    """ Detects BPMs with exact zeros due to OP workaround """
    exact_zeros = bpm_data[~np.all(bpm_data, axis=1)].index
    if exact_zeros.size:
        LOGGER.debug("Exact zeros detected. BPMs removed: %s",
                     exact_zeros.size)
    return exact_zeros
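

# Illustrative sketch (not part of the original module): the three detectors
# above applied to a tiny synthetic matrix. The BPM names, values and
# thresholds are assumptions chosen only for this example.
def _example_detectors():
    data = pd.DataFrame(
        index=["BPM.FLAT", "BPM.SPIKE", "BPM.ZERO", "BPM.GOOD"],
        data=[[1.0, 1.0, 1.0, 1.0],      # constant for all turns -> flat
              [0.1, 25.0, -0.1, 0.2],    # one sample above max_peak -> spike
              [0.3, 0.0, -0.2, 0.1],     # contains an exact zero
              [0.3, -0.4, 0.2, -0.1]],   # passes all three checks
    )
    flat = detect_flat_bpms(data, min_peak_to_peak=1e-5)
    spikes = detect_bpms_with_spikes(data, max_peak_cut=20.0)
    zeros = detect_bpms_with_exact_zeros(data)
    return flat, spikes, zeros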


def svd_decomposition(clean_input, bpm_data):
    """ Computes the (possibly truncated) SVD of the normalized BPM data """
    # Parameters for matrix normalisation
    sqrt_number_of_turns = np.sqrt(bpm_data.shape[1])
    bpm_data_mean = np.mean(bpm_data.values, axis=1)
    normalized_data = (bpm_data - bpm_data_mean[:, None]) / sqrt_number_of_turns
    svd_functs = {
        "NUM": _get_singular_value_decomposition,
        "SPA": _get_singular_value_decomposition_sparse,
        "RAN": _get_singular_value_decomposition_random,
    }
    U, S, V = svd_functs[clean_input.svd_mode.upper()[:3]](
        normalized_data, clean_input.sing_val
    )
    num = np.sum(S > 0.)
    U = pd.DataFrame(index=bpm_data.index, data=U)
    # Keep only the strictly positive singular values and the matching
    # columns of U and rows of V
    USV = U.iloc[:, :num], S[:num], V[:num, :]
    if num < S.shape[0]:
        LOGGER.warning("Zero singular values detected.")
    return USV, bpm_data_mean, sqrt_number_of_turns


def svd_clean(bpm_data, clean_input):
    """ Removes BPMs that dominate a single SVD mode and reconstructs
    the SVD-cleaned data """
    USV, bpm_data_mean, sqrt_number_of_turns = svd_decomposition(clean_input,
                                                                 bpm_data)

    clean_U, dominance_summary = _clean_dominant_bpms(
        USV[0], clean_input.single_svd_bpm_threshold
    )
    mask = np.max(USV[0].abs(), axis=1) <= clean_input.single_svd_bpm_threshold

    # Reconstruct the SVD-cleaned data
    USV = clean_U, USV[1], USV[2]
    A = USV[0].dot(np.dot(np.diag(USV[1]), USV[2]))
    A = A * sqrt_number_of_turns + bpm_data_mean[mask, None]

    # BPM resolution computation
    bpm_res = (A - bpm_data.loc[clean_U.index]).std(axis=1)
    LOGGER.debug("Average BPM resolution: %s", str(np.mean(bpm_res)))
    LOGGER.debug("np.mean(np.std(A, axis=1)): %s", np.mean(np.std(A, axis=1)))
    if np.mean(bpm_res) > NTS_LIMIT * np.mean(np.std(A, axis=1)):
        raise ValueError(
            "The data is too noisy. The most probable explanation"
            " is that there was no excitation or it was very low.")

    # Denormalize the data
    V = (USV[2].T - np.mean(USV[2], axis=1)).T
    USV = (USV[0], USV[1] * sqrt_number_of_turns, V)
    good_bpm_data = A
    return good_bpm_data, bpm_res, dominance_summary, USV


# HELPER FUNCTIONS #########################


def _get_bad_bpms_summary(clean_input, known_bad_bpms,
                          bpm_flatness, bpm_spikes, exact_zeros):
    bad_bpms_summary = []
    for bpm_name in known_bad_bpms:
        bad_bpms_summary.append("{} Known bad BPM".format(bpm_name))
    for bpm_name in bpm_flatness:
        bad_bpms_summary.append(
            "{} Flat BPM, the difference "
            "between min/max is smaller than {}"
            .format(bpm_name, clean_input.peak_to_peak)
        )
    for bpm_name in bpm_spikes:
        bad_bpms_summary.append(
            "{} Spiky BPM, found spike higher than {}"
            .format(bpm_name, clean_input.max_peak)
        )
    for bpm_name in exact_zeros:
        bad_bpms_summary.append(
            "{} Found an exact zero".format(bpm_name)
        )
    return bad_bpms_summary


def _report_clean_stats(n_total_bpms, n_good_bpms):
    LOGGER.debug("Filtering done:")
    if n_total_bpms == 0:
        raise ValueError("Total number of BPMs after filtering is zero.")
    n_bad_bpms = n_total_bpms - n_good_bpms
    LOGGER.debug(
        "(Statistics for file reading) Total BPMs: {0}, "
        "Good BPMs: {1} ({2:2.2f}%), bad BPMs: {3} ({4:2.2f}%)"
        .format(n_total_bpms, n_good_bpms,
                100.0 * (n_good_bpms / float(n_total_bpms)),
                n_bad_bpms,
                100.0 * (n_bad_bpms / float(n_total_bpms))))
    if (n_good_bpms / float(n_total_bpms)) < 0.5:
        raise ValueError(
            "More than half of the BPMs are bad. This could be because a "
            "bunch not present in the machine has been selected, or because "
            "of a problem with the phasing of the BPMs.")


def _index_union(*indices):
    new_index = pd.Index([])
    for index in indices:
        new_index = new_index.union(index)
    return new_index


# Removes the noise floor: for an MxN matrix, requesting K singular values
# yields the truncated SVD (MxK) x diag(K) x (KxN)
def _get_singular_value_decomposition(matrix, num):
    LOGGER.debug("Using NumPy SVD")
    if num > min(matrix.shape):
        LOGGER.warning("Requested more singular values than available(={0})"
                       .format(min(matrix.shape)))
        return np.linalg.svd(matrix, full_matrices=False)
    LOGGER.debug("Amount of singular values to keep: {0}".format(num))
    U, S, V = np.linalg.svd(matrix, full_matrices=False)
    return (U[:, :num], S[:num], V[:num, :])


def _get_singular_value_decomposition_sparse(matrix, num):
    if not SPARSE_AVAILABLE:
        LOGGER.warning("Cannot import scipy sparse SVD, falling back to NumPy.")
        return _get_singular_value_decomposition(matrix, num)
    LOGGER.debug("Using Sparse SVD")
    return svds(matrix, k=num)


# SVD of a normalized matrix using a random matrix to
# get a lower rank approximation
def _get_singular_value_decomposition_random(matrix, num):
    LOGGER.debug("Using Randomized SVD")
    Q = np.linalg.qr(np.dot(matrix,
                            np.random.randn(matrix.shape[1], num + 6)))[0]
    U, S, V = np.linalg.svd(np.dot(np.transpose(Q), matrix),
                            full_matrices=False)
    return (np.dot(Q, U)[:, :num], S[:num], V[:num, :])


def _clean_dominant_bpms(U, single_svd_bpm_threshold):
    if single_svd_bpm_threshold < 1 / np.sqrt(2):
        LOGGER.warning("Careful, the single_svd_bpm_threshold looks too low %s.",
                       single_svd_bpm_threshold)
    dominant_bpms = U[np.max(U.abs(), axis=1) > single_svd_bpm_threshold].index
    dominance_summary = []
    if dominant_bpms.size > 0:
        for bpm_name in dominant_bpms:
            dominance_summary.append(
                "{} Detected from SVD, single peak value is greater than {}"
                .format(bpm_name, str(single_svd_bpm_threshold))
            )
        LOGGER.debug("Bad BPMs from SVD detected. Number of BPMs removed: %s",
                     dominant_bpms.size)
    clean_U = U.loc[U.index.difference(dominant_bpms)]
    return clean_U, dominance_summary
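

# Illustrative sketch (not part of the original module): checking the
# randomized SVD above against NumPy's exact SVD on a low-rank test matrix.
# The matrix sizes, rank and tolerances are assumptions for this example.
def _example_randomized_svd_check():
    rng = np.random.RandomState(0)
    matrix = np.dot(rng.randn(100, 5), rng.randn(5, 400))  # exactly rank 5
    U, S, V = _get_singular_value_decomposition_random(matrix, 5)
    _, S_exact, _ = np.linalg.svd(matrix, full_matrices=False)
    reconstruction = np.dot(U, np.dot(np.diag(S), V))
    # For an exactly rank-5 matrix the random projection captures the full
    # column space, so both the singular values and the reconstruction
    # should agree with the exact SVD to numerical precision.
    assert np.allclose(S, S_exact[:5], rtol=1e-6)
    assert np.allclose(reconstruction, matrix, atol=1e-8)
    return S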