Coverage for lst_auto_rta/utils/slurm.py: 72%

64 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 14:47 +0000

1import shlex 

2from csv import DictReader 

3from io import StringIO 

4from typing import Callable, Dict, List 

5 

6from lst_auto_rta.utils.logging import LOGGING_LEVELS_DICT 

7from lst_auto_rta.utils.string import ( 

8 split_string_around_multiple_template_vars, 

9 substrings_in_string, 

10) 

11from lst_auto_rta.utils.subprocess import subprocess_run_and_raise_exception_on_error 

12 

13 

def parse_reservation_state(scontrol_output: str) -> str:
    """Parse the state of a reservation from `scontrol show res a_reservation` output.

    Parameters
    ----------
    scontrol_output : str
        Output of `scontrol show res a_reservation`

    Raises
    ------
    ValueError
        If no "State=<value>" field could be found in `scontrol_output`.

    Returns
    -------
    str
        State of the reservation, eg INACTIVE, ACTIVE
    """
    _, state_key, after_state_key = scontrol_output.partition("State=")
    # Split on any whitespace (not only " "): depending on the line layout the state value may be
    # followed by a newline, which must not end up in the returned state.
    state_tokens = after_state_key.split(maxsplit=1)
    if not state_key or not state_tokens:
        raise ValueError("Could not parse reservation state of {}".format(scontrol_output))
    return state_tokens[0]

28 

29 

def _first_whitespace_delimited_token(text: str) -> str:
    """Return the prefix of `text` up to (excluding) its first whitespace character."""
    for index, char in enumerate(text):
        if char.isspace():
            return text[:index]
    return text


def _split_hostlist_groups(hostlist: str) -> List[str]:
    """Split a hostlist such as "cp[1-2,5],tcs12" on the commas located outside of brackets."""
    groups = []
    current_group = []
    bracket_depth = 0
    for char in hostlist:
        if char == "," and bracket_depth == 0:
            groups.append("".join(current_group))
            current_group = []
            continue
        if char == "[":
            bracket_depth += 1
        elif char == "]":
            bracket_depth -= 1
        current_group.append(char)
    groups.append("".join(current_group))
    return groups


def _expand_hostlist_group(group: str, scontrol_output: str) -> List[str]:
    """Expand one hostlist group ("tcs12" or "cp[12,34-36]") into the list of node names it denotes."""
    error_message = "Could not parse nodes of {}".format(scontrol_output)
    if "[" not in group and "]" not in group:
        if not group:
            raise ValueError(error_message)
        return [group]

    # "cp[12,34-36]" -> prefix "cp", IDs "12,34-36", suffix ""
    node_prefix, _, remainder = group.partition("[")
    ids_str, closing_bracket, node_suffix = remainder.partition("]")
    if not closing_bracket or "[" in ids_str or "[" in node_suffix or "]" in node_suffix:
        raise ValueError(error_message)

    node_names = []
    for id_spec in (spec.strip() for spec in ids_str.split(",")):
        # Each entry is either a single node ID ("12") or an inclusive range of IDs ("12-16")
        if "-" not in id_spec:
            if not id_spec:
                raise ValueError(error_message)
            node_names.append(node_prefix + id_spec + node_suffix)
            continue
        range_boundaries = id_spec.split("-")
        if len(range_boundaries) != 2:
            raise ValueError(error_message)
        start_str, stop_str = range_boundaries[0].strip(), range_boundaries[1].strip()
        try:
            start, stop = int(start_str), int(stop_str)
        except ValueError as err:
            raise ValueError(error_message) from err
        # Keep the zero padding of the range bounds, eg "01-03" -> "01", "02", "03"
        id_width = len(start_str)
        node_names.extend(
            node_prefix + str(node_id).zfill(id_width) + node_suffix for node_id in range(start, stop + 1)
        )
    return node_names


