Source code for utils.htcondor_wrapper

""" Tools to use htcondor via python.
Requires you to be on a computer where htcondor is set up.
For local machines see: https://twiki.cern.ch/twiki/bin/view/ABPComputing/LxbatchHTCondor

Hint: I had to add the path of the newly installed modules to the batch_Krb5_credential script to
be able to find the Authen::Krb5 package:
(use lib "/home/jdilly/perl5/lib/perl5/x86_64-linux-gnu-thread-multi/";)

Also, make sure the SCHEDD_NAME is set properly in the config file mentioned above.
(i.e. go to lxplus, run condor_q and see which scheduler was assigned to you)

If you run jobs that take longer than 25h, make sure you are using a renewable Kerberos ticket:
kinit -r 604800  (i.e. renewable  by "kinit -R" for 1 week)

Python Readme:
https://htcondor-python.readthedocs.io/en/latest

IMPORTANT: This functionality relies on shared-space of all files between htcondor and the user.
"""
import subprocess

import madx_wrapper
import os
import htcondor
import six
from utils import logging_tools
from utils.contexts import suppress_exception

LOG = logging_tools.get_logger(__name__)

SHEBANG = "#!/bin/bash"
BASHFILE_MASK = "htcbash{:s}.sh"
SUBFILE = "queuehtc.sub"
MADX_PATH = madx_wrapper.MADX_AFS_PATH  # always use afs

CMD_SUBMIT = "condor_submit"
CMD_VIEW = "condor_q"
CMD_RM = "condor_rm"


# HTCondor Python Methods ######################################################


[docs]def submit_job(job): """ Submits the job to the scheduler This would be the nicer way, BUT: the python bindings do currently not forward the keberos token. """ raise NotImplementedError("Submitting via python not implemented because of kerberos") schedd = htcondor.Schedd() # works if schedd_name is set up with schedd.transaction() as txn: clusterId = job.queue(txn) # assuming queue args are set in job, or 1 return clusterId
def view_history(): schedd = htcondor.Schedd() for ad in schedd.history('true', ['Owner', 'ProcId', 'ClusterId', 'JobStatus', 'WallDuration'], 2): LOG.info(ad) # Subprocess Methods ###########################################################
[docs]def create_subfile_from_job(folder, job): """ Write file to submit to htcondor """ subfile = os.path.join(folder, SUBFILE) with open(subfile, "w") as f: f.write(str(job)) return subfile
[docs]def submit_jobfile(jobfile): """ Submit subfile to htcondor via subprocess """ LOG.info("Sending {:s} to htcondor.".format(jobfile)) _start_subprocess_with_logger([CMD_SUBMIT, jobfile])
def log_current_jobs_subprocess(): _start_subprocess_with_logger(CMD_VIEW) # Job Creation #################################################################
[docs]def create_multijob_for_bashfiles(folder, n_files, duration="longlunch"): """ Function to create a HTCondor job assuming n_files bash-files. """ dura_key, dura_val = _get_duration(duration) job = htcondor.Submit({ "MyId": "htcondor", "executable": os.path.join(folder, BASHFILE_MASK.format(".$(ProcId)")), "arguments": "$(ClusterId) $(ProcId)", "initialdir": os.path.join(folder), "output": os.path.join("$(initialdir)", "$(MyId).$(ClusterId).$(ProcId).out"), "error": os.path.join("$(initialdir)", "$(MyId).$(ClusterId).$(ProcId).err"), "log": os.path.join("$(initialdir)", "$(MyId).$(ClusterId).$(ProcId).log"), dura_key: dura_val, }) job.setQArgs("{:d}".format(n_files)) return job
[docs]def create_job_for_bashfile(bashfile, duration="longlunch"): """ Returns a simple Submit() object for the bashfile. """ dura_key, dura_val = _get_duration(duration) folder = os.path.dirname(bashfile) job = htcondor.Submit({ "MyId": os.path.splitext(os.path.basename(bashfile))[0], "executable": bashfile, "arguments": "$(ClusterId)", "initialdir": os.path.join(folder), "output": os.path.join("$(initialdir)", "$(MyId).$(ClusterId).out"), "error": os.path.join("$(initialdir)", "$(MyId).$(ClusterId).err"), "log": os.path.join("$(initialdir)", "$(MyId).$(ClusterId).log"), dura_key: dura_val, }) job.setQArgs("{:d}".format(1)) return job
# For MADX #####################################################################
[docs]def write_madx_bash(folder, id, madx_files): """ Write bash file to call madx files Args: folder: Folder to write file into id: id for current bash_file madx_files: List of madx-files to run in this job. Returns: Path to current madx file """ with suppress_exception(ValueError): id = "{:d}".format(id) # in case it's an int if id and not id.startswith("."): id = ".{:s}".format(id) filename = os.path.join(folder, BASHFILE_MASK.format(id)) with open(filename, "w") as f: f.write(SHEBANG + "\n") for madx_file in madx_files: f.write("{:s} < {:s}\n".format(MADX_PATH, madx_file)) return filename
# Helper ####################################################################### def _get_duration(duration): if isinstance(duration, six.string_types): # espresso 20min, microcentury 1h, longlunch 2h, workday 8h, # tomorrow 1d, testmatch 3d, nextweek 1w return "+JobFlavour", '"{:s}"'.format(duration) return "+MaxRuntime", str(duration) # runtime in seconds def _start_subprocess_with_logger(command): process = subprocess.Popen(command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,) for line in iter(process.stdout.readline, b''): line = line.strip() if line: LOG.info(line) status = process.wait() return status # Script Mode ################################################################## if __name__ == '__main__': raise EnvironmentError("{:s} is not supposed to run as main.".format(__file__))