# Source code for pykanto.utils.slurm.launch

# ─── DESCRIPTION ──────────────────────────────────────────────────────────────

"""Submit a slurm job that employs distributed/parallel computation using
ray."""

# ─── DEPENDENCIES ─────────────────────────────────────────────────────────────

import argparse
import os
import subprocess
import sys
import time
from pathlib import Path

# ──── SETTINGS ────────────────────────────────────────────────────────────────

# Placeholder tokens searched for in the `sbatch_template.sh` text. `submit_job`
# replaces each one with the matching command-line value before the resulting
# script is written out and handed to `sbatch`.
JOB_NAME = "{{JOB_NAME}}"  # `--exp-name` plus a month/day-hour/minute timestamp
NUM_NODES = "{{NUM_NODES}}"  # `--num-nodes`
GPU = "{{GPU}}"  # `--gpu`
PARTITION_NAME = "{{PARTITION_NAME}}"  # `--partition`
COMMAND_PLACEHOLDER = "{{COMMAND_PLACEHOLDER}}"  # `--command`
GIVEN_NODE = "{{GIVEN_NODE}}"  # "#SBATCH -w <node>", or "" when `--node` is empty
COMMAND_SUFFIX = "{{COMMAND_SUFFIX}}"  # always replaced with an empty string
LOAD_ENV = "{{LOAD_ENV}}"  # `--load-env`
TIME = "{{TIME}}"  # `--time`
MEMORY = "{{MEMORY}}"  # `--memory`
OUT_DIR = "{{OUT_DIR}}"  # the `logs` directory under the current working directory

# ──── FUNCTIONS ────────────────────────────────────────────────────────────────


def _build_parser():
    """Create the argument parser for the slurm launcher command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--exp-name",
        "-exp",
        type=str,
        required=True,
        help="The job name and path to logging file (exp_name.log).",
    )
    parser.add_argument(
        "--num-nodes", "-n", type=int, default=1, help="Number of nodes to use."
    )
    parser.add_argument(
        "--node",
        "-w",
        type=str,
        default="",
        help="The specified nodes to use. Same format as the return of "
        "'sinfo'. Default: ''.",
    )
    parser.add_argument(
        "--gpu",
        type=str,
        # A string default, so it agrees with `type=str`.
        default="0",
        help="Number and constraints on GPUs to use. (Default: 0)",
    )
    parser.add_argument(
        "--time",
        type=str,
        default="00:10:00",
        help="Time allocated to job. (Default: '00:10:00')",
    )
    parser.add_argument(
        "--memory",
        type=int,
        default=20000,
        help="Memory allocated to job. (Default: 20000)",
    )
    parser.add_argument(
        "--partition",
        "-p",
        type=str,
        default="short",
        help="Name of the partition to use. (Default: 'short')",
    )
    parser.add_argument(
        "--load-env",
        "-env",
        type=str,
        # No default: the option is required, so a default could never apply.
        required=True,
        help=(
            "The name of your environment. \nNote: you have to provide "
            "the location of your envs in the `sbatch_template.sh` file"
        ),
    )
    parser.add_argument(
        "--command",
        "-c",
        type=str,
        required=True,
        help="The command you wish to execute. For example: --command 'python "
        "test.py' Note that the command must be a string.",
    )
    return parser


def _fill_template(template, values):
    """Replace each placeholder key of `values` found in `template`, in one pass."""
    import re  # local import: only this helper needs it

    if not values:
        return template
    # A single regex pass never re-scans text that has already been inserted,
    # so user-supplied values that happen to contain a placeholder token (for
    # example inside `--command`) are left untouched.
    pattern = re.compile("|".join(re.escape(key) for key in values))
    return pattern.sub(lambda match: values[match.group(0)], template)


def submit_job(argv=None):
    """
    Parses arguments and submits a ray job to slurm.

    Code from Peng Zhenghao; modifications (c) 2021 Nilo M. Recalde.
    See `source code by Peng Zhenghao
    <https://github.com/pengzhenghao/use-ray-with-slurm>`_.
    Also see `ray instructions
    <https://docs.ray.io/en/ray-1.1.0/cluster/slurm.html>`_.

    Run `pykanto-slaunch --help` for arguments.

    Output bash and log files are saved in a `/logs` directory within the
    directory from which you called the script. You can easily change this
    behaviour by editing `out_dir` below.

    Args:
        argv: Optional list of command-line arguments to parse instead of
            `sys.argv[1:]`. Defaults to None (use `sys.argv[1:]`).

    Raises:
        SystemExit: Always. The exit code is 0 when `sbatch` accepted the job,
            the exit code of `sbatch` when it failed, 1 when the `sbatch`
            executable cannot be found, and 2 when the arguments are invalid.

    Note:
        This works as of 2022 @ Oxford University ARC HPC. Chances are it will
        not work for you 'out of the box'; this submodule is intended more as
        a guide or reference than a foolproof way of submitting multi-node /
        GPU ray jobs.
    """
    # TODO: #16 @nilomr: migrate CLI to typer

    # Parse first, so `--help` or invalid arguments exit before anything is
    # created on disk.
    args = _build_parser().parse_args(argv)

    # Locate bash template file
    template_file = Path(__file__).parent / "sbatch_template.sh"

    # Path to output folder
    out_dir = Path(os.getcwd()) / "logs"
    out_dir.mkdir(parents=True, exist_ok=True)

    node_info = "#SBATCH -w {}".format(args.node) if args.node else ""
    job_name = "{}_{}".format(
        args.exp_name, time.strftime("%m%d-%H%M", time.localtime())
    )

    # ===== Fill in the template script =====
    substitutions = {
        JOB_NAME: job_name,
        NUM_NODES: str(args.num_nodes),
        GPU: str(args.gpu),
        TIME: str(args.time),
        PARTITION_NAME: str(args.partition),
        COMMAND_PLACEHOLDER: str(args.command),
        LOAD_ENV: str(args.load_env),
        GIVEN_NODE: node_info,
        MEMORY: str(args.memory),
        COMMAND_SUFFIX: "",
        OUT_DIR: str(out_dir),
    }
    text = _fill_template(
        template_file.read_text(encoding="utf-8"), substitutions
    )

    # ===== Save the script =====
    script_file = str(out_dir / f"{job_name}.sh")
    with open(script_file, "w", encoding="utf-8") as f:
        f.write(text)

    # ===== Submit the job =====
    print("Submitting job.")
    try:
        # Wait for `sbatch` so that success is only reported when it is real.
        result = subprocess.run(["sbatch", script_file], check=False)
    except FileNotFoundError:
        sys.exit(
            "Could not find the `sbatch` command; is slurm available on "
            f"this machine? Script file is at: {script_file}."
        )
    if result.returncode != 0:
        print(
            f"sbatch failed with exit code {result.returncode}. "
            f"Script file is at: {script_file}.",
            file=sys.stderr,
        )
        sys.exit(result.returncode)

    print(
        f"Job submitted! Script file is at: {script_file}. "
        f"Log file is at: {str(out_dir / job_name)}.log"
    )
    sys.exit(0)
# Script entry point: run the launcher only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    submit_job()