Source code for turn_by_turn.madng

"""
MAD-NG
------

This module provides functions to read and write turn-by-turn measurement data
produced by the ``MAD-NG`` code. MAD-NG stores its tracking data in the **TFS**
(Table File System) file format.

Data is loaded into the standardized ``TbtData`` structure used by ``turn_by_turn``,
allowing easy post-processing and conversion between formats.

Dependencies:
    - Requires the ``tfs-pandas >= 4.0.0`` package for compatibility with MAD-NG
      features. Earlier versions do not support MAD-NG TFS files.

"""

from __future__ import annotations

import logging
from datetime import datetime
from typing import TYPE_CHECKING

import pandas as pd

if TYPE_CHECKING:
    from pathlib import Path  # Only used for type hinting

    import tfs

from turn_by_turn.structures import TbtData, TransverseData

LOGGER = logging.getLogger(__name__)

# Define the column names in the TFS file
NAME = "name"
ELEMENT_INDEX = "eidx"
TURN = "turn"
PARTICLE_ID = "id"

# Define the header names in the TFS file
HNAME = "name"
ORIGIN = "origin"
DATE = "date"
TIME = "time"
REFCOL = "refcol"


def read_tbt(file_path: str | Path) -> TbtData:
    """
    Load a MAD-NG TFS measurement file as turn-by-turn data.

    The file is parsed with ``tfs-pandas`` and the resulting dataframe is
    handed to ``convert_to_tbt``, which builds the ``TbtData`` object.

    Args:
        file_path (str | Path): Path to the MAD-NG TFS measurement file.

    Returns:
        TbtData: The loaded turn-by-turn data.

    Raises:
        ImportError: If the ``tfs-pandas`` package is not installed.
    """
    # ``tfs`` is an optional dependency, so it is only imported on demand.
    try:
        import tfs
    except ImportError as import_error:
        message = (
            "The 'tfs' package is required to read MAD-NG TFS files. "
            "Install it with: pip install 'turn_by_turn[madng]'"
        )
        raise ImportError(message) from import_error

    LOGGER.debug("Starting to read TBT data from dataframe")
    return convert_to_tbt(tfs.read(file_path))
def convert_to_tbt(df: pd.DataFrame | tfs.TfsDataFrame) -> TbtData:
    """
    Convert a TFS or pandas DataFrame to a ``TbtData`` object.

    The date and time are read from the headers of a ``TfsDataFrame`` or from
    the ``attrs`` of a plain pandas ``DataFrame``. The tracking data is then
    regrouped particle by particle into one ``TransverseData`` per particle,
    each plane being a (observation points x turns) dataframe.

    Args:
        df (pd.DataFrame | TfsDataFrame): DataFrame containing MAD-NG
            turn-by-turn tracking data, with the ``name``, ``turn``, ``id``
            columns and one lower-case column per transverse plane.

    Returns:
        TbtData: The extracted and structured turn-by-turn data. Bunch IDs
        are 0-based, while the particle IDs in the dataframe are 1-based.

    Raises:
        TypeError: If the input is not a recognized DataFrame type.
        ValueError: If the dataframe is empty or the data structure is
            inconsistent (e.g., lost particles).
    """
    # ``tfs`` is optional: without it the input can only be a pandas DataFrame.
    try:
        import tfs
    except ImportError:
        LOGGER.debug("The 'tfs' package is not installed. Assuming a pandas DataFrame.")
        is_tfs_df = False
    else:
        is_tfs_df = isinstance(df, tfs.TfsDataFrame)

    if not (is_tfs_df or isinstance(df, pd.DataFrame)):
        raise TypeError(
            f"Expected a pandas DataFrame or a TfsDataFrame, got {type(df).__name__}."
        )
    if df.empty:
        raise ValueError("The dataframe is empty, there is no turn-by-turn data to convert.")

    # Get the date and time from the headers (None if not found)
    metadata = df.headers if is_tfs_df else df.attrs
    date_str = metadata.get(DATE)
    time_str = metadata.get(TIME)

    # Combine the date and time into a datetime object
    date = None
    if date_str and time_str:
        date = datetime.strptime(f"{date_str} {time_str}", "%d/%m/%y %H:%M:%S")
    elif date_str:
        date = datetime.strptime(date_str, "%d/%m/%y")

    # Use the column maxima rather than the last row, so that the counts do
    # not depend on the ordering of the rows in the input.
    nturns = int(df[TURN].max())
    npart = int(df[PARTICLE_ID].max())
    LOGGER.info("Number of turns: %s, Number of particles: %s", nturns, npart)

    # Get the names of the observed points (BPMs) from the first particle's
    # first turn. A boolean mask always yields a Series, even when a single
    # point is observed (a unique MultiIndex lookup would yield a scalar).
    first_pass = (df[PARTICLE_ID] == 1) & (df[TURN] == 1)
    observe_points = df.loc[first_pass, NAME].to_numpy()
    num_observables = len(observe_points)

    # Check if the number of observed points is consistent for all particles/turns
    if len(df) != num_observables * nturns * npart:
        raise ValueError(
            "The number of observed points is not consistent for all particles/turns. "
            "Simulation may have lost particles."
        )

    # NOTE(review): this assumes sort_index keeps the original row order within
    # each (id, turn) group, so that rows stay aligned with observe_points.
    indexed = df.set_index([PARTICLE_ID, TURN]).sort_index()

    matrices = []
    # Particle IDs start from 1, but we use 0-based indexing in Python
    particle_ids = range(npart)
    for particle_id in particle_ids:
        LOGGER.info("Processing particle ID: %s", particle_id)

        # Select all the rows of the current particle (now indexed by turn)
        df_particle = indexed.loc[particle_id + 1]

        # Create a dictionary of the TransverseData fields
        tracking_data_dict = {
            plane: pd.DataFrame(
                index=observe_points,
                # MAD-NG uses lower case for the planes. The values are laid
                # out turn after turn, so Fortran order fills one column per
                # turn and gives (number of observables x number of turns).
                data=df_particle[plane.lower()]
                .to_numpy()
                .reshape(num_observables, nturns, order="F"),
            )
            for plane in TransverseData.fieldnames()  # X, Y
        }

        # We don't use TrackingData, as MAD-NG does not provide energy
        matrices.append(TransverseData(**tracking_data_dict))

    LOGGER.debug("Finished reading TBT data")
    return TbtData(matrices=matrices, bunch_ids=list(particle_ids), nturns=nturns, date=date)
