"""
Create SixDesk Workspace
-----------------------------------
Tools to set up the workspace for SixDesk.
"""
from __future__ import annotations
import contextlib
import logging
import re
import shutil
from dataclasses import asdict
from pathlib import Path
import numpy as np
from pylhc_submitter.constants.autosix import (
SEED_KEYS,
SETENV_SH,
SIXENV_OPTIONAL,
SIXENV_REQUIRED,
SixDeskEnvironment,
get_autosix_results_path,
get_mad6t1_mask_path,
get_mad6t_mask_path,
get_masks_path,
get_scratch_path,
get_sixdeskenv_path,
get_sixjobs_path,
get_sysenv_path,
get_workspace_path,
)
from pylhc_submitter.constants.external_paths import SIXDESK_UTILS
from pylhc_submitter.sixdesk_tools.utils import start_subprocess
# Template ("mask") files that live next to this module; they are filled via
# %-formatting and written into the workspace by the helpers below.
SYSENV_MASK = Path(__file__).with_name("mask_sysenv")
SIXDESKENV_MASK = Path(__file__).with_name("mask_sixdeskenv")

# Module-level logger shared by all functions in this file.
LOG = logging.getLogger(__name__)
# Main -------------------------------------------------------------------------
[docs]
def create_job(
    jobname: str,
    basedir: Path,
    executable: Path | str,
    mask_text: str,
    sixdesk: Path = SIXDESK_UTILS,
    ssh: str = None,
    **kwargs,
):
    """Prepare the SixDesk environment and the job mask for submission.

    Runs the four preparation steps in order: workspace creation, ``sysenv``,
    ``sixdeskenv`` and finally the mask file.

    Args:
        jobname: Name of the job; used by every step to locate its paths.
        basedir: Base directory under which the workspace is created.
        executable: Binary handed to the ``sysenv`` step as ``binary_path``.
        mask_text: Mask template text, filled with ``kwargs`` and written out.
        sixdesk: Directory containing the SixDesk utilities.
        ssh: Forwarded unchanged to the workspace-creation subprocess call.

    Keyword Args:
        Need to contain all replacements for sixdeskenv and the mask.
    """
    job = (jobname, basedir)  # positional pair shared by every step

    _create_workspace(*job, sixdesk=sixdesk, ssh=ssh)
    _create_sysenv(*job, binary_path=executable)
    _create_sixdeskenv(*job, **kwargs)
    _write_mask(*job, mask_text, **kwargs)

    LOG.info("Workspace prepared.")
[docs]
def init_workspace(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None):
    """Initialize the workspace from its sixdeskenv and sysenv files.

    Runs the SixDesk set-env script with the ``-s`` flag from inside the
    job's sixjobs directory.

    Args:
        jobname: Name of the job.
        basedir: Base directory containing the workspace.
        sixdesk: Directory containing the SixDesk utilities.
        ssh: Forwarded unchanged to ``start_subprocess``.
    """
    run_dir = get_sixjobs_path(jobname, basedir)
    setenv_command = [sixdesk / SETENV_SH, "-s"]
    start_subprocess(setenv_command, cwd=run_dir, ssh=ssh)
    LOG.info("Workspace initialized.")
[docs]
def remove_twiss_fail_check(jobname: str, basedir: Path):
    """Comment out the "TWISS fail" check in both mad6t mask scripts.

    In each script, the block starting at the first line that begins with
    ``grep -i "TWISS fail"`` and ending at the next line that begins with
    ``fi`` is prefixed with ``# ``. A script in which no such complete block
    is found is left untouched and only an info message is logged.

    Args:
        jobname: Name of the job.
        basedir: Base directory containing the workspace.
    """
    LOG.info("Applying twiss-fail hack.")
    mask_files = (
        get_mad6t_mask_path(jobname, basedir),
        get_mad6t1_mask_path(jobname, basedir),
    )

    for mad6t_path in mask_files:
        lines = mad6t_path.read_text().splitlines(keepends=True)

        # Index of the line opening the check (None if absent).
        first = next(
            (idx for idx, text in enumerate(lines) if text.startswith('grep -i "TWISS fail"')),
            None,
        )

        # Index of the closing line, searched from the opening line onwards.
        last = None
        if first is not None:
            last = next(
                (idx for idx in range(first, len(lines)) if lines[idx].startswith("fi")),
                None,
            )

        if last is None:
            # No complete check block: nothing is written for this file.
            LOG.info(f"'TWISS fail' not found in {mad6t_path.name}")
            continue

        for idx in range(first, last + 1):
            line = lines[idx]
            lines[idx] = f"# {line}"

        Path(mad6t_path).write_text("".join(lines))
