"""
Job Submitter
-------------


The ``job_submitter`` allows the execution of a parametric study from the command line, using a script mask and a `dictionary` of parameters to replace in this mask.
These parameters must be present in the given mask in the ``%(PARAMETER)s`` format (types other than string are also allowed).
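
For example, a placeholder in the mask is resolved per job via Python's named
``%``-formatting (the parameter name here is hypothetical)::

    "beam, energy = %(ENERGY)s;" % {"ENERGY": 6800}  # -> 'beam, energy = 6800;'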

The type of script and executable can be chosen freely, but defaults to ``madx``, for which this submitter was originally written.
When submitting to ``HTCondor``, data to be transferred back to the working directory must be written to a sub-folder defined by ``job_output_dir``, which defaults to **Outputdata**.

This script also allows checking whether all ``HTCondor`` jobs finished successfully, resubmitting with a different parameter grid, and executing locally.
A **Jobs.tfs** file is created in the working directory containing the job id, the
parameters and the directory of each job, for further post-processing.
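
The **Jobs.tfs** file can, for instance, be read back with the ``tfs-pandas``
package (a sketch; the path is illustrative)::

    import tfs

    job_df = tfs.read("working_directory/Jobs.tfs")  # one row per job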

For additional information and guides, see the `Job Submitter page
<https://pylhc.github.io/packages/pylhcsubmitter/job_submitter.html>`_ in the ``OMC`` documentation site.
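
A minimal call from Python could look like this (all paths and parameter names
are hypothetical)::

    from pylhc_submitter.job_submitter import main as job_submitter

    job_submitter(
        executable="madx",
        mask="my_study.madx.mask",         # contains e.g. '%(SEED)s'
        replace_dict={"SEED": [1, 2, 3]},  # one job per parameter combination
        working_directory="/path/to/study",
        jobflavour="workday",
    )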


*--Required--*

- **mask** *(PathOrStr)*:

    Program mask to use


- **replace_dict** *(DictAsString)*:

    Dict with the strings to replace as keys and, as values, lists of the
    parameters to replace them with


- **working_directory** *(PathOrStr)*:

    Directory where data should be put


*--Optional--*

- **append_jobs**:

    Flag to rerun the study with a finer/wider grid; already existing points
    will not be re-executed.

    action: ``store_true``


- **check_files** *(str)*:

    List of files/file-name-masks expected to be in the 'job_output_dir'
    after a successful job (for appending/resuming). Uses the 'glob'
    function, so unix-wildcards (*) are allowed. If not given, only the
    presence of the folder itself is checked.


- **dryrun**:

    Flag to only prepare folders and scripts, without starting/submitting the
    jobs. Together with `resume_jobs`, this can be used to check which jobs
    succeeded and which failed.

    action: ``store_true``


- **executable** *(PathOrStr)*:

    Path to executable or job-type (of ['madx', 'python3', 'python2']) to
    use.

    default: ``madx``


- **htc_arguments** *(DictAsString)*:

    Additional arguments for htcondor, as Dict-String. For AccountingGroup
    please use 'accounting_group'. 'max_retries' and 'notification' have
    defaults (if not given). Others are just passed on.

    default: ``{}``


- **job_output_dir** *(str)*:

    The name of the output dir of the job. (Make sure your script puts its
    data there!)

    default: ``Outputdata``


- **jobflavour** *(str)*:

    Jobflavour to give a rough estimate of the runtime of one job

    choices: ``('espresso', 'microcentury', 'longlunch', 'workday', 'tomorrow', 'testmatch', 'nextweek')``

    default: ``workday``


- **jobid_mask** *(str)*:

    Mask to name jobs from replace_dict


- **num_processes** *(int)*:

    Number of processes to be used if run locally

    default: ``4``


- **output_destination** *(PathOrStr)*:

    Directory to copy the output of the jobs to, sorted into folders per job.
    Can be on EOS, preferably via EOS-URI format ('root://eosuser.cern.ch//eos/...').


- **resume_jobs**:

    Only run jobs that did not finish successfully.

    action: ``store_true``


- **run_local**:

    Flag to run the jobs on the local machine. Not recommended.

    action: ``store_true``


- **script_arguments** *(DictAsString)*:

    Additional arguments to pass to the script, as dict in key-value pairs
    ('--' need to be included in the keys).

    default: ``{}``


- **script_extension** *(str)*:

    New extension for the scripts created from the masks. This is inferred
    automatically for ['madx', 'python3', 'python2']. Otherwise not
    changed.


- **ssh** *(str)*:

    Run htcondor from this machine via ssh (needs access to the
    `working_directory`)


"""
import logging
import sys
from dataclasses import fields
from pathlib import Path

from generic_parser import EntryPointParameters, entrypoint
from generic_parser.entry_datatypes import DictAsString
from generic_parser.tools import print_dict_tree

from pylhc_submitter.constants.htcondor import JOBFLAVOURS
from pylhc_submitter.constants.job_submitter import EXECUTEABLEPATH, SCRIPT_EXTENSIONS
from pylhc_submitter.submitter.iotools import CreationOpts, create_jobs, is_eos_uri, print_stats
from pylhc_submitter.submitter.mask import (check_percentage_signs_in_mask,
                                            find_named_variables_in_mask, is_mask_file)
from pylhc_submitter.submitter.runners import RunnerOpts, run_jobs
from pylhc_submitter.utils.iotools import (PathOrStr, keys_to_path, make_replace_entries_iterable,
                                           save_config)
from pylhc_submitter.utils.logging_tools import log_setup

LOG = logging.getLogger(__name__)


try:
    import htcondor
except ImportError:
    platform = "macOS" if sys.platform == "darwin" else "windows"
    LOG.warning(
        f"htcondor python bindings are linux-only. You can still use job_submitter on {platform}, "
        "but only for local runs."
    )
    htcondor = None


def get_params():
    params = EntryPointParameters()
    params.add_parameter(
        name="mask",
        type=PathOrStr,
        required=True,
        help="Program mask to use",
    )
    params.add_parameter(
        name="working_directory",
        type=PathOrStr,
        required=True,
        help="Directory where data should be put",
    )
    params.add_parameter(
        name="executable",
        default="madx",
        type=PathOrStr,
        help=(
            "Path to executable or job-type " f"(of {str(list(EXECUTEABLEPATH.keys()))}) to use."
        ),
    )
    params.add_parameter(
        name="jobflavour",
        type=str,
        choices=JOBFLAVOURS,
        default="workday",
        help="Jobflavour to give rough estimate of runtime of one job ",
    )
    params.add_parameter(
        name="run_local",
        action="store_true",
        help="Flag to run the jobs on the local machine. Not suggested.",
    )
    params.add_parameter(
        name="resume_jobs",
        action="store_true",
        help="Only do jobs that did not work.",
    )
    params.add_parameter(
        name="append_jobs",
        action="store_true",
        help=(
            "Flag to rerun the study with a finer/wider grid; "
            "already existing points will not be re-executed."
        ),
    )
    params.add_parameter(
        name="dryrun",
        action="store_true",
        help=(
            "Flag to only prepare folders and scripts, without starting/submitting the jobs. "
            "Together with `resume_jobs`, this can be used to check which jobs "
            "succeeded and which failed."
        ),
    )
    params.add_parameter(
        name="replace_dict",
        help=(
            "Dict with the strings to replace as keys and, as values, lists of the "
            "parameters to replace them with"
        ),
        type=DictAsString,
        required=True,
    )
    params.add_parameter(
        name="script_arguments",
        help=(
            "Additional arguments to pass to the script, as dict in key-value pairs "
            "('--' need to be included in the keys)."
        ),
        type=DictAsString,
        default={},
    )
    params.add_parameter(
        name="script_extension",
        help=(
            "New extension for the scripts created from the masks. This is inferred "
            f"automatically for {str(list(SCRIPT_EXTENSIONS.keys()))}. Otherwise not changed."
        ),
        type=str,
    )
    params.add_parameter(
        name="num_processes",
        help="Number of processes to be used if run locally",
        type=int,
        default=4,
    )
    params.add_parameter(
        name="check_files",
        help=(
            "List of files/file-name-masks expected to be in the "
            "'job_output_dir' after a successful job "
            "(for appending/resuming). Uses the 'glob' function, so "
            "unix-wildcards (*) are allowed. If not given, only the "
            "presence of the folder itself is checked."
        ),
        type=str,
        nargs="+",
    )
    params.add_parameter(
        name="jobid_mask",
        help="Mask to name jobs from replace_dict",
        type=str,
    )
    params.add_parameter(
        name="job_output_dir",
        help="The name of the output dir of the job. (Make sure your script puts its data there!)",
        type=str,
        default="Outputdata",
    )
    params.add_parameter(
        name="output_destination",
        help="Directory to copy the output of the jobs to, sorted into folders per job. "
             "Can be on EOS, preferrably via EOS-URI format ('root://eosuser.cern.ch//eos/...').",
        type=PathOrStr,
    )
    params.add_parameter(
        name="htc_arguments",
        help=(
            "Additional arguments for htcondor, as Dict-String. "
            "For AccountingGroup please use 'accounting_group'. "
            "'max_retries' and 'notification' have defaults (if not given). "
            "Others are just passed on. "
        ),
        type=DictAsString,
        default={},
    )
    params.add_parameter(
        name="ssh",
        help="Run htcondor from this machine via ssh (needs access to the `working_directory`)",
        type=str,
    )

    return params


@entrypoint(get_params(), strict=True)
def main(opt):
    if not opt.run_local:
        LOG.info("Starting HTCondor Job-submitter.")
        _check_htcondor_presence()
    else:
        LOG.info("Starting Job-submitter.")

    save_config(Path(opt.working_directory), opt, "job_submitter")
    creation_opt, runner_opt = check_opts(opt)

    job_df, dropped_jobs = create_jobs(creation_opt)

    run_jobs(job_df, runner_opt)

    print_stats(job_df.index, dropped_jobs)


def check_opts(opt):
    """ Checks options and sorts them into job-creation and running parameters. """
    LOG.debug("Checking options.")
    if opt.resume_jobs and opt.append_jobs:
        raise ValueError("Select either Resume jobs or Append jobs")

    # Paths ---
    opt = keys_to_path(opt, "working_directory", "executable")

    if str(opt.executable) in EXECUTEABLEPATH.keys():
        opt.executable = str(opt.executable)

    if is_mask_file(opt.mask):
        mask_content = Path(opt.mask).read_text()  # checks that mask and dir are there
        opt.mask = Path(opt.mask)
    else:
        mask_content = opt.mask

    if is_eos_uri(opt.output_destination) and not (
        "://" in opt.output_destination and "//eos/" in opt.output_destination
    ):
        raise ValueError(
            "The 'output_destination' is an EOS-URI but missing '://' or '//eos' (double slashes?). "
        )

    # Replace dict ---
    dict_keys = set(opt.replace_dict.keys())
    mask_keys = find_named_variables_in_mask(mask_content)
    not_in_mask = dict_keys - mask_keys
    not_in_dict = mask_keys - dict_keys

    if len(not_in_dict):
        raise KeyError(
            "The following keys in the mask were not found in the given replace_dict: "
            f"{str(not_in_dict).strip('{}')}"
        )

    if len(not_in_mask):
        LOG.warning(
            "The following replace_dict keys were not found in the given mask: "
            f"{str(not_in_mask).strip('{}')}"
        )

        # remove all keys which are not present in mask (otherwise unnecessary jobs)
        for key in not_in_mask:
            opt.replace_dict.pop(key)
        if len(opt.replace_dict) == 0:
            raise KeyError("Empty replace-dictionary")
    check_percentage_signs_in_mask(mask_content)

    print_dict_tree(opt, name="Input parameter", print_fun=LOG.debug)
    opt.replace_dict = make_replace_entries_iterable(opt.replace_dict)

    # Create new classes
    opt.output_dir = opt.job_output_dir  # renaming
    creation = CreationOpts(**{f.name: opt[f.name] for f in fields(CreationOpts)})
    runner = RunnerOpts(**{f.name: opt[f.name] for f in fields(RunnerOpts)})
    runner.output_dir = '""' if opt.output_destination else opt.output_dir  # empty string stops htc transfer of files
    return creation, runner


def _check_htcondor_presence() -> None:
    """ Raises an error if htcondor is not installed. """
    if htcondor is None:
        raise EnvironmentError("htcondor bindings are necessary to run this module.")


# Script Mode ------------------------------------------------------------------


if __name__ == "__main__":
    log_setup()
    main()