Source code for pylhc_submitter.sixdesk_tools.submit

"""
SixDesk Submission Utils
---------------------------

Individual functions to call SixDesk functionality.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from pylhc_submitter.constants.autosix import (
    MAD_TO_SIXTRACK_SH,
    RUNSIX_SH,
    RUNSTATUS_SH,
    SIXDB,
    StageSkipError,
    get_sixjobs_path,
)
from pylhc_submitter.constants.external_paths import SIXDESK_UTILS
from pylhc_submitter.sixdesk_tools.utils import start_subprocess

if TYPE_CHECKING:
    from pathlib import Path

LOG = logging.getLogger(__name__)


[docs] def submit_mask(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None): """Run the mask (probably Madx) and generate sixtrack input files. Args: jobname (str): Name of the Job basedir (Path): SixDesk Basefolder Location sixdesk (Path): Path to the SixDesk installation/repo """ LOG.info("Submitting mask to run for sixtrack input generation.") sixjobs_path = get_sixjobs_path(jobname, basedir) start_subprocess([sixdesk / MAD_TO_SIXTRACK_SH, "-s"], cwd=sixjobs_path, ssh=ssh) LOG.info("Submitted mask-jobs.")
[docs] def check_sixtrack_input( jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None, resubmit: bool = False, ): """Checks the generated input files needed by sixtrack and resubmits, if requested.""" LOG.info("Checking if input files are present.") sixjobs_path = get_sixjobs_path(jobname, basedir) try: start_subprocess([sixdesk / MAD_TO_SIXTRACK_SH, "-c"], cwd=sixjobs_path, ssh=ssh) except OSError as e: if resubmit: LOG.info("Resubmitting mask to run wrong seeds for sixtrack input generation.") start_subprocess([sixdesk / MAD_TO_SIXTRACK_SH, "-w"], cwd=sixjobs_path, ssh=ssh) raise StageSkipError( "Resubmitted input generation jobs " "(Not really an error, but the run is now interrupted)." ) raise StageSkipError( "Checking input files failed. Check (debug-) logs. Maybe restart with 'resubmit' flag." ) from e else: LOG.info("Check for input files was successful.")
[docs] def submit_sixtrack( jobname: str, basedir: Path, python: Path = None, sixdesk: Path = SIXDESK_UTILS, ssh: str = None, resubmit: bool = False, ): """Generate simulation files and check if runnable and submit.""" re_str = "Re-" if resubmit else "" LOG.info(f"{re_str}Submitting to sixtrack.") sixjobs_path = get_sixjobs_path(jobname, basedir) args = ["-i"] if resubmit else ["-a"] if python is not None: if not python.is_dir(): python = python.parent args += ["-P", str(python)] try: start_subprocess( [sixdesk / RUNSIX_SH] + args, cwd=sixjobs_path, ssh=ssh, check_log="exit status: 1" ) except OSError as e: raise StageSkipError( f"{re_str}Submit to sixtrack for {jobname} ended in error." f" Input generation possibly not finished. Check your Scheduler." ) from e else: LOG.info(f"{re_str}Submitted jobs to Sixtrack")
[docs] def check_sixtrack_output( jobname: str, basedir: Path, python: Path | str, sixdesk: Path = SIXDESK_UTILS, ssh: str = None, resubmit: bool = False, ): """Checks if the sixtrack output is all there.""" LOG.info("Checking if sixtrack has finished.") sixjobs_path = get_sixjobs_path(jobname, basedir) try: start_subprocess([sixdesk / RUNSTATUS_SH], cwd=sixjobs_path, ssh=ssh) except OSError as e: if resubmit: submit_sixtrack(jobname, basedir, python=python, ssh=ssh, resubmit=True) raise StageSkipError( f"Sixtrack for {jobname} seems to be incomplete." " Resubmitted incomplete sixtrack jobs." " Wait until they have finished and run again." ) raise StageSkipError( f"Sixtrack for {jobname} seems to be incomplete." f" Run possibly not finished. Check (debug-) log or your Scheduler." ) from e else: LOG.info("Sixtrack results are all present.")
[docs] def sixdb_load( jobname: str, basedir: Path, python: Path | str, sixdesk: Path = SIXDESK_UTILS, ssh: str = None, ): """Creates sixdb database and loads the study results into it.""" LOG.info("Loading study into database.") sixjobs_path = get_sixjobs_path(jobname, basedir) try: start_subprocess([python, sixdesk / SIXDB, ".", "load_dir"], cwd=sixjobs_path, ssh=ssh) except OSError as e: raise StageSkipError(f"Sixdb loading for {jobname} failed. Check (debug-) log.") from e else: LOG.info("Created database for study.")
[docs] def sixdb_cmd( jobname: str, basedir: Path, python: Path | str, cmd: list, sixdesk: Path = SIXDESK_UTILS, ssh: str = None, ): """Performs analysis on the sixdb database.""" cmd_str = " ".join(cmd) LOG.info(f"Performing sixdb command `{cmd_str}`.") sixjobs_path = get_sixjobs_path(jobname, basedir) try: start_subprocess([python, sixdesk / SIXDB, jobname] + cmd, cwd=sixjobs_path, ssh=ssh) except OSError as e: raise StageSkipError( f"SixBD command {cmd_str} for {jobname} failed. Check (debug-) log." ) from e else: LOG.info(f"SixDB command '{cmd_str}' successfully run.")