[docs]
def fix_pythonfile_call(jobname: str, basedir: Path):
    """Rewrite the binary call in both mad6t mask scripts so ``__file__`` works.

    The first line starting with ``$MADX_PATH/$MADX`` is replaced by a call
    that passes the job file as an argument (no ``<`` input redirection)
    and redirects the output to ``$filejob.out."$i"``.

    Args:
        jobname: Name of the job.
        basedir: Base directory containing the workspace.

    Raises:
        OSError: If a script contains no line starting with ``$MADX_PATH/$MADX``.
    """
    LOG.info("Applying python-file call fix.")
    mask_files = (
        get_mad6t_mask_path(jobname, basedir),
        get_mad6t1_mask_path(jobname, basedir),
    )

    for mad6t_path in mask_files:
        lines = Path(mad6t_path).read_text().splitlines(keepends=True)

        call_idx = next(
            (idx for idx, text in enumerate(lines) if text.startswith("$MADX_PATH/$MADX")),
            None,
        )
        if call_idx is None:
            raise OSError(f"'$MADX_PATH/$MADX' line not found in {mad6t_path.name}")

        lines[call_idx] = '$MADX_PATH/$MADX $junktmp/$filejob."$i" > $filejob.out."$i"\n'
        Path(mad6t_path).write_text("".join(lines))
[docs]
def set_max_materialize(sixdesk: Path, max_materialize: int = None):
    """Add, replace or remove ``max_materialize`` in the htcondor sixtrack
    submission-file.

    The file is only rewritten if its content actually changes, so a call
    that has nothing to do also succeeds on a read-only SixDesk installation.

    Args:
        sixdesk: Directory containing the SixDesk utilities; the submission
            template is ``utilities/templates/htcondor/htcondor_run_six.sub``
            below it.
        max_materialize: ``None`` leaves the file untouched, ``0`` removes an
            existing ``max_materialize`` entry, any other integer sets or
            replaces the entry with that number.

    Raises:
        OSError: If the modified submission-file cannot be written.
    """
    if max_materialize is None:
        return

    LOG.info(f"Setting max_materialize for SixTrack to {max_materialize}.")
    sub_path = sixdesk / "utilities" / "templates" / "htcondor" / "htcondor_run_six.sub"
    original_content = sub_path.read_text()
    sub_content = original_content
    already_set = "max_materialize" in original_content

    if max_materialize == 0:
        # Remove whole max_materialize line if present
        if already_set:
            LOG.info("'max_materialize' already set. Removing.")
            sub_content = re.sub(r"max_materialize\s*=\s*\d+\s*", "", sub_content)
        else:
            LOG.debug("'max_materialize' is already not present (as desired).")
    else:
        # Set/replace with number
        max_materialize_str = f"max_materialize = {max_materialize:d}"
        if already_set:
            LOG.info("max_materialize already set. Replacing it with new number.")
            sub_content = re.sub(r"max_materialize\s*=\s*\d+", max_materialize_str, sub_content)
        else:
            # The option is inserted directly before the queue statement.
            sub_content = sub_content.replace("\nqueue", f"\n{max_materialize_str}\nqueue")
            if sub_content == original_content:
                LOG.warning(
                    f"No 'queue' line found in {sub_path!s}. 'max_materialize' was not added."
                )

    if sub_content == original_content:
        # Nothing to write; do not require write access for a no-op.
        return

    # Write out
    try:
        sub_path.write_text(sub_content)
    except OSError as e:
        raise OSError(
            f"Could not write to {sub_path!s}. `max_materialize` could not be set.\n"
            f"Remove option or use a SixDesk with writing rights."
        ) from e
# Helper -----------------------------------------------------------------------
def _create_workspace(jobname: str, basedir: Path, sixdesk: Path = SIXDESK_UTILS, ssh: str = None):
    """Create the workspace structure (with default files) for the job.

    If the workspace already exists, the user is asked on stdin whether it
    should be deleted. Any answer not starting with ``y``/``Y`` keeps the
    existing workspace and returns without creating anything.

    Args:
        jobname: Name of the job.
        basedir: Base directory in which the workspace is created.
        sixdesk: Directory containing the SixDesk utilities.
        ssh: Forwarded unchanged to ``start_subprocess``.
    """
    workspace_path = get_workspace_path(jobname, basedir)
    scratch_dir = get_scratch_path(basedir)
    LOG.info(f'Creating new workspace in "{str(workspace_path)}"')

    if workspace_path.exists():
        LOG.warning(f'Workspace in "{str(workspace_path)}" already exists. ')
        LOG.info("Do you want to delete the old workspace? [y/N]")
        delete_confirmed = input().lower().startswith("y")
        if not delete_confirmed:
            LOG.warning("Keeping Workspace as-is.")
            return

        shutil.rmtree(workspace_path)
        # The scratch directory may not exist; that is not an error.
        with contextlib.suppress(FileNotFoundError):
            shutil.rmtree(scratch_dir)

    scratch_dir.mkdir(parents=True, exist_ok=True)

    # Let SixDesk create the environment with all necessary files.
    setenv_command = [sixdesk / SETENV_SH, "-N", workspace_path.name]
    start_subprocess(setenv_command, cwd=basedir, ssh=ssh)

    # The autosix results folder must be created after the command above
    # (which crashes if the folder exists) but before the end of this stage
    # (which needs to write the stagefile).
    get_autosix_results_path(jobname, basedir).mkdir(exist_ok=True, parents=True)
