Source code for tune_analysis.bbq_tools

"""
Module tune_analysis.bbq_tools
----------------------------------

Tools to handle BBQ data.

This package contains a collection of tools to handle and modify BBQ data:
 - Calculating moving average
 - Plotting
"""

import datetime
import os

import matplotlib.dates as mdates
import numpy as np
from matplotlib import pyplot as plt, gridspec
from matplotlib.ticker import FormatStrFormatter
from matplotlib import colors
import constants as const
from utils import logging_tools
from plotshop import plot_style as ps

TIMEZONE = const.get_experiment_timezone()

PLANES = const.get_planes()

COL_MAV = const.get_mav_col
COL_IN_MAV = const.get_used_in_mav_col
COL_BBQ = const.get_bbq_col

LOG = logging_tools.get_logger(__name__)


[docs]def get_moving_average(data_series, length=20, min_val=None, max_val=None, fine_length=None, fine_cut=None): """ Get a moving average of the ``data_series`` over ``length`` entries. The data can be filtered beforehand. The values are shifted, so that the averaged value takes ceil((length-1)/2) values previous and floor((length-1)/2) following values into account. Args: data_series: Series of data length: length of the averaging window min_val: minimum value (for filtering) max_val: maximum value (for filtering) fine_length: length of the averaging window for fine cleaning fine_cut: allowed deviation for fine cleaning Returns: filtered and averaged Series and the mask used for filtering data. """ LOG.debug("Calculating BBQ moving average of length {:d}.".format(length)) if bool(fine_length) != bool(fine_cut): raise NotImplementedError("To activate fine cleaning, both " "'fine_window' and 'fine_cut' are needed.") if min_val is not None: min_mask = data_series <= min_val else: min_mask = np.zeros(data_series.size, dtype=bool) if max_val is not None: max_mask = data_series >= max_val else: max_mask = np.zeros(data_series.size, dtype=bool) cut_mask = min_mask | max_mask _is_almost_empty_mask(~cut_mask, length) data_mav, std_mav = _get_interpolated_moving_average(data_series, cut_mask, length) if fine_length is not None: min_mask = data_series <= (data_mav - fine_cut) max_mask = data_series >= (data_mav + fine_cut) cut_mask = min_mask | max_mask _is_almost_empty_mask(~cut_mask, fine_length) data_mav, std_mav = _get_interpolated_moving_average(data_series, cut_mask, fine_length) return data_mav, std_mav, cut_mask
[docs]def plot_bbq_data(bbq_df, interval=None, xmin=None, xmax=None, ymin=None, ymax=None, output=None, show=True, two_plots=False): """ Plot BBQ data. Args: bbq_df: BBQ Dataframe with moving average columns interval: start and end time of used interval, will be marked with red bars xmin: Lower x limit (time) xmax: Upper x limit (time) ymin: Lower y limit (tune) ymax: Upper y limit (tune) output: Path to the output file show: Shows plot if `True` two_plots: Plots each tune in it's own axes if `True` Returns: Plotted figure """ LOG.debug("Plotting BBQ data.") ps.set_style("standard", { u'figure.figsize': [12.24, 7.68], u"lines.marker": u"", u"lines.linestyle": u""} ) fig = plt.figure() if two_plots: gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1]) ax = [fig.add_subplot(gs[1]), fig.add_subplot(gs[0])] else: gs = gridspec.GridSpec(1, 1, height_ratios=[1]) ax = fig.add_subplot(gs[0]) ax = [ax, ax] bbq_df.index = [datetime.datetime.fromtimestamp(time, tz=TIMEZONE) for time in bbq_df.index] handles = [None] * (3 * len(PLANES)) for idx, plane in enumerate(PLANES): color = ps.get_mpl_color(idx) mask = bbq_df[COL_IN_MAV(plane)] # plot and save handles for nicer legend handles[idx] = ax[idx].plot(bbq_df.index, bbq_df[COL_BBQ(plane)], color=ps.change_color_brightness(color, .4), marker="o", markerfacecolor="None", label="$Q_{:s}$".format(plane.lower(),) )[0] filtered_data = bbq_df.loc[mask, COL_BBQ(plane)].dropna() handles[len(PLANES)+idx] = ax[idx].plot(filtered_data.index, filtered_data.values, color=ps.change_color_brightness(color, .7), marker=".", label="filtered".format(plane.lower()) )[0] handles[2*len(PLANES)+idx] = ax[idx].plot(bbq_df.index, bbq_df[COL_MAV(plane)], color=color, linestyle="-", label="moving av.".format(plane.lower()) )[0] if ymin is None and two_plots: ax[idx].set_ylim(bottom=min(bbq_df.loc[mask, COL_BBQ(plane)])) if ymax is None and two_plots: ax[idx].set_ylim(top=max(bbq_df.loc[mask, COL_BBQ(plane)])) # things to add/do only once if there is only one plot for idx in range(1+two_plots): if interval: ax[idx].axvline(x=interval[0], color="red") ax[idx].axvline(x=interval[1], color="red") if two_plots: ax[idx].set_ylabel("$Q_{:s}$".format(PLANES[idx])) else: ax[idx].set_ylabel('Tune') ax[idx].set_ylim(bottom=ymin, top=ymax) ax[idx].yaxis.set_major_formatter(FormatStrFormatter('%.5f')) ax[idx].set_xlim(left=xmin, right=xmax) ax[idx].set_xlabel('Time') ax[idx].xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) if idx: # don't show labels on upper plot (if two plots) # use the visibility to allow cursor x-position to be shown ax[idx].tick_params(labelbottom=False) ax[idx].xaxis.get_label().set_visible(False) if not two_plots or idx: # reorder legend ax[idx].legend(handles, [h.get_label() for h in handles], loc='lower right', bbox_to_anchor=(1.0, 1.01), ncol=3,) fig.tight_layout() fig.tight_layout() if output: fig.savefig(output) ps.set_name(os.path.basename(output)) if show: plt.draw() return fig
# Private methods ############################################################ def _get_interpolated_moving_average(data_series, clean_mask, length): """ Returns the moving average of data series with a window of length and interpolated NaNs""" data = data_series.copy() data[clean_mask] = np.NaN # 'interpolate' fills nan based on index/values of neighbours data = data.interpolate("index").fillna(method="bfill").fillna(method="ffill") shift = -int((length-1)/2) # Shift average to middle value # calculate mean and std, fill NaNs at the ends data_mav = data.rolling(length).mean().shift(shift).fillna( method="bfill").fillna(method="ffill") std_mav = data.rolling(length).std().shift(shift).fillna( method="bfill").fillna(method="ffill") return data_mav, std_mav def _is_almost_empty_mask(mask, av_length): """ Checks if masked data could be used to calculate moving average. """ if sum(mask) <= av_length: raise ValueError("Too many points have been filtered. Maybe wrong tune, cutoff?")