def write_tbt(output_path: str | Path, tbt_data: TbtData) -> None:
    """
    Write turn-by-turn data to a MAD-NG TFS file.

    Takes a ``TbtData`` object and writes its contents to disk in the TFS
    layout used by MAD-NG: one row per (turn, observation point, particle)
    with the columns ``name``, ``x``, ``y``, ``turn`` and ``id``. The
    ``date`` and ``time`` headers are only written when ``tbt_data.date`` is
    set; they are omitted otherwise.

    Args:
        output_path (str | Path): Destination file path for the TFS file.
        tbt_data (TbtData): The turn-by-turn data to write.

    Raises:
        ImportError: If the ``tfs-pandas`` package is not installed.
    """
    try:
        import tfs
    except ImportError as e:
        raise ImportError(
            "The 'tfs' package is required to write MAD-NG TFS files. Install it with: pip install 'turn_by_turn[madng]'"
        ) from e

    planes = [plane.lower() for plane in TransverseData.fieldnames()]  # x, y
    plane_dfs = {plane: [] for plane in planes}

    for particle_id, transverse_data in zip(tbt_data.bunch_ids, tbt_data.matrices):
        for plane in planes:
            # Work on a copy so the caller's data is left untouched
            particle_df: pd.DataFrame = transverse_data[plane.upper()].copy()

            # Create the name column from the index
            particle_df.index.name = NAME
            particle_df = particle_df.reset_index()

            # Add a positional element index column (used for merging and sorting)
            particle_df[ELEMENT_INDEX] = particle_df.index

            # Melt the dataframe to have columns: name, element index, turn, plane
            particle_df = pd.melt(
                particle_df,
                id_vars=[NAME, ELEMENT_INDEX],
                var_name=TURN,
                value_name=plane,
            )

            # MAD-NG uses 1-based indexing for both the particle IDs and the turns
            particle_df[PARTICLE_ID] = particle_id + 1
            particle_df[TURN] = particle_df[TURN].astype(int) + 1

            plane_dfs[plane].append(particle_df)

    # Merge the dataframes on name, turn, particle ID and element index for both planes
    df_x = pd.concat(plane_dfs[planes[0]])
    df_y = pd.concat(plane_dfs[planes[1]])
    merged_df = pd.merge(df_x, df_y, on=[NAME, TURN, PARTICLE_ID, ELEMENT_INDEX])
    merged_df = merged_df.set_index([NAME])

    # Sort the dataframe by turn, element index and particle ID (so the format is consistent with MAD-NG)
    merged_df = merged_df.sort_values(by=[TURN, ELEMENT_INDEX, PARTICLE_ID])

    # Drop the element index column (this is not the real element index, but a temporary one for merging)
    merged_df = merged_df.drop(columns=[ELEMENT_INDEX])

    # Set the columns to x, y, turn, id, for consistency.
    merged_df = merged_df[[planes[0], planes[1], TURN, PARTICLE_ID]]

    # Build the headers. The date may be missing, e.g. when the data was read
    # from a file without date headers; the reader accepts their absence, so
    # they are simply left out instead of failing on ``None.strftime``.
    headers = {HNAME: "TbtData", ORIGIN: "Python"}
    if tbt_data.date is not None:
        headers[DATE] = tbt_data.date.strftime("%d/%m/%y")
        headers[TIME] = tbt_data.date.strftime("%H:%M:%S")
    headers[REFCOL] = NAME

    # Write the dataframe to a TFS file
    tfs.write(output_path, merged_df, headers_dict=headers, save_index=NAME)