# Source code for pylhc_submitter.sixdesk_tools.troubleshooting

"""
SixDesk Troubleshooting tools
-----------------------------

Some useful functions to troubleshoot the SixDesk output.
"""
import logging
from pathlib import Path

from pylhc_submitter.autosix import get_jobs_and_values
from pylhc_submitter.constants.autosix import (
    get_stagefile_path,
    get_track_path,
    get_workspace_path,
    SIXTRACK_INPUT_CHECK_FILES,
    SIXTRACK_OUTPUT_FILES,
    get_database_path,
)
from pylhc_submitter.sixdesk_tools.stages import STAGE_ORDER

LOG = logging.getLogger(__name__)


# Stages -----------------------------------------------------------------------


def get_last_stage(jobname, basedir):
    """Get the last run stage of job `jobname`.

    Args:
        jobname: Name of the job to look up.
        basedir: SixDesk base directory containing the workspaces.

    Returns:
        The ``STAGE_ORDER`` entry named on the last line of the job's stage file.
    """
    stage_file = get_stagefile_path(jobname, basedir)
    # The stage file is a newline-separated list of completed stage names;
    # the last entry is the most recently run stage.
    completed_stages = stage_file.read_text().strip("\n").split("\n")
    return STAGE_ORDER[completed_stages[-1]]
# Set Stages ---
def set_stages_for_setup(basedir: Path, stage_name: str, jobid_mask: str, replace_dict: dict):
    """Sets the last run stage for all jobs from given job-setups.

    Args:
        basedir: SixDesk base directory containing the workspaces.
        stage_name: Name of the stage to set as "last run".
        jobid_mask: Mask used to generate the job names.
        replace_dict: Values to fill into the mask.
    """
    jobnames, _ = get_jobs_and_values(jobid_mask, **replace_dict)
    for jobname in jobnames:
        LOG.info(f"Setting stage to {stage_name} in {jobname}")
        set_stages(jobname, basedir, stage_name)
def set_stages(jobname: str, basedir: Path, stage_name: str):
    """Sets the last run stage of all given jobs to `stage_name`.

    Rewrites the job's stage file so that it lists every stage up to and
    including `stage_name`.

    Args:
        jobname: Name of the job to modify.
        basedir: SixDesk base directory containing the workspaces.
        stage_name: Name of the stage to set as "last run".

    Raises:
        ValueError: If `stage_name` is not a known stage.
    """
    if stage_name not in STAGE_ORDER.keys():
        raise ValueError(f"Unknown stage '{stage_name}'")
    target_stage = STAGE_ORDER[stage_name]

    # Collect the names of all stages up to and including the target.
    # NOTE(review): keys of STAGE_ORDER appear to be stage objects with a
    # `.name` attribute that also compare equal to the mapped values —
    # confirm against pylhc_submitter.sixdesk_tools.stages.
    names_to_write = []
    for stage in STAGE_ORDER.keys():
        names_to_write.append(stage.name)
        if stage == target_stage:
            break

    stage_file = get_stagefile_path(jobname, basedir)
    stage_file.write_text("\n".join(names_to_write))  # overwrites old file
def skip_stages(jobname: str, basedir: Path, stage_name: str):
    """Skip stages until `stagename`, i.e. similar to `set_stages`
    but only if the stage hasn't been reached yet.
    Inverse to `reset_stages`.

    Args:
        jobname: Name of the job to modify.
        basedir: SixDesk base directory containing the workspaces.
        stage_name: Name of the stage to skip forward to.

    Raises:
        ValueError: If `stage_name` is not a known stage.
    """
    if stage_name not in STAGE_ORDER.keys():
        raise ValueError(f"Unknown stage '{stage_name}'")
    new_stage = STAGE_ORDER[stage_name]
    last_stage = get_last_stage(jobname, basedir)
    if last_stage < new_stage:
        # Fixed typo in log message: "form" -> "from".
        LOG.info(
            f"Skipping stage from {last_stage.name} to {new_stage.name} in {jobname}"
        )
        set_stages(jobname, basedir, stage_name)
    else:
        LOG.debug(f"Stage {last_stage.name} unchanged in {jobname}")
