Coverage for lst_auto_rta/utils/slurm.py: 72%
64 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-03 14:47 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-03 14:47 +0000
1import shlex
2from csv import DictReader
3from io import StringIO
4from typing import Callable, Dict, List
6from lst_auto_rta.utils.logging import LOGGING_LEVELS_DICT
7from lst_auto_rta.utils.string import (
8 split_string_around_multiple_template_vars,
9 substrings_in_string,
10)
11from lst_auto_rta.utils.subprocess import subprocess_run_and_raise_exception_on_error
def parse_reservation_state(scontrol_output: str) -> str:
    """Parse the state of a reservation from `scontrol show res a_reservation` output.

    The state is the value of the first ``State=`` field. The value stops at the first
    whitespace character (space, tab or newline) or at the end of the string, so the field
    is parsed correctly whether or not it is the last one on its line.

    Parameters
    ----------
    scontrol_output : str
        Output of `scontrol show res a_reservation`

    Returns
    -------
    str
        State of the reservation, eg INACTIVE, ACTIVE.
        Empty string if nothing (or whitespace) directly follows ``State=``.

    Raises
    ------
    IndexError
        If `scontrol_output` does not contain any ``State=`` field.
    """
    # Everything after the first "State=" key; IndexError here means the key is absent.
    after_key = scontrol_output.split("State=", 1)[1]
    # An empty value must stay empty: a bare split() would skip leading whitespace
    # and wrongly return the next field instead.
    if not after_key or after_key[0].isspace():
        return ""
    return after_key.split(maxsplit=1)[0]
def _split_top_level_commas(hostlist: str) -> List[str]:
    """Split a host list on the commas that are not enclosed in square brackets."""
    groups = []
    current = []
    depth = 0
    for char in hostlist:
        if char == "," and depth == 0:
            groups.append("".join(current))
            current = []
            continue
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
        current.append(char)
    groups.append("".join(current))
    return groups


def parse_nodes_in_scontrol_show_res_output(scontrol_output: str) -> List[str]:
    """Parse a string containing `scontrol show res` output to find nodes names.

    Search for every "Nodes=" substring, take the value up to the next whitespace, and expand it.
    The value can be a single node name, a prefix followed by a bracketed list of numerical IDs
    and/or ranges of IDs, or a comma separated list of such groups.
    Examples of strings: ...Nodes=cp[12,34-47] ...
                         ...Nodes=tcs12 ...
                         ...Nodes=lsfj[32-36]
                         ...Nodes=cp[01-03],gpu7 ...

    Leading zeros of a range are preserved: "cp[08-10]" gives "cp08", "cp09", "cp10".

    Parameters
    ----------
    scontrol_output : str
        Output of `scontrol show res`, possibly describing several reservations.

    Raises
    ------
    ValueError
        If the node's names could not be parsed from the string

    Returns
    -------
    List[str]
        List of node names in the reservations (empty if there is no "Nodes=" field)
    """
    nodes = []
    for reservation_output in scontrol_output.split("Nodes=")[1:]:
        # The value ends at the first whitespace of any kind (the field may end its line).
        fields = reservation_output.split(maxsplit=1)
        if not fields or reservation_output[0].isspace():
            raise ValueError("Could not parse nodes of {}".format(scontrol_output))
        for group in _split_top_level_commas(fields[0]):
            if not group:  # stray comma, nothing to add
                continue
            if "[" not in group:  # plain node name, append it
                nodes.append(group)
                continue
            node_prefix, nodes_ID_str = group.split("[", 1)  # split "cp[12,13]" into "cp", "12,13]"
            nodes_ID_str = nodes_ID_str.replace("]", "")  # remove "]"
            for sub_nodes_str in nodes_ID_str.split(","):
                # can either have node ID value, or a range of IDs, eg "12" or "12-16"
                if "-" not in sub_nodes_str:
                    nodes.append(node_prefix + sub_nodes_str.strip())
                    continue
                id_range_boundaries = [boundary.strip() for boundary in sub_nodes_str.split("-")]
                if len(id_range_boundaries) != 2:
                    raise ValueError("Could not parse nodes of {}".format(scontrol_output))
                start_str, stop_str = id_range_boundaries
                start, stop = int(start_str), int(stop_str)
                # A zero-padded lower bound fixes the width of every ID in the range.
                width = len(start_str) if start_str.startswith("0") else 0
                nodes.extend(node_prefix + str(node_id).zfill(width) for node_id in range(start, stop + 1))
    return nodes
def parse_slurm_nodes(slurm_reservations: List[str], slurm_account: str) -> Dict[str, List[str]]:
    """Run `scontrol show res -u slurm_account reservation` for each reservation and parse its nodes.

    Parameters
    ----------
    slurm_reservations : List[str]
        List of slurm reservation that RTA will use (all nodes in the reservation used)
    slurm_account : str
        slurm account of the RTA, to use the slurm reservations

    Notes
    -----
    The nodes belonging to reservation which state is not "ACTIVE" are discarded !
    Such reservations have no entry in the returned dictionary.

    Returns
    -------
    Dict[str, List[str]]
        List of slurm nodes available for RTA, per reservation.
    """
    nodes = {}
    for reservation in slurm_reservations:
        # Build the argument vector directly: joining into a string and splitting it again
        # would mis-tokenise any argument containing whitespace or quotes.
        argv = ["scontrol", "show", "res", "-u", slurm_account, reservation]
        cmd = shlex.join(argv)  # shell-quoted rendering, only used in the log messages
        # NOTE(review): the helper presumably raises if the command fails (per its name) - confirm.
        scontrol_output = subprocess_run_and_raise_exception_on_error(
            argv,
            "Success of {}".format(cmd),
            "Failure of {}".format(cmd),
            LOGGING_LEVELS_DICT["ERROR"],
            LOGGING_LEVELS_DICT["DEBUG"],
        ).stdout
        if parse_reservation_state(scontrol_output) == "ACTIVE":
            nodes[reservation] = parse_nodes_in_scontrol_show_res_output(scontrol_output)
    return nodes
