"""
SixDesk Submission Utils
---------------------------
Individual functions to call SixDesk functionality.
"""
import logging
from pathlib import Path
from typing import Union
from pylhc_submitter.constants.autosix import (
MAD_TO_SIXTRACK_SH,
RUNSIX_SH,
RUNSTATUS_SH,
SIXDB,
get_sixjobs_path, StageSkip,
)
from pylhc_submitter.constants.external_paths import SIXDESK_UTILS
from pylhc_submitter.sixdesk_tools.utils import start_subprocess
LOG = logging.getLogger(__name__)
[docs]
def submit_mask(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None):
""" Run the mask (probably Madx) and generate sixtrack input files.
Args:
jobname (str): Name of the Job
basedir (Path): SixDesk Basefolder Location
sixdesk (Path): Path to the SixDesk installation/repo
"""
LOG.info("Submitting mask to run for sixtrack input generation.")
sixjobs_path = get_sixjobs_path(jobname, basedir)
start_subprocess([sixdesk / MAD_TO_SIXTRACK_SH, "-s"], cwd=sixjobs_path, ssh=ssh)
LOG.info("Submitted mask-jobs.")
[docs]
def submit_sixtrack(jobname: str, basedir: Path, python: Path = None,
sixdesk: Path = SIXDESK_UTILS, ssh: str = None, resubmit: bool = False):
""" Generate simulation files and check if runnable and submit. """
re_str = "Re-" if resubmit else ""
LOG.info(f"{re_str}Submitting to sixtrack.")
sixjobs_path = get_sixjobs_path(jobname, basedir)
args = ["-i"] if resubmit else ["-a"]
if python is not None:
if not python.is_dir():
python = python.parent
args += ["-P", str(python)]
try:
start_subprocess([sixdesk / RUNSIX_SH] + args, cwd=sixjobs_path, ssh=ssh, check_log='exit status: 1')
except OSError as e:
raise StageSkip(
f"{re_str}Submit to sixtrack for {jobname} ended in error."
f" Input generation possibly not finished. Check your Scheduler."
) from e
else:
LOG.info(f"{re_str}Submitted jobs to Sixtrack")
[docs]
def check_sixtrack_output(jobname: str, basedir: Path, python: Union[Path, str],
sixdesk: Path = SIXDESK_UTILS, ssh: str = None, resubmit: bool = False):
""" Checks if the sixtrack output is all there. """
LOG.info("Checking if sixtrack has finished.")
sixjobs_path = get_sixjobs_path(jobname, basedir)
try:
start_subprocess([sixdesk / RUNSTATUS_SH], cwd=sixjobs_path, ssh=ssh)
except OSError as e:
if resubmit:
submit_sixtrack(jobname, basedir, python=python, ssh=ssh, resubmit=True)
raise StageSkip(
f"Sixtrack for {jobname} seems to be incomplete."
" Resubmitted incomplete sixtrack jobs."
" Wait until they have finished and run again."
)
else:
raise StageSkip(
f"Sixtrack for {jobname} seems to be incomplete."
f" Run possibly not finished. Check (debug-) log or your Scheduler."
) from e
else:
LOG.info("Sixtrack results are all present.")
[docs]
def sixdb_load(jobname: str, basedir: Path, python: Union[Path, str],
sixdesk: Path = SIXDESK_UTILS, ssh: str = None):
""" Creates sixdb database and loads the study results into it. """
LOG.info("Loading study into database.")
sixjobs_path = get_sixjobs_path(jobname, basedir)
try:
start_subprocess([python, sixdesk / SIXDB, ".", "load_dir"], cwd=sixjobs_path, ssh=ssh)
except OSError as e:
raise StageSkip(f"Sixdb loading for {jobname} failed. Check (debug-) log.") from e
else:
LOG.info("Created database for study.")
[docs]
def sixdb_cmd(jobname: str, basedir: Path, python: Union[Path, str], cmd: list,
sixdesk: Path = SIXDESK_UTILS, ssh: str = None):
""" Performs analysis on the sixdb database. """
cmd_str = " ".join(cmd)
LOG.info(f"Performing sixdb command `{cmd_str}`.")
sixjobs_path = get_sixjobs_path(jobname, basedir)
try:
start_subprocess([python, sixdesk / SIXDB, jobname] + cmd, cwd=sixjobs_path, ssh=ssh)
except OSError as e:
raise StageSkip(f"SixBD command {cmd_str} for {jobname} failed. Check (debug-) log.") from e
else:
LOG.info(f"SixDB command '{cmd_str}' successfully run.")