"""
Frame
-----
Contains the class definition of a ``TfsDataFrame``, inherited from the ``pandas`` ``DataFrame``, as well
as a utility function to validate the correctness of a ``TfsDataFrame``.
"""
from __future__ import annotations
import logging
from contextlib import suppress
from functools import partial, reduce
from typing import TYPE_CHECKING, ClassVar
import numpy as np
import pandas as pd
from pandas.api import types as pdtypes
from tfs.constants import VALIDATION_MODES
from tfs.errors import (
DuplicateColumnsError,
DuplicateIndicesError,
IterableInDataFrameError,
MADXCompatibilityError,
NonStringColumnNameError,
SpaceinColumnNameError,
)
if TYPE_CHECKING:
from collections.abc import Sequence
LOGGER = logging.getLogger(__name__)
[docs]
class TfsDataFrame(pd.DataFrame):
"""
Class to hold the information of the built an extended `pandas.DataFrame`, together with a way
of getting the headers of the **TFS** file. The file headers are stored in a dictionary upon read.
To get a header value use ``data_frame.headers["header_name"]``, or ``data_frame["header_name"]`` if
it does not conflict with a column name in the dataframe.
"""
_metadata: ClassVar = ["headers"]
def __init__(self, *args, **kwargs):
self.headers = {}
with suppress(IndexError, AttributeError):
self.headers = args[0].headers
self.headers = kwargs.pop("headers", self.headers)
super().__init__(*args, **kwargs)
def __getitem__(self, key: object) -> object:
try:
return super().__getitem__(key)
except KeyError as error:
try:
return self.headers[key]
except KeyError as kerror:
errmsg = f"{key} is neither in the DataFrame nor in headers."
raise KeyError(errmsg) from kerror
except TypeError as terror:
raise error from terror
def __getattr__(self, name: str) -> object:
try:
return super().__getattr__(name)
except AttributeError:
try:
return self.headers[name]
except KeyError as error:
errmsg = f"{name} is neither in the DataFrame nor in headers."
raise AttributeError(errmsg) from error
@property
def _constructor(self):
"""Function called, whenever a new ``TfsDataFrame`` is created
by pandas functionality, to ensure the new object is also a ``TfsDataFrame``.
"""
return TfsDataFrame
def _constructor_from_mgr(self, mgr, axes):
"""Initialize new ``TfsDataFrame`` from a dataframe manager.
This function is needed since pandas v2.1.0 to ensure the new object
given to __init__() already contains the headers.
See https://github.com/pandas-dev/pandas/issues/55120"""
obj = self._from_mgr(mgr, axes)
obj.headers = {}
return obj
def _headers_repr(self) -> str:
space: str = " " * 4
def _str_items(items_list: Sequence[str]) -> str:
return "\n".join(f"{space}{k}: {v}" for k, v in items_list)
s: str = ""
if len(self.headers):
s += "Headers:\n"
if len(self.headers) > 7: # noqa: PLR2004
items = list(self.headers.items())
s += f"{_str_items(items[:3])}\n{space}...\n{_str_items(items[-3:])}\n"
else:
s += f"{_str_items(self.headers.items())}\n"
s += "\n"
return s
def __repr__(self) -> str:
headers_string = self._headers_repr()
return f"{headers_string}{super().__repr__()}"
[docs]
def merge(
self,
right: TfsDataFrame | pd.DataFrame,
how_headers: str | None = None,
new_headers: dict | None = None,
**kwargs,
) -> TfsDataFrame:
"""
Merge ``TfsDataFrame`` objects with a database-style join. Data manipulation is done by the
``pandas.Dataframe`` method of the same name. Resulting headers are either merged according to the
provided **how_headers** method or as given via **new_headers**.
Args:
right (TfsDataFrame | pd.DataFrame): The ``TfsDataFrame`` to merge with the caller.
how_headers (str): Type of merge to be performed for the headers. Either **left** or **right**.
Refer to `tfs.frame.merge_headers` for behavior. If ``None`` is provided and
**new_headers** is not provided, the final headers will be empty. Case insensitive,
defaults to ``None``.
new_headers (dict): If provided, will be used as headers for the merged ``TfsDataFrame``.
Otherwise these are determined by merging the headers from the caller and the other
``TfsDataFrame`` according to the method defined by the **how_headers** argument.
**kwargs: Arguments for `pandas.DataFrame.merge`, with the same default values as set in
the ``pandas`` codebase.
Returns:
A new ``TfsDataFrame`` with the merged data and merged headers.
"""
LOGGER.debug("Merging data through 'pandas'")
if not hasattr(right, "headers"):
LOGGER.debug("Converting 'right' to TfsDataFrame for merging")
right = TfsDataFrame(right) # so we accept pandas.DataFrame input here
dframe = super().merge(right, **kwargs)
LOGGER.debug("Determining headers")
# fmt: off
new_headers = (
new_headers
if new_headers is not None
else merge_headers(self.headers, right.headers, how=how_headers)
)
return TfsDataFrame(data=dframe, headers=new_headers)
# fmt: on
[docs]
def concat(
objs: Sequence[TfsDataFrame | pd.DataFrame],
how_headers: str | None = None,
new_headers: dict | None = None,
**kwargs,
) -> TfsDataFrame:
"""
Concatenate ``TfsDataFrame`` objects along a particular axis with optional set logic along the other
axes. Data manipulation is done by the `pandas.concat` function. Resulting headers are either
merged according to the provided **how_headers** method or as given via **new_headers**.
.. warning::
Please note that when using this function on many ``TfsDataFrames``, leaving the contents of the
final headers dictionary to the automatic merger can become unpredictable. In this case it is
recommended to provide the **new_headers** argument to ensure the final result, or leave both
**how_headers** and **new_headers** as ``None`` (their defaults) to end up with empty headers.
Args:
objs (Sequence[TfsDataFrame | pd.DataFrame]): the ``TfsDataFrame`` objects to be concatenated.
how_headers (str): Type of merge to be performed for the headers. Either **left** or **right**.
Refer to `tfs.frame.merge_headers` for behavior. If ``None`` is provided and **new_headers**
is not provided, the final headers will be empty. Case insensitive, defaults to ``None``.
new_headers (dict): If provided, will be used as headers for the merged ``TfsDataFrame``.
Otherwise these are determined by successively merging the headers from all concatenated
``TfsDataFrames`` according to the method defined by the **how_headers** argument.
**kwargs: Any keyword argument is given to `pandas.concat`.
Returns:
A new ``TfsDataFrame`` with the merged data and merged headers.
"""
LOGGER.debug("Concatenating data through 'pandas'")
objs = [dframe if hasattr(dframe, "headers") else TfsDataFrame(dframe) for dframe in objs]
dframe = pd.concat(objs, **kwargs)
LOGGER.debug("Determining headers")
merger = partial(merge_headers, how=how_headers) # so we can reduce on all objs, and use 'how_headers'
all_headers = (tfsdframe.headers for tfsdframe in objs)
new_headers = new_headers if new_headers is not None else reduce(merger, all_headers)
return TfsDataFrame(data=dframe, headers=new_headers)
[docs]
def validate(
data_frame: TfsDataFrame | pd.DataFrame,
info_str: str = "",
non_unique_behavior: str = "warn",
compatibility: str = "madx",
) -> None:
"""
Enforce validity rules on a `TfsDataFrame` (see admonition below).
Additional checks are performed for compatibility with either ``MAD-X``
or ``MAD-NG`` as provided by the `compatibility` parameter.
.. admonition:: Methodology
This function performs several different checks on the provided dataframe. The following
checks are performed for all compatibility modes (``MAD-X`` and ``MAD-NG``):
1. Checking no single element in the data is a `list` or `tuple`.
2. Checking for non-physical values in the dataframe (uses `.isna()` with the right option context).
3. Checking for duplicates in either indices or columns.
4. Checking for column names that are not strings.
5. Checking for column names including spaces.
When checking for ``MAD-X`` compatibility, which is more restrictive than ``MAD-NG``,
the following additional checks are performed:
1. Checking the dataframe has headers.
2. Checking no boolean values are in the dataframe headers.
3. Checking no complex values are in the dataframe headers.
4. Checking for a 'TYPE' entry is in the dataframe headers.
5. Checking no boolean-dtype columns are in the dataframe.
6. Checking no complex-dtype columns are in the dataframe.
Args:
data_frame (TfsDataFrame | pd.DataFrame): the dataframe to check on.
info_str (str): additional information to include in logging statements.
compatibility (str): Which code to check for compatibility with. Accepted values
are `madx`, `mad-x`, `madng` and `mad-ng`, case-insensitive. Defauts to `madx`.
non_unique_behavior (str): behavior to adopt if non-unique indices or columns are found in the
dataframe. Accepts `warn` and `raise` as values, case-insensitively, which dictates
to respectively issue a warning or raise an error if non-unique elements are found.
"""
if not isinstance(compatibility, str) or compatibility.lower() not in VALIDATION_MODES:
errmsg = f"Invalid compatibility mode provided: '{compatibility}'."
raise ValueError(errmsg)
if non_unique_behavior.lower() not in ("warn", "raise"):
errmsg = "Invalid value for parameter 'non_unique_behavior'."
raise ValueError(errmsg)
# ----- Check that no element is a list / tuple in the dataframe ----- #
def _element_is_list(element):
return isinstance(element, list | tuple)
_element_is_list = np.vectorize(_element_is_list)
list_or_tuple_bool_df = data_frame.apply(_element_is_list)
if list_or_tuple_bool_df.to_numpy().any():
LOGGER.error(
f"DataFrame {info_str} contains list/tuple values at Index: "
f"{list_or_tuple_bool_df.index[list_or_tuple_bool_df.any(axis='columns')].tolist()}"
)
raise IterableInDataFrameError
# ----- Check that no element is non-physical value in the data and headers ----- #
# The pd.option_context('mode.use_inf_as_na', True) context manager raises FutureWarning
# and will likely disappear in pandas 3.0 so we replace 'inf' values by NaNs before calling
# .isna(). Additionally, the downcasting behaviour of .replace() is deprecated and raises a
# FutureWarning, so we use .infer_objects() first to attemps soft conversion to a better dtype
# for object-dtype columns (which strings can be). Since .infer_objects() and .replace() return
# (lazy for the former) copies we're not modifying the original dataframe during validation :)
inf_or_nan_bool_df = data_frame.infer_objects().replace([np.inf, -np.inf], np.nan).isna()
if inf_or_nan_bool_df.to_numpy().any():
LOGGER.warning(
f"DataFrame {info_str} contains non-physical values at Index: "
f"{inf_or_nan_bool_df.index[inf_or_nan_bool_df.any(axis='columns')].tolist()}"
)
if getattr(data_frame, "headers", None) is not None and pd.Series(data_frame.headers.values()).isna().any():
LOGGER.warning(f"DataFrame {info_str} contains non-physical values in headers.")
# ----- Other sanity checks ----- #
# These are not deal-breakers but might raise
# issues being read back by some other codes
if data_frame.index.has_duplicates:
LOGGER.warning("Non-unique indices found.")
if non_unique_behavior.lower() == "raise":
raise DuplicateIndicesError
if data_frame.columns.has_duplicates:
LOGGER.warning("Non-unique column names found.")
if non_unique_behavior.lower() == "raise":
raise DuplicateColumnsError
# The following are deal-breakers for the TFS format,
# but might be accepted by MAD-X or MAD-NG
if any(not isinstance(c, str) for c in data_frame.columns):
LOGGER.debug(f"Some column-names are not of string-type, dataframe {info_str} is invalid.")
raise NonStringColumnNameError
if any(" " in c for c in data_frame.columns):
LOGGER.debug(f"Space(s) found in TFS columns, dataframe {info_str} is invalid")
raise SpaceinColumnNameError
# ----- Additional checks for MAD-X compatibility mode ----- #
if compatibility.lower() in ("madx", "mad-x"):
# MAD-X really wants a 'TYPE' header in the file, which is not possible to
# enforce if there are no headers in the df (we can't add, we don't modify
# and return the df in this function). In this case we error.
if getattr(data_frame, "headers", None) is None:
errmsg = "Headers should be present in MAD-X compatibility mode."
raise MADXCompatibilityError(errmsg)
# ----- Otherwise we can check the existing headers
# Check that no boolean values are in the headers - MAD-X does not accept them
if any(isinstance(header, bool) for header in data_frame.headers.values()):
LOGGER.debug(
f"Boolean values found in headers of dataframe {info_str}, which is incompatible with MAD-X. "
"Change their types in order to keep compatibility with MAD-X."
)
errmsg = "TFS-Headers can not contain boolean values in MAD-X compatibility mode."
raise MADXCompatibilityError(errmsg)
# Check that no complex values are in the headers - MAD-X does not accept them
if any(isinstance(header, complex) for header in data_frame.headers.values()):
LOGGER.debug(
f"Complex values found in headers of dataframe {info_str}, which is incompatible with MAD-X. "
"Change their types in order to keep compatibility with MAD-X."
)
errmsg = "TFS-Headers can not contain complex values in MAD-X compatibility mode."
raise MADXCompatibilityError(errmsg)
# Check that no 'None' values are in the headers - it would
# write as 'nil' which MAD-X does not accept
if any(header is None for header in data_frame.headers.values()):
LOGGER.debug(
f"'None' values found in headers of dataframe {info_str}, which is incompatible with MAD-X. "
"Remove them or assign a value in order to keep compatibility with MAD-X."
)
errmsg = "TFS-Headers can not contain 'None' values in MAD-X compatibility mode."
raise MADXCompatibilityError(errmsg)
# MAD-X will not accept back in a TFS file with no 'TYPE' entry in the headers (as string)
if "TYPE" not in data_frame.headers:
LOGGER.warning("MAD-X expects a 'TYPE' header in the TFS file, which is missing. Adding it.")
data_frame.headers["TYPE"] = "Added by tfs-pandas for MAD-X compatibility"
# ----- The following checks are regarding the data itself ----- #
# Check that the dataframe contains no boolean dtype columns
if any(pdtypes.is_bool_dtype(type_) for type_ in data_frame.dtypes):
LOGGER.debug(
f"Boolean dtype column found in dataframe {info_str}, which is incompatible with MAD-X. "
"Change the column dtypes to keep compatibility with MAD-X."
)
errmsg = "TFS-Dataframe can not contain boolean dtype columns in MAD-X compatibility mode."
raise MADXCompatibilityError(errmsg)
# Check that the dataframe contains no complex dtype columns
if any(pdtypes.is_complex_dtype(type_) for type_ in data_frame.dtypes):
LOGGER.debug(
f"Complex dtype column found in dataframe {info_str}, which is incompatible with MAD-X. "
"Change the column dtypes or split it into real and imaginary values to keep compatibility with MAD-X."
)
errmsg = "TFS-Dataframe can not contain complex dtype columns in MAD-X compatibility mode."
raise MADXCompatibilityError(errmsg)
LOGGER.debug(f"DataFrame {info_str} validated")