"""
SixDesk Submission Utils
---------------------------
Individual functions to call SixDesk functionality.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from pylhc_submitter.constants.autosix import (
MAD_TO_SIXTRACK_SH,
RUNSIX_SH,
RUNSTATUS_SH,
SIXDB,
StageSkipError,
get_sixjobs_path,
)
from pylhc_submitter.constants.external_paths import SIXDESK_UTILS
from pylhc_submitter.sixdesk_tools.utils import start_subprocess
if TYPE_CHECKING:
from pathlib import Path
LOG = logging.getLogger(__name__)
[docs]
def submit_mask(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None):
    """Launch the mask run (probably Madx) that generates the sixtrack input files.

    Args:
        jobname (str): Name of the Job
        basedir (Path): SixDesk Basefolder Location
        sixdesk (Path): Path to the SixDesk installation/repo
        ssh (str): Forwarded unchanged to ``start_subprocess``
            (presumably the host to run on via ssh -- confirm in ``start_subprocess``).
    """
    LOG.info("Submitting mask to run for sixtrack input generation.")

    # The script is started with the "-s" flag from inside the job's sixjobs directory.
    working_dir = get_sixjobs_path(jobname, basedir)
    command = [sixdesk / MAD_TO_SIXTRACK_SH, "-s"]
    start_subprocess(command, cwd=working_dir, ssh=ssh)

    LOG.info("Submitted mask-jobs.")
[docs]
def submit_sixtrack(
    jobname: str,
    basedir: Path,
    python: Path | str | None = None,
    sixdesk: Path = SIXDESK_UTILS,
    ssh: str = None,
    resubmit: bool = False,
):
    """Generate simulation files and check if runnable and submit.

    Args:
        jobname (str): Name of the Job
        basedir (Path): SixDesk Basefolder Location
        python (Path | str): Optional python location. If it is not a directory,
            its parent is used; the resulting directory is handed to the
            run-script via ``-P``. Skipped entirely if ``None``.
        sixdesk (Path): Path to the SixDesk installation/repo
        ssh (str): Forwarded unchanged to ``start_subprocess``.
        resubmit (bool): If ``True`` the run-script is called with ``-i``
            instead of ``-a`` and the log messages are prefixed with "Re-".

    Raises:
        StageSkipError: If ``start_subprocess`` raises an ``OSError``.
    """
    # Local import: at module level ``Path`` is only imported for type checking.
    from pathlib import Path

    re_str = "Re-" if resubmit else ""
    LOG.info(f"{re_str}Submitting to sixtrack.")
    sixjobs_path = get_sixjobs_path(jobname, basedir)

    args = ["-i"] if resubmit else ["-a"]
    if python is not None:
        # Accept plain strings as well (callers in this module are typed
        # ``Path | str``); a bare ``str`` has no ``is_dir``/``parent``.
        python = Path(python)
        if not python.is_dir():
            python = python.parent
        args += ["-P", str(python)]

    try:
        start_subprocess(
            [sixdesk / RUNSIX_SH] + args, cwd=sixjobs_path, ssh=ssh, check_log="exit status: 1"
        )
    except OSError as e:
        raise StageSkipError(
            f"{re_str}Submit to sixtrack for {jobname} ended in error."
            f" Input generation possibly not finished. Check your Scheduler."
        ) from e
    else:
        LOG.info(f"{re_str}Submitted jobs to Sixtrack")
[docs]
def check_sixtrack_output(
    jobname: str,
    basedir: Path,
    python: Path | str,
    sixdesk: Path = SIXDESK_UTILS,
    ssh: str = None,
    resubmit: bool = False,
):
    """Checks if the sixtrack output is all there.

    Args:
        jobname (str): Name of the Job
        basedir (Path): SixDesk Basefolder Location
        python (Path | str): Python location, only used when resubmitting
            (passed on to ``submit_sixtrack``).
        sixdesk (Path): Path to the SixDesk installation/repo
        ssh (str): Forwarded unchanged to ``start_subprocess``.
        resubmit (bool): If ``True``, call ``submit_sixtrack`` in resubmit mode
            when the status check fails.

    Raises:
        StageSkipError: If the status check raises an ``OSError``; this is raised
            whether or not a resubmission was triggered.
    """
    LOG.info("Checking if sixtrack has finished.")
    sixjobs_path = get_sixjobs_path(jobname, basedir)
    try:
        start_subprocess([sixdesk / RUNSTATUS_SH], cwd=sixjobs_path, ssh=ssh)
    except OSError as e:
        if resubmit:
            # Forward `sixdesk` as well, so the resubmission uses the same
            # installation as the status check instead of the default one.
            submit_sixtrack(
                jobname, basedir, python=python, sixdesk=sixdesk, ssh=ssh, resubmit=True
            )
            raise StageSkipError(
                f"Sixtrack for {jobname} seems to be incomplete."
                " Resubmitted incomplete sixtrack jobs."
                " Wait until they have finished and run again."
            ) from e
        raise StageSkipError(
            f"Sixtrack for {jobname} seems to be incomplete."
            f" Run possibly not finished. Check (debug-) log or your Scheduler."
        ) from e
    else:
        LOG.info("Sixtrack results are all present.")
[docs]
def sixdb_load(
    jobname: str,
    basedir: Path,
    python: Path | str,
    sixdesk: Path = SIXDESK_UTILS,
    ssh: str = None,
):
    """Creates sixdb database and loads the study results into it.

    Args:
        jobname (str): Name of the Job
        basedir (Path): SixDesk Basefolder Location
        python (Path | str): First element of the command handed to
            ``start_subprocess`` (presumably the python executable to run sixdb with).
        sixdesk (Path): Path to the SixDesk installation/repo
        ssh (str): Forwarded unchanged to ``start_subprocess``.

    Raises:
        StageSkipError: If ``start_subprocess`` raises an ``OSError``.
    """
    LOG.info("Loading study into database.")
    working_dir = get_sixjobs_path(jobname, basedir)

    # "load_dir" is run on "." , i.e. relative to the sixjobs directory used as cwd.
    load_command = [python, sixdesk / SIXDB, ".", "load_dir"]
    try:
        start_subprocess(load_command, cwd=working_dir, ssh=ssh)
    except OSError as e:
        raise StageSkipError(f"Sixdb loading for {jobname} failed. Check (debug-) log.") from e

    # Only reached if the subprocess call did not raise.
    LOG.info("Created database for study.")
[docs]
def sixdb_cmd(
    jobname: str,
    basedir: Path,
    python: Path | str,
    cmd: list,
    sixdesk: Path = SIXDESK_UTILS,
    ssh: str = None,
):
    """Performs analysis on the sixdb database.

    Args:
        jobname (str): Name of the Job; also passed as argument to sixdb.
        basedir (Path): SixDesk Basefolder Location
        python (Path | str): First element of the command handed to
            ``start_subprocess`` (presumably the python executable to run sixdb with).
        cmd (list): Additional command parts appended after the jobname.
        sixdesk (Path): Path to the SixDesk installation/repo
        ssh (str): Forwarded unchanged to ``start_subprocess``.

    Raises:
        StageSkipError: If ``start_subprocess`` raises an ``OSError``.
    """
    # Stringify each part for the log/error messages, so non-str entries
    # (e.g. Path objects) do not break the join.
    cmd_str = " ".join(str(part) for part in cmd)
    LOG.info(f"Performing sixdb command `{cmd_str}`.")
    sixjobs_path = get_sixjobs_path(jobname, basedir)
    try:
        # Unpacking (instead of `+ cmd`) also accepts tuples and other iterables.
        start_subprocess([python, sixdesk / SIXDB, jobname, *cmd], cwd=sixjobs_path, ssh=ssh)
    except OSError as e:
        raise StageSkipError(
            f"SixDB command {cmd_str} for {jobname} failed. Check (debug-) log."
        ) from e
    else:
        LOG.info(f"SixDB command '{cmd_str}' successfully run.")