def parse_nodes_in_scontrol_show_res_output(scontrol_output: str) -> List[str]:
    """Parse a string containing `scontrol show res` output to find nodes names.

    Search for every substring "Nodes=", then parse the hostlist that follows it (up to the next
    whitespace). A hostlist is a comma-separated list of groups. Each group is either a plain node
    name, or a prefix followed by bracketed node IDs. The IDs can be a number, a list of numbers,
    a list of ranges, or any combination of those. The zero padding of range bounds is preserved.
    Examples of strings: ...Nodes=cp[12,34-47] ...
                         ...Nodes=tcs12 ...
                         ...Nodes=lsfj[32-36]
                         ...Nodes=cp[01-02],gpu[7,9] ...

    Parameters
    ----------
    scontrol_output : str
        Output of `scontrol show res`, possibly describing several reservations.

    Raises
    ------
    ValueError
        If the node's names could not be parsed from the string

    Returns
    -------
    List[str]
        List of node names in the reservations
    """
    nodes = []
    for reservation_output in scontrol_output.split("Nodes=")[1:]:
        nodes_str = _first_whitespace_delimited_token(reservation_output)
        if not nodes_str:
            raise ValueError("Could not parse nodes of {}".format(scontrol_output))
        for hostlist_group in _split_hostlist_groups(nodes_str):
            nodes.extend(_expand_hostlist_group(hostlist_group, scontrol_output))
    return nodes

70 

71 

def parse_slurm_nodes(slurm_reservations: List[str], slurm_account: str) -> Dict[str, List[str]]:
    """Run `scontrol show res -u <slurm_account> <reservation>` for each reservation and parse its nodes.

    Parameters
    ----------
    slurm_reservations : List[str]
        List of slurm reservation that RTA will use (all nodes in the reservation used)
    slurm_account : str
        slurm account of the RTA, to use the slurm reservations

    Notes
    -----
    The nodes belonging to a reservation whose state is not "ACTIVE" are discarded!

    Returns
    -------
    Dict[str, List[str]]
        Slurm nodes available for RTA, per reservation: keys are the reservation names (only
        "ACTIVE" reservations are present), values are the lists of node names.
    """
    nodes = {}
    for reservation in slurm_reservations:
        # Build the argument vector directly. Joining into a string and re-splitting it with shlex
        # would mangle, or fail on, account/reservation names containing spaces or quotes.
        cmd_args = ["scontrol", "show", "res", "-u", slurm_account, reservation]
        cmd = shlex.join(cmd_args)  # shell-quoted command line, only used in the log messages
        scontrol_output = subprocess_run_and_raise_exception_on_error(
            cmd_args,
            "Success of {}".format(cmd),
            "Failure of {}".format(cmd),
            LOGGING_LEVELS_DICT["ERROR"],
            LOGGING_LEVELS_DICT["DEBUG"],
        ).stdout
        if parse_reservation_state(scontrol_output) == "ACTIVE":
            nodes[reservation] = parse_nodes_in_scontrol_show_res_output(scontrol_output)
    return nodes

104 

105 

106def parse_JOBID_in_squeue_CSV_output( 

107 squeue_output: str, 

108 filter_dict: Dict[str, Callable[[str], bool]] | None, 

109) -> List[int]: 

110 """Parses the output of a squeue command, formatted as CSV to retrieve the JOB ID of listed jobs. 

111 

112 Warnings 

113 -------- 

114 The output is expected to be CSV (comma separated values), which is obtained with `squeue` by 

115 specifying the format with a `,` separator, for instance: "squeue --format="%i,%j" will give a 

116 CSV output with 2 columns job ID and job name. 

117 

118 Parameters 

119 ---------- 

120 squeue_output : str 

121 output of squeue --format="%i,%j" (querying for job ID and job name) 

122 filter_dict : Dict[str, Callable[[str], bool]] or None 

123 Filtering function dictionary. Keys are the fields in the output of squeue, eg "JOBID" 

124 Values are functions to evaluate on a value of this field, returning true or false. 

125 If the output of the function is not True, the entry in squeue output is ignored. 

126 Optional, default is None in which case not filtering is applied. 

127 

128 Returns 

129 ------- 

130 List[int] 

131 List of job ID. 

132 """ 