def reset_stages(jobname: str, basedir: Path, stage_name: str):
    """Reset stages until `stagename`, i.e. similar to `set_stages`
    but only if the stage has already been run.
    Inverse to `skip_stages`.

    Args:
        jobname: Name of the job to modify.
        basedir: SixDesk base directory containing the workspaces.
        stage_name: Name of the stage to reset back to.

    Raises:
        ValueError: If `stage_name` is not a known stage.
    """
    if stage_name not in STAGE_ORDER.keys():
        raise ValueError(f"Unknown stage '{stage_name}'")
    target = STAGE_ORDER[stage_name]
    reached = get_last_stage(jobname, basedir)
    # Guard: only act when the job has already run past the target stage.
    # `not (reached > target)` rather than `<=` to keep the exact comparison
    # the stage objects define.
    if not (reached > target):
        LOG.debug(f"Stage {reached.name} unchanged in {jobname}")
        return
    LOG.info(f"Resetting stage from {reached.name} to {target.name} in {jobname}")
    set_stages(jobname, basedir, stage_name)
# Check Stages ---
def check_stages_for_setup(basedir: Path, stage_name: str, jobid_mask: str, replace_dict: dict):
    """Check the last run stage for all jobs from given job-setups.

    Args:
        basedir: SixDesk base directory containing the workspaces.
        stage_name: Unused; kept for signature-compatibility with
            `set_stages_for_setup`.
        jobid_mask: Mask used to generate the job names.
        replace_dict: Values to fill into the mask.
    """
    jobnames, _ = get_jobs_and_values(jobid_mask, **replace_dict)
    for jobname in jobnames:
        check_last_stage(jobname, basedir)
def check_last_stage(jobname: str, basedir: Path):
    """Logs names of all last run stages for given jobs.

    Args:
        jobname: Name of the job to check.
        basedir: SixDesk base directory containing the workspaces.
    """
    stage = get_last_stage(jobname, basedir)
    LOG.info(f"'{jobname}' at stage '{stage.name}'")
# Complete check for failure ---------------------------------------------------
def find_obviously_failed_sixtrack_submissions(basedir: Path):
    """Checks in jobs in `track` whether the directory structure seems to be
    created and if there is output data. This checks only the first
    directories found, to speed up this process.
    For a more precise scan see check_sixtrack_output_data.

    Args:
        basedir: SixDesk base directory containing the workspaces.

    Returns:
        List of job names (as strings) that look like failed submissions.
    """
    failed_jobs = []
    for jobname in get_all_jobs_in_base(basedir):
        try:
            LOG.debug(str(jobname))
            track_path = get_track_path(jobname=jobname, basedir=basedir)
            # Descend one representative branch of the output tree:
            # seed / "simul" / tunes / amplitude / turns / angle.
            simul_dir = get_first_dir(track_path) / "simul"
            tunes_dir = get_first_dir(simul_dir)
            amp_dir = get_first_dir(tunes_dir, "*_*")
            turns_dir = get_first_dir(amp_dir, "e*")
            angle_dir = get_first_dir(turns_dir, ".*")

            present_names = {entry.name for entry in angle_dir.glob("*")}
            if not any(name in present_names for name in SIXTRACK_OUTPUT_FILES):
                # No tracking output at all -> treat like a missing directory.
                raise OSError(str(get_workspace_path(jobname=jobname, basedir=basedir)))
        except OSError as e:
            LOG.error(
                f"{e.args[0]} (stage: {get_last_stage(jobname=jobname, basedir=basedir).name})"
            )
            failed_jobs.append(str(jobname))
    return failed_jobs
def check_sixtrack_output_data(jobname: str, basedir: Path):
    """Presence checks for SixDesk tracking output data.
    This checks recursively all directories in `track`.
    Will be busy for a while.

    Args:
        jobname: Name of the job to check.
        basedir: SixDesk base directory containing the workspaces.

    Raises:
        OSError: If any directory level of the output tree is empty, if the
            htcondor files are incomplete, if input files that should have
            been deleted after tracking are still present, or if no output
            files are found.
    """
    track_path = get_track_path(jobname, basedir)
    # Walk the fixed hierarchy: seed / "simul" / tunes / amplitude / turns / angle.
    # Seed directories are one- or two-digit names; angle directories are hidden
    # (dot-prefixed).
    for seed_dir in _subdirs_or_raise(track_path, ("[0-9]", "[0-9][0-9]"), "seed"):
        for tunes_dir in _subdirs_or_raise(seed_dir / "simul", ("*",), "tunes"):
            for amp_dir in _subdirs_or_raise(tunes_dir, ("*_*",), "amplitude"):
                for turn_dir in _subdirs_or_raise(amp_dir, ("*",), "turns"):
                    for angle_dir in _subdirs_or_raise(turn_dir, (".*",), "angle"):
                        _check_angle_dir(angle_dir)