106def parse_JOBID_in_squeue_CSV_output(
107 squeue_output: str,
108 filter_dict: Dict[str, Callable[[str], bool]] | None,
109) -> List[int]:
110 """Parses the output of a squeue command, formatted as CSV to retrieve the JOB ID of listed jobs.
112 Warnings
113 --------
114 The output is expected to be CSV (comma separated values), which is obtained with `squeue` by
115 specifying the format with a `,` separator, for instance: "squeue --format="%i,%j" will give a
116 CSV output with 2 columns job ID and job name.
118 Parameters
119 ----------
120 squeue_output : str
121 output of squeue --format="%i,%j" (querying for job ID and job name)
122 filter_dict : Dict[str, Callable[[str], bool]] or None
123 Filtering function dictionary. Keys are the fields in the output of squeue, eg "JOBID"
124 Values are functions to evaluate on a value of this field, returning true or false.
125 If the output of the function is not True, the entry in squeue output is ignored.
126 Optional, default is None in which case not filtering is applied.
128 Returns
129 -------
130 List[int]
131 List of job ID.
132 """
133 # If filter_dict is None, make a function that will pass everything
134 if filter_dict is None:
135 filter_dict = {"JOBID": lambda x: True}
137 # now use csv reader to get a dict for each row. Dict keys are the 1st row values (headers)
138 squeue_output_f = StringIO(squeue_output)
139 reader = DictReader(squeue_output_f, delimiter=",")
140 # Now read the JOBID, but only if the row passes all filtering function
141 job_ids = [int(row["JOBID"]) for row in reader if all([fct(row.get(k, None)) for k, fct in filter_dict.items()])]
142 return job_ids
def parse_slurm_job_ID(slurm_reservations: List[str], slurm_account: str, r0_dl1_job_name: str) -> List[int]:
    """Run `squeue` on each reservation to parse the IDs of the r0_dl1 jobs.

    Parameters
    ----------
    slurm_reservations : List[str]
        List of slurm reservation where to query jobs
    slurm_account : str
        Slurm account of the jobs to find
    r0_dl1_job_name : str
        Name of the r0_dl1 jobs in the configuration

    Returns
    -------
    List[int]
        List of job IDs, in the order of `slurm_reservations`.
    """
    # split r0_dl1_job_name into substrings, excluding template variables
    # Note: When several subarrays will be used, we need some intelligence to get only r0_dl1 jobs names of
    # the concerned subarray
    r0_dl1_job_name_substrings = split_string_around_multiple_template_vars(r0_dl1_job_name, ["@", "§", "$"])
    # The job name filter does not depend on the reservation: build it once.
    name_filter = {"NAME": lambda name: substrings_in_string(name, r0_dl1_job_name_substrings)}

    job_IDs = []
    for reservation in slurm_reservations:
        # Build the argument vector directly: joining into a string and splitting it again
        # would mis-tokenise any argument containing whitespace or quotes.
        argv = ["squeue", "--format=%i,%j", "-u", slurm_account, "--reservation={}".format(reservation)]
        cmd = shlex.join(argv)  # shell-quoted rendering, only used in the log messages
        process = subprocess_run_and_raise_exception_on_error(
            argv,
            "Success of {}".format(cmd),
            "Failure of {}".format(cmd),
            LOGGING_LEVELS_DICT["ERROR"],
            LOGGING_LEVELS_DICT["DEBUG"],
        )
        job_IDs.extend(parse_JOBID_in_squeue_CSV_output(process.stdout, name_filter))
    return job_IDs
def job_statistics_from_squeue_output(squeue_output: str) -> tuple[int, int, int, int]:
    """Parse job statistics from `squeue -u lstrta --format="%T,%R"`

    Warnings
    --------
    The output of squeue is parsed using a CSV reader, so the format specification must be respected.
    The header row must contain the "STATE" and "NODELIST(REASON)" columns.

    Parameters
    ----------
    squeue_output : str
        Output of squeue

    Returns
    -------
    tuple[int, int, int, int]
        In this order: the total number of jobs, the number of jobs in "RUNNING" state, the number of
        jobs in "PENDING" state, and the number of jobs whose "NODELIST(REASON)" field contains
        "ReqNodeNotAvail".
    """
    # csv reader gives a dict for each row. Dict keys are the 1st row values (headers)
    reader = DictReader(StringIO(squeue_output), delimiter=",")
    n_jobs = 0
    n_running = 0
    n_pending = 0
    n_request_node_not_available = 0
    for row in reader:
        state = row["STATE"]
        # A row shorter than the header has its missing fields set to None by DictReader.
        reason = row["NODELIST(REASON)"] or ""
        n_jobs += 1
        if state == "RUNNING":
            n_running += 1
        elif state == "PENDING":
            n_pending += 1
        if "ReqNodeNotAvail" in reason:
            n_request_node_not_available += 1

    return n_jobs, n_running, n_pending, n_request_node_not_available