133 # If filter_dict is None, make a function that will pass everything 

134 if filter_dict is None: 

135 filter_dict = {"JOBID": lambda x: True} 

136 

137 # now use csv reader to get a dict for each row. Dict keys are the 1st row values (headers) 

138 squeue_output_f = StringIO(squeue_output) 

139 reader = DictReader(squeue_output_f, delimiter=",") 

140 # Now read the JOBID, but only if the row passes all filtering function 

141 job_ids = [int(row["JOBID"]) for row in reader if all([fct(row.get(k, None)) for k, fct in filter_dict.items()])] 

142 return job_ids 

143 

144 

def parse_slurm_job_ID(slurm_reservations: List[str], slurm_account: str, r0_dl1_job_name: str) -> List[int]:
    """Run `squeue` in each reservation to parse the job IDs of the r0_dl1 jobs.

    Parameters
    ----------
    slurm_reservations : List[str]
        List of slurm reservation where to query jobs
    slurm_account : str
        Slurm account of the jobs to find
    r0_dl1_job_name : str
        Name of the r0_dl1 jobs in the configuration

    Returns
    -------
    List[int]
        List of job IDs, in the order of `slurm_reservations`.
    """
    # split r0_dl1_job_name into substrings, excluding template variables
    # Note: When several subarrays will be used, we need some intelligence to get only r0_dl1 jobs names of
    # the concerned subarray
    r0_dl1_job_name_substrings = split_string_around_multiple_template_vars(r0_dl1_job_name, ["@", "§", "$"])
    # The job name filter is the same for every reservation: build it once.
    name_filter = {"NAME": lambda name: substrings_in_string(name, r0_dl1_job_name_substrings)}

    job_IDs = []
    for reservation in slurm_reservations:
        # Pass the arguments as a list: no quoting is needed around the format string, and names with
        # spaces or quotes are not re-tokenized as they would be by shlex.split on a joined string.
        cmd_args = ["squeue", "--format=%i,%j", "-u", slurm_account, "--reservation={}".format(reservation)]
        cmd = shlex.join(cmd_args)  # shell-quoted command line, only used in the log messages
        process = subprocess_run_and_raise_exception_on_error(
            cmd_args,
            "Success of {}".format(cmd),
            "Failure of {}".format(cmd),
            LOGGING_LEVELS_DICT["ERROR"],
            LOGGING_LEVELS_DICT["DEBUG"],
        )
        job_IDs.extend(parse_JOBID_in_squeue_CSV_output(process.stdout, name_filter))
    return job_IDs

188 

189 

def job_statistics_from_squeue_output(squeue_output: str):
    """Count jobs per state from the output of `squeue -u lstrta --format="%T,%R"`.

    Warnings
    --------
    The output of squeue is parsed using a CSV reader whose first row is the header, so the format
    specification must be respected: the columns "STATE" and "NODELIST(REASON)" must be present.

    Parameters
    ----------
    squeue_output : str
        Output of squeue

    Returns
    -------
    tuple of int
        (number of jobs, number of "RUNNING" jobs, number of "PENDING" jobs,
        number of jobs whose NODELIST(REASON) value contains "ReqNodeNotAvail")
    """
    n_jobs = n_running = n_pending = n_request_node_not_available = 0

    for job_row in DictReader(StringIO(squeue_output), delimiter=","):
        n_jobs += 1
        job_state = job_row["STATE"]
        if job_state == "RUNNING":
            n_running += 1
        if job_state == "PENDING":
            n_pending += 1
        if "ReqNodeNotAvail" in job_row["NODELIST(REASON)"]:
            n_request_node_not_available += 1

    return n_jobs, n_running, n_pending, n_request_node_not_available