def _create_sixdeskenv(jobname: str, basedir: Path, **kwargs):
    """Fill the sixdeskenv mask and write the result into the workspace.

    Args:
        jobname: Name of the job.
        basedir: Base directory containing the workspace.

    Keyword Args:
        Must contain every key of ``SIXENV_REQUIRED``; keys of
        ``SIXENV_OPTIONAL`` are used if given, all other keys are ignored.
        ``TURNS`` is additionally used to compute ``TURNSPOWER`` as its
        base-10 logarithm.

    Raises:
        ValueError: If any required key is missing from the keyword arguments.
    """
    workspace_path = get_workspace_path(jobname, basedir)
    scratch_path = get_scratch_path(basedir)
    sixdeskenv_path = get_sixdeskenv_path(jobname, basedir)

    missing = [key for key in SIXENV_REQUIRED if key not in kwargs]
    if missing:
        raise ValueError(f"The following keys are required but missing {missing}.")

    # Fields derived from the job itself.
    derived_fields = {
        "JOBNAME": jobname,
        "WORKSPACE": workspace_path.name,
        "BASEDIR": str(basedir),
        "SCRATCHDIR": str(scratch_path),
        "TURNSPOWER": np.log10(kwargs["TURNS"]),
    }

    # Fields passed through from the caller, restricted to the known keys.
    accepted_keys = SIXENV_REQUIRED + SIXENV_OPTIONAL
    user_fields = {key: value for key, value in kwargs.items() if key in accepted_keys}

    sixenv_variables = SixDeskEnvironment(**derived_fields, **user_fields)

    filled_text = SIXDESKENV_MASK.read_text() % asdict(sixenv_variables)
    sixdeskenv_path.write_text(filled_text)
    LOG.debug("sixdeskenv written.")
def _create_sysenv(jobname: str, basedir: Path, binary_path: Path | str):
    """Fill the sysenv mask and write the result into the workspace.

    The mask is filled with ``MADXPATH`` (the directory of the binary) and
    ``MADXBIN`` (the file name of the binary).

    Args:
        jobname: Name of the job.
        basedir: Base directory containing the workspace.
        binary_path: Path to the binary. A plain string is accepted as well,
            because ``create_job`` forwards its ``executable`` argument
            (``Path | str``) unchanged.
    """
    LOG.info(f"Chosen binary for mask '{str(binary_path)}'")

    # Coerce to Path: `.parent`/`.name` below would fail on a plain string.
    binary_path = Path(binary_path)

    sysenv_path = get_sysenv_path(jobname, basedir)
    sysenv_replace = {
        "MADXPATH": str(binary_path.parent),
        "MADXBIN": binary_path.name,
    }
    sysenv_text = SYSENV_MASK.read_text()
    sysenv_path.write_text(sysenv_text % sysenv_replace)
    LOG.debug("sysenv written.")
def _write_mask(jobname: str, basedir: Path, mask_text: str, **kwargs):
    """Fill the mask with the given arguments and write it to ``<jobname>.mask``.

    The ``%SEEDRAN`` marker is protected from the %-formatting and restored
    afterwards, so it is still present in the written file for the SixDesk
    seed-loop.

    Args:
        jobname: Name of the job; also the stem of the written mask file.
        basedir: Base directory containing the workspace.
        mask_text: Mask template, filled via ``mask_text % kwargs``.

    Keyword Args:
        Replacements for the mask. The keys in ``SEED_KEYS`` are read from
        here, falling back to the class attributes of ``SixDeskEnvironment``.

    Raises:
        ValueError: If exactly one of the seed values is ``None``, or if seeds
            are set but ``%SEEDRAN`` appears neither in the mask nor among
            the keyword values.
    """
    seed_marker = "%SEEDRAN"
    protected_marker = "#!#SEEDRAN"  # stand-in that survives %-formatting

    masks_path = get_masks_path(jobname, basedir)

    seed_range = []
    for key in SEED_KEYS:
        seed_range.append(kwargs.get(key, getattr(SixDeskEnvironment, key)))

    if seed_range.count(None) == 1:
        raise ValueError(
            "First- or Lastseed is set, but the other one is deactivated. Set or unset both."
        )

    if not (seed_marker in mask_text or seed_marker in kwargs.values()) and any(seed_range):
        raise ValueError(
            "First- and Lastseed are set, but no seed-variable '%SEEDRAN' found in mask."
        )

    # Hide the marker, fill the mask, then bring the marker back.
    protected_text = mask_text.replace(seed_marker, protected_marker)
    filled_text = (protected_text % kwargs).replace(protected_marker, seed_marker)

    (masks_path / f"{jobname}.mask").write_text(filled_text)