Source code for pylhc_submitter.sixdesk_tools.create_workspace

"""
Create SixDesk Workspace
-----------------------------------

Tools to setup the workspace for sixdesk.
"""

from __future__ import annotations

import contextlib
import logging
import re
import shutil
from dataclasses import asdict
from pathlib import Path

import numpy as np

from pylhc_submitter.constants.autosix import (
    SEED_KEYS,
    SETENV_SH,
    SIXENV_OPTIONAL,
    SIXENV_REQUIRED,
    SixDeskEnvironment,
    get_autosix_results_path,
    get_mad6t1_mask_path,
    get_mad6t_mask_path,
    get_masks_path,
    get_scratch_path,
    get_sixdeskenv_path,
    get_sixjobs_path,
    get_sysenv_path,
    get_workspace_path,
)
from pylhc_submitter.constants.external_paths import SIXDESK_UTILS
from pylhc_submitter.sixdesk_tools.utils import start_subprocess

SYSENV_MASK = Path(__file__).parent / "mask_sysenv"
SIXDESKENV_MASK = Path(__file__).parent / "mask_sixdeskenv"

LOG = logging.getLogger(__name__)


# Main -------------------------------------------------------------------------


[docs] def create_job( jobname: str, basedir: Path, executable: Path | str, mask_text: str, sixdesk: Path = SIXDESK_UTILS, ssh: str = None, **kwargs, ): """Create environment and individual jobs/masks for SixDesk to send to HTC. Keyword Args: Need to contain all replacements for sixdeskenv and the mask. """ _create_workspace(jobname, basedir, sixdesk=sixdesk, ssh=ssh) _create_sysenv(jobname, basedir, binary_path=executable) _create_sixdeskenv(jobname, basedir, **kwargs) _write_mask(jobname, basedir, mask_text, **kwargs) LOG.info("Workspace prepared.")
[docs] def init_workspace(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None): """Initializes the workspace with sixdeskenv and sysenv.""" sixjobs_path = get_sixjobs_path(jobname, basedir) start_subprocess([sixdesk / SETENV_SH, "-s"], cwd=sixjobs_path, ssh=ssh) LOG.info("Workspace initialized.")
[docs] def remove_twiss_fail_check(jobname: str, basedir: Path): """Comments out the "Twiss fail" check from mad6t.sh""" LOG.info("Applying twiss-fail hack.") for mad6t_path in ( get_mad6t_mask_path(jobname, basedir), get_mad6t1_mask_path(jobname, basedir), ): lines = mad6t_path.read_text().splitlines(keepends=True) check_started = False for idx, line in enumerate(lines): if line.startswith('grep -i "TWISS fail"'): check_started = True if check_started: lines[idx] = f"# {line}" if line.startswith("fi"): break else: LOG.info(f"'TWISS fail' not found in {mad6t_path.name}") continue Path(mad6t_path).write_text("".join(lines))
[docs] def fix_pythonfile_call(jobname: str, basedir: Path): """Removes '<' in the `binary file` line in mad6t.sh so __file__ works.""" LOG.info("Applying python-file call fix.") for mad6t_path in ( get_mad6t_mask_path(jobname, basedir), get_mad6t1_mask_path(jobname, basedir), ): lines = Path(mad6t_path).read_text().splitlines(keepends=True) for idx, line in enumerate(lines): if line.startswith("$MADX_PATH/$MADX"): lines[idx] = '$MADX_PATH/$MADX $junktmp/$filejob."$i" > $filejob.out."$i"\n' break else: raise OSError(f"'$MADX_PATH/$MADX' line not found in {mad6t_path.name}") Path(mad6t_path).write_text("".join(lines))
[docs] def set_max_materialize(sixdesk: Path, max_materialize: int = None): """Adds the ``max_materialize`` option into the htcondor sixtrack submission-file.""" if max_materialize is None: return LOG.info(f"Setting max_materialize for SixTrack to {max_materialize}.") sub_path = sixdesk / "utilities" / "templates" / "htcondor" / "htcondor_run_six.sub" sub_content = sub_path.read_text() # Remove whole max_materialize line if present if max_materialize == 0: if "max_materialize" in sub_content: LOG.info("'max_materialize' already set. Removing.") sub_content = re.sub(r"max_materialize\s*=\s*\d+\s*", "", sub_content) else: LOG.debug("'max_materialize' is already not present (as desired).") # Set/replace with number else: max_materialize_str = f"max_materialize = {max_materialize:d}" if "max_materialize" in sub_content: LOG.info("max_materialize already set. Replacing it with new number.") sub_content = re.sub(r"max_materialize\s*=\s*\d+", max_materialize_str, sub_content) else: sub_content = sub_content.replace("\nqueue", f"\n{max_materialize_str}\nqueue") # Write out try: sub_path.write_text(sub_content) except OSError as e: raise OSError( f"Could not write to {sub_path!s}. `max_materialization` could not be set.\n" f"Remove option or use a SixDesk with writing rights." ) from e
# Helper ----------------------------------------------------------------------- def _create_workspace(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None): """Create workspace structure (with default files).""" workspace_path = get_workspace_path(jobname, basedir) scratch_path = get_scratch_path(basedir) LOG.info(f'Creating new workspace in "{str(workspace_path)}"') if workspace_path.exists(): LOG.warning(f'Workspace in "{str(workspace_path)}" already exists. ') LOG.info("Do you want to delete the old workspace? [y/N]") user_answer = input() if user_answer.lower().startswith("y"): shutil.rmtree(workspace_path) with contextlib.suppress(FileNotFoundError): shutil.rmtree(scratch_path) else: LOG.warning("Keeping Workspace as-is.") return scratch_path.mkdir(parents=True, exist_ok=True) # create environment with all necessary files # _start_subprocess(['git', 'clone', GIT_REPO, basedir]) start_subprocess([sixdesk / SETENV_SH, "-N", workspace_path.name], cwd=basedir, ssh=ssh) # create autosix results folder. # Needs to be done after above command (as it crashes if folder exists) # but before end of this stage (as it needs to write the stagefile) get_autosix_results_path(jobname, basedir).mkdir(exist_ok=True, parents=True) def _create_sixdeskenv(jobname: str, basedir: Path, **kwargs): """Fills sixdeskenv mask and copies it to workspace""" workspace_path = get_workspace_path(jobname, basedir) scratch_path = get_scratch_path(basedir) sixdeskenv_path = get_sixdeskenv_path(jobname, basedir) missing = [key for key in SIXENV_REQUIRED if key not in kwargs] if len(missing): raise ValueError(f"The following keys are required but missing {missing}.") sixenv_variables = SixDeskEnvironment( JOBNAME=jobname, WORKSPACE=workspace_path.name, BASEDIR=str(basedir), SCRATCHDIR=str(scratch_path), TURNSPOWER=np.log10(kwargs["TURNS"]), **{k: v for k, v in kwargs.items() if k in SIXENV_REQUIRED + SIXENV_OPTIONAL}, ) sixenv_text = SIXDESKENV_MASK.read_text() sixdeskenv_path.write_text(sixenv_text % asdict(sixenv_variables)) LOG.debug("sixdeskenv written.") def _create_sysenv(jobname: str, basedir: Path, binary_path: Path): """Fills sysenv mask and copies it to workspace""" LOG.info(f"Chosen binary for mask '{str(binary_path)}'") sysenv_path = get_sysenv_path(jobname, basedir) sysenv_replace = { "MADXPATH": str(binary_path.parent), "MADXBIN": binary_path.name, } sysenv_text = SYSENV_MASK.read_text() sysenv_path.write_text(sysenv_text % sysenv_replace) LOG.debug("sysenv written.") def _write_mask(jobname: str, basedir: Path, mask_text: str, **kwargs): """Fills mask with arguments and writes it out.""" masks_path = get_masks_path(jobname, basedir) seed_range = [kwargs.get(key, getattr(SixDeskEnvironment, key)) for key in SEED_KEYS] if seed_range.count(None) == 1: raise ValueError( "First- or Lastseed is set, but the other one is deactivated. Set or unset both." ) if ("%SEEDRAN" not in mask_text) and ("%SEEDRAN" not in kwargs.values()) and any(seed_range): raise ValueError( "First- and Lastseed are set, but no seed-variable '%SEEDRAN' found in mask." ) mask_text = mask_text.replace("%SEEDRAN", "#!#SEEDRAN") # otherwise next line will complain mask_filled = mask_text % kwargs mask_filled = mask_filled.replace( "#!#SEEDRAN", "%SEEDRAN" ) # bring seedran back for sixdesk seed-loop mask_outfile = masks_path / f"{jobname}.mask" mask_outfile.write_text(mask_filled)