import shlex
from csv import DictReader
from io import StringIO
from typing import Callable, Dict, List
from lst_auto_rta.utils.logging import LOGGING_LEVELS_DICT
from lst_auto_rta.utils.string import (
split_string_around_multiple_template_vars,
substrings_in_string,
)
from lst_auto_rta.utils.subprocess import subprocess_run_and_raise_exception_on_error
[docs]
def parse_reservation_state(scontrol_output: str) -> str:
"""Parse the state of a reservation from `scontrol show res a_reservation` output
Parameters
----------
scontrol_output : str
Output of `scontrol show res a_reservation
Returns
-------
str
State of the reservation, eg INACTIVE, ACTIVE
"""
return scontrol_output.split("State=", 1)[1].split(" ", 1)[0]
[docs]
def parse_nodes_in_scontrol_show_res_output(scontrol_output: str) -> List[str]:
"""Parse a string containing `scontrol show res` output to find nodes names.
Search for the substring "Nodes" then parses the nodes ID string, and then the nodes numerical ID,
which can be a number, list of number, a list of ranges, or a list of the combination of all of that.
Examples of strings: ...Nodes=cp[12,34-47] ...
...Nodes=tcs12 ...
...Nodes=lsfj[32-36]
Raises
------
ValueError
If the node's names could not be parsed from the string
Returns
-------
List[str]
List of node names in the reservations
"""
nodes = []
for reservation_output in scontrol_output.split("Nodes=")[1:]:
nodes_str = reservation_output.split(" ")[0]
if not nodes_str:
raise ValueError("Could not parse nodes of {}".format(scontrol_output))
if "[" not in nodes_str: # only 1 node in reservation, append it
nodes.append(nodes_str)
else:
node_prefix, nodes_ID_str = nodes_str.split("[", 1) # split "cp[12, 13]" into "cp", "12, 13]"
nodes_ID_str = nodes_ID_str.replace("]", "") # remove "]"
for sub_nodes_str in nodes_ID_str.split(","):
# can either have node ID value, or a range of IDs, eg "12" or "12-16"
if "-" not in sub_nodes_str:
nodes.append(node_prefix + sub_nodes_str.strip())
else:
id_range_boundaries = sub_nodes_str.split("-")
if len(id_range_boundaries) != 2:
raise ValueError("Could not parse nodes of {}".format(scontrol_output))
start, stop = int(id_range_boundaries[0]), int(id_range_boundaries[1])
nodes.extend([node_prefix + str(node_id) for node_id in range(start, stop + 1)])
return nodes
[docs]
def parse_slurm_nodes(slurm_reservations: List[str], slurm_account: str) -> Dict[str, List[str]]:
"""Run `scontrol show res -u `slurm_account` and to get the output.
Parameters
----------
slurm_reservations : List[str]
List of slurm reservation that RTA will use (all nodes in the reservation used)
slurm_account : str
slurm account of the RTA, to use the slurm reservations
Notes
-----
The nodes belonging to reservation which state is not "ACTIVE" are discarded !
Returns
-------
Dict[str, List[str]]
List of slurm nodes available for RTA, per reservation.
"""
nodes = {}
for reservation in slurm_reservations:
cmd = " ".join(["scontrol", "show", "res", "-u", slurm_account, reservation])
scontrol_output = subprocess_run_and_raise_exception_on_error(
shlex.split(cmd),
"Success of {}".format(cmd),
"Failure of {}".format(cmd),
LOGGING_LEVELS_DICT["ERROR"],
LOGGING_LEVELS_DICT["DEBUG"],
).stdout
if parse_reservation_state(scontrol_output) == "ACTIVE":
nodes[reservation] = parse_nodes_in_scontrol_show_res_output(scontrol_output)
return nodes
[docs]
def parse_JOBID_in_squeue_CSV_output(
squeue_output: str,
filter_dict: Dict[str, Callable[[str], bool]] | None,
) -> List[int]:
"""Parses the output of a squeue command, formatted as CSV to retrieve the JOB ID of listed jobs.
Warnings
--------
The output is expected to be CSV (comma separated values), which is obtained with `squeue` by
specifying the format with a `,` separator, for instance: "squeue --format="%i,%j" will give a
CSV output with 2 columns job ID and job name.
Parameters
----------
squeue_output : str
output of squeue --format="%i,%j" (querying for job ID and job name)
filter_dict : Dict[str, Callable[[str], bool]] or None
Filtering function dictionary. Keys are the fields in the output of squeue, eg "JOBID"
Values are functions to evaluate on a value of this field, returning true or false.
If the output of the function is not True, the entry in squeue output is ignored.
Optional, default is None in which case not filtering is applied.
Returns
-------
List[int]
List of job ID.
"""
# If filter_dict is None, make a function that will pass everything
if filter_dict is None:
filter_dict = {"JOBID": lambda x: True}
# now use csv reader to get a dict for each row. Dict keys are the 1st row values (headers)
squeue_output_f = StringIO(squeue_output)
reader = DictReader(squeue_output_f, delimiter=",")
# Now read the JOBID, but only if the row passes all filtering function
job_ids = [int(row["JOBID"]) for row in reader if all([fct(row.get(k, None)) for k, fct in filter_dict.items()])]
return job_ids
[docs]
def parse_slurm_job_ID(slurm_reservations: List[str], slurm_account: str, r0_dl1_job_name: str) -> List[int]:
"""Run `squeue` to parse job IDs.
Parameters
----------
slurm_reservation : List[str]
List of slurm reservation where to query jobs
slurm_account : str
Slurm account of the jobs to find
r0_dl1_job_name : str
Name of the r0_dl1 jobs in the configuration
Returns
-------
List[int]
List of job IDs.
"""
# split r0_dl1_job_name into substrings, excluding template variables
# Note: When several subarrays will be used, we need some intelligence to get only r0_dl1 jobs names of
# the concerned subarray
r0_dl1_job_name_substrings = split_string_around_multiple_template_vars(r0_dl1_job_name, ["@", "ยง", "$"])
job_IDs = []
for reservation in slurm_reservations:
cmd = " ".join(["squeue", '--format="%i,%j"', "-u", slurm_account, "--reservation={}".format(reservation)])
process = subprocess_run_and_raise_exception_on_error(
shlex.split(cmd),
"Success of {}".format(cmd),
"Failure of {}".format(cmd),
LOGGING_LEVELS_DICT["ERROR"],
LOGGING_LEVELS_DICT["DEBUG"],
)
job_IDs.extend(
parse_JOBID_in_squeue_CSV_output(
process.stdout,
{
"NAME": lambda name: substrings_in_string(
name,
r0_dl1_job_name_substrings,
)
},
)
)
return job_IDs
[docs]
def job_statistics_from_squeue_output(squeue_output: str):
"""Parse job statistics from `squeue -u lstrta --format="%T,%R"`
Warnings
--------
The output of squeue is parsed using a CSV reader, so the format specification must be respected.
Parameters
----------
squeue_output : str
Output of squeue
"""
# now use csv reader to get a dict for each row. Dict keys are the 1st row values (headers)
squeue_output_f = StringIO(squeue_output)
reader = DictReader(squeue_output_f, delimiter=",")
n_jobs = 0
n_running = 0
n_pending = 0
n_request_node_not_available = 0
for row in reader:
n_jobs += 1
n_running += row["STATE"] == "RUNNING"
n_pending += row["STATE"] == "PENDING"
n_request_node_not_available += "ReqNodeNotAvail" in row["NODELIST(REASON)"]
return n_jobs, n_running, n_pending, n_request_node_not_available