def _subdirs_or_raise(parent: Path, patterns, kind: str):
    """Glob `patterns` in `parent`; raise if nothing matched, return directories only.

    Note: for the tunes level this reports the `simul` path in the error,
    which is the directory actually searched (the original reported its
    parent seed directory instead).
    """
    matches = [m for pattern in patterns for m in parent.glob(pattern)]
    if not matches:
        raise OSError(f"No {kind}-dirs present in {str(parent)}.")
    return [m for m in matches if m.is_dir()]


def _check_angle_dir(angle_dir: Path):
    """Check the tracking files inside a single angle directory."""
    htcondor_files = list(angle_dir.glob("htcondor.*"))
    if len(htcondor_files) != 3:
        raise OSError(f"Not all htcondor files present in {str(angle_dir)}.")

    file_names = [f.name for f in angle_dir.glob("*")]

    # Input files should have been removed after a successful tracking run.
    in_files_present = [f for f in SIXTRACK_INPUT_CHECK_FILES if f in file_names]
    if len(in_files_present):
        raise OSError(
            f"The files '{in_files_present}' are found in {str(angle_dir)},"
            "yet they should have been deleted after tracking."
        )

    out_files_present = [f for f in SIXTRACK_OUTPUT_FILES if f in file_names]
    if not len(out_files_present):
        raise OSError(
            f"None of the expected output files '{SIXTRACK_OUTPUT_FILES}' "
            f"are present in {str(angle_dir)}"
        )
# Long Database Names Hack -----------------------------------------------------


def create_database_symlink(jobname: str, basedir: Path):
    """Create a symlink at the (long) SixDesk database path, pointing to a
    short-named ``my.db`` file in the same directory.

    Does nothing if the database path already exists.

    Args:
        jobname: Name of the job.
        basedir: SixDesk base directory containing the workspaces.
    """
    db_path = get_database_path(jobname, basedir)
    if db_path.exists():
        LOG.debug(f"Database already exists in {jobname}.")
        return
    real_db_path = db_path.parent / "my.db"
    real_db_path.touch()
    db_path.symlink_to(real_db_path)
    # Fixed typo in log message: "Crated" -> "Created".
    LOG.info(f"Created database link in {jobname}.")


def move_database_symlink(jobname: str, basedir: Path):
    """Move the short-named ``my.db`` database back to its proper (long) name,
    replacing the symlink created by `create_database_symlink`.

    Args:
        jobname: Name of the job.
        basedir: SixDesk base directory containing the workspaces.
    """
    db_path = get_database_path(jobname, basedir)
    real_db_path = db_path.parent / "my.db"
    if real_db_path.exists():
        # rename() replaces db_path (the symlink) with the actual file.
        real_db_path.rename(db_path)
        LOG.info(f"Renamed database to its proper name in {jobname}.")


# Helper -----------------------------------------------------------------------
def for_all_jobs(func: callable, basedir: Path, *args, **kwargs):
    """Do function for all jobs in basedir.

    Args:
        func: Callable invoked as ``func(jobname, basedir, *args, **kwargs)``.
        basedir: SixDesk base directory containing the workspaces.
    """
    for jobname in get_all_jobs_in_base(basedir):
        func(jobname, basedir, *args, **kwargs)
def get_all_jobs_in_base(basedir):
    """Returns all job-names in the sixdeskbase dir.

    Job names are derived from directories named ``workspace-<jobname>``.
    """
    prefix = "workspace-"
    jobnames = []
    for workspace in basedir.glob(prefix + "*"):
        # str.replace (not removeprefix) to match the original behavior exactly.
        jobnames.append(workspace.name.replace(prefix, ""))
    return jobnames
def get_first_dir(cwd: Path, glob: str = "*"):
    """Return first directory of pattern `glob`.

    Args:
        cwd: Directory to search in.
        glob: Glob pattern to match (default: everything).

    Raises:
        OSError: If no matching directory is found.
    """
    first_match = next((entry for entry in cwd.glob(glob) if entry.is_dir()), None)
    if first_match is None:
        raise OSError(str(cwd))
    return first_match