Coverage for lst_auto_rta/Auto_RTA.py: 24%

174 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-17 14:47 +0000

1#!/usr/bin/env python 

2 

3"""Automatically start/stop RTA reconstruction pipeline for new runs during a observation night 

4 

5New runs are found querying the TCU pymongo database regularly. 

6This script: 

7- copies the conda environment and reconstruction model to the RAM of the slurm nodes 

8- queries the TCU pymongo database for new runs regularly and for each new run: 

9 - stops the r0->dl1 daemons for the previous runs 

10 - starts the r0->dl1 daemons for the new run, using a static configuration (CDB configuration) 

11 from disk, and writing the dynamic configuration to disk as well. 

12 - starts the "engineering gui" scripts allowing to monitor data processing for the run. 

13- stops at a fixed hour according to its configuration file 

14- cleans the RAM of slurm worker nodes 

15""" 

16 

17import argparse 

18import datetime 

19import json 

20import logging 

21import shlex 

22import signal 

23import subprocess as sp 

24import time 

25from pathlib import Path 

26from subprocess import CalledProcessError 

27from threading import Thread 

28from typing import Dict, List, NamedTuple 

29 

30from annotated_types import Gt, Le 

31from pymongo.errors import PyMongoError 

32from typing_extensions import Annotated 

33 

34from lst_auto_rta.config.configuration import ( 

35 AutoRTAConfiguration, 

36 DataStreamConnectionConfiguration, 

37 ObservationParameters, 

38) 

39from lst_auto_rta.observation_data import ObsInfo, get_current_run_info 

40from lst_auto_rta.paths import RecoPathStructure 

41from lst_auto_rta.utils.logging import LOGGING_LEVELS_DICT, init_logging 

42from lst_auto_rta.utils.slurm import ( 

43 job_statistics_from_squeue_output, 

44 parse_slurm_job_ID, 

45 subprocess_run_and_raise_exception_on_error, 

46) 

47 

48 

class ConnectionJobInfo(NamedTuple):
    """Association between one telescope data server connection and the slurm node assigned to it.

    Instances are built by `assign_worker_to_data_connection`, one per data server connection.
    """

    # Telescope ID the connection belongs to (annotated as strictly positive).
    tel_id: Annotated[int, Gt(0)]
    # Hostname of the data server connection.
    hostname: str
    # Port of the data server connection (annotated as 0 < port <= 65535).
    port: Annotated[int, Gt(0), Le(65535)]
    # Slurm reservation that contains `slurm_node`.
    slurm_reservation: str
    # Name of the slurm node assigned to this connection.
    slurm_node: str

55 

56 

def assign_worker_to_data_connection(
    slurm_nodes: Dict[str, List[str]],
    tel_ids_to_data_servers: Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]],
) -> List[ConnectionJobInfo]:
    """Assign a worker node from `slurm_nodes` to each data server connection in `tel_ids_to_data_servers`

    Nodes are handed out in order: the i-th connection (iterating telescopes, then the connections
    of each telescope) gets the i-th node (iterating reservations, then the nodes of each reservation).
    Nodes in excess of the number of connections are left unused.

    Parameters
    ----------
    slurm_nodes : Dict[str, List[str]]
        Mapping from slurm reservation to slurm nodes, see AutoRTAConfiguration.slurm_nodes field.
    tel_ids_to_data_servers : Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]]
        Mapping from telescope ID to data servers connections, see AutoRTAConfiguration.tel_ids_to_data_servers

    Returns
    -------
    List[ConnectionJobInfo]
        One entry per data server connection, holding (tel_id, hostname, port) of the connection and
        the (slurm_reservation, slurm_node) assigned to it.

    Raises
    ------
    IndexError
        If there are fewer slurm nodes than data server connections.
    """
    # "Flatten" the slurm nodes information into a list of tuple (reservation, node)
    node_list = [(reservation, node) for reservation, nodes in slurm_nodes.items() for node in nodes]
    # "Flatten" the connections the same way, into a list of tuple (tel_id, connection)
    connection_list = [
        (tel_id, connection)
        for tel_id, tel_data_server_connections in tel_ids_to_data_servers.items()
        for connection in tel_data_server_connections
    ]

    # Fail with an explicit message instead of a bare "list index out of range".
    if len(connection_list) > len(node_list):
        raise IndexError(
            "Not enough slurm nodes to process all data server connections: {} connections for {} nodes".format(
                len(connection_list), len(node_list)
            )
        )

    # Pair connections and nodes position by position.
    return [
        ConnectionJobInfo(
            tel_id=tel_id,
            hostname=connection.hostname,
            port=connection.port,
            slurm_reservation=reservation,
            slurm_node=node,
        )
        for (tel_id, connection), (reservation, node) in zip(connection_list, node_list)
    ]

93 

94 

def srun_cmd_worker_nodes(
    connection_jobs_info: List[ConnectionJobInfo],
    cmd: str,
    additional_slurm_params: List[str] | None = None,
    error_level=LOGGING_LEVELS_DICT["CRITICAL"],
):
    """Run `cmd` with srun on the worker node of every entry of `connection_jobs_info`

    The commands are run one node after the other, each one targeting the reservation and node
    of the corresponding entry.

    Parameters
    ----------
    connection_jobs_info : List[ConnectionJobInfo]
        List of connection job information: `cmd` will be executed for each node entry in this list.
    cmd : str
        Command to run with srun.
    additional_slurm_params : List[str] or None
        List of additional slurm jobs parameters, separated between args and values, for instance ["--mem", "20G"]
        Optional, default is None.
    error_level
        Forwarded as `error_level` to `subprocess_run_and_raise_exception_on_error`.
        Default is LOGGING_LEVELS_DICT["CRITICAL"].

    Returns
    -------
    List[subprocess.CompletedProcess]
        List of completed processes, in the same order as `connection_jobs_info`.
    """
    results = []
    for job_info in connection_jobs_info:
        srun_parts = ["srun"]
        if additional_slurm_params is not None:
            srun_parts.extend(additional_slurm_params)
        srun_parts.append(f"--reservation={job_info.slurm_reservation}")
        srun_parts.append(f"--nodelist={job_info.slurm_node}")
        srun_parts.append(cmd)
        # The full command line is assembled as a string first (used in the logs),
        # and only then split into an argument list for the subprocess.
        srun_cmd = " ".join(srun_parts)

        logging.info(f"Running {srun_cmd}")
        completed = subprocess_run_and_raise_exception_on_error(
            shlex.split(srun_cmd),
            success_log_string=f"Success on node {job_info.slurm_node}",
            failure_log_string=f"Failure on node {job_info.slurm_node} with {srun_cmd}",
            error_level=error_level,
            log_level=LOGGING_LEVELS_DICT["DEBUG"],
        )
        results.append(completed)
    return results

140 

141 

def scancel_jobs(job_ids: List[int], signal: signal.Signals, delay_s: float = None, ignore_error: bool = False):
    """Run `scancel` on `job_ids`, sending `signal` after `delay_s` seconds.

    Parameters
    ----------
    job_ids : List[int]
        List of job ids to scancel
    signal : signal.Signals
        Signal to send with scancel
    delay_s : float, optional
        Amount of time in second to wait before performing the scancel.
        Default is None: no wait.
    ignore_error : bool, optional
        If True, the scancel is run directly with subprocess.run, and a non-zero exit code of
        the subprocess is only logged at info level. This is useful when running scancel -s KILL
        after a scancel -s INT: if the SIGINT stopped the job already, the SIGKILL would exit with
        an error, but we want to ignore the error in this case.
        If False (default), the scancel is run with `subprocess_run_and_raise_exception_on_error`.
    """
    # note: --full or -f is required for r0_dl1 daemons to receive signal
    scancel_parts = ["scancel", "--full", "-s", str(signal)]
    scancel_parts.extend(str(job_id) for job_id in job_ids)
    scancel_cmd = " ".join(scancel_parts)

    if delay_s is not None:
        time.sleep(delay_s)

    if not ignore_error:
        subprocess_run_and_raise_exception_on_error(
            shlex.split(scancel_cmd),
            success_log_string="Stopping job with {}".format(scancel_cmd),
            failure_log_string="FAILURE to stop job with {}".format(scancel_cmd),
            error_level=LOGGING_LEVELS_DICT["ERROR"],
            log_level=LOGGING_LEVELS_DICT["DEBUG"],
        )
        return

    # Best effort: a failing scancel is reported but never raised.
    try:
        sp.run(shlex.split(scancel_cmd), capture_output=True, text=True, check=True)
    except CalledProcessError as error:
        logging.info(
            "Ignoring error of {} caused by jobs already been stopped. Error info:\nstdout: {}\nstderr: {}".format(
                scancel_cmd, error.stdout, error.stderr
            )
        )
    else:
        logging.debug("Stopping jobs with {}".format(scancel_cmd))

183 

184 

def stop_rta(slurm_reservations: List[str], slurm_account: str, r0dl1_job_name: str) -> Thread | None:
    """Stop the r0dl1 daemons

    The r0dl1 jobs are immediately sent a SIGINT signal, which should tell them to
    gracefully shut down. A SIGKILL is also scheduled to run 10 seconds later to
    ensure the jobs are indeed stopped.

    Parameters
    ----------
    slurm_reservations : List[str]
        List of slurm reservation to search for r0dl1 daemons.
    slurm_account : str
        Slurm account to use when searching for the r0dl1 daemons.
    r0dl1_job_name : str
        Name of the r0dl1 jobs in the CDB configuration

    Returns
    -------
    stop_thread : Thread or None
        Started thread that will SIGKILL the r0dl1 jobs after 10 secs.
        None if no r0dl1 job was found: nothing is sent and no thread is started in that case.
    """
    job_ids = parse_slurm_job_ID(slurm_reservations, slurm_account, r0dl1_job_name)
    logging.info("Found r0dl1 job ids {} to stop.".format(job_ids))
    if not job_ids:
        # Nothing to stop: callers must be ready to receive None.
        return None

    # immediately send the SIGINT
    scancel_jobs(job_ids, signal.SIGINT, None)
    # Send the SIGKILL from a separate thread, after a 10 seconds delay.
    # This allows autorta to continue and start a new run immediately, while jobs are shutting down.
    # Errors of this second scancel are ignored (ignore_error=True).
    stop_thread = Thread(target=scancel_jobs, args=(job_ids, signal.SIGKILL, 10.0, True))
    stop_thread.start()
    return stop_thread

217 

218 

def nuke_rta(slurm_account: str):
    """Hard stop of all RTA jobs: runs `scancel -u <slurm_account>`

    Parameters
    ----------
    slurm_account : str
        slurm account, passed to the `-u` option of scancel.
    """
    nuke_rta_cmd = " ".join(("scancel", "-u", slurm_account))
    stopped_msg = "Stopped RTA with {}".format(nuke_rta_cmd)
    not_stopped_msg = "Could not stop RTA with {}".format(nuke_rta_cmd)
    subprocess_run_and_raise_exception_on_error(
        shlex.split(nuke_rta_cmd),
        stopped_msg,
        not_stopped_msg,
        error_level=LOGGING_LEVELS_DICT["CRITICAL"],
        log_level=LOGGING_LEVELS_DICT["WARNING"],
    )

235 

236 

def write_reco_manager_observation_config(
    obs_info: ObsInfo,
    obs_dir: Path,
    night_path_structure: RecoPathStructure,
    auto_rta_config: AutoRTAConfiguration,
    output_path: Path,
):
    """Write the observation configuration of the reco-manager as a JSON file

    The configuration is validated as an `ObservationParameters` model before being written.

    Parameters
    ----------
    obs_info : ObsInfo
        Observation parameters from observation DB (obs_id, RA and DEC are used).
    obs_dir : Path
        Path to the observation data directory
    night_path_structure : RecoPathStructure
        Path structure of the night, used to get the dl1, dl2, dl3 and log directories of `obs_dir`.
    auto_rta_config : AutoRTAConfiguration
        Configuration of Auto RTA (data server connections of telescope 1 and slurm nodes are used).
    output_path: Path
        Path where to write the hiperta_stream_start configuration
    """
    raw_obs_config = {
        "sb_id": 1,  # no scheduling block in LST
        "obs_id": obs_info.obs_id,
        "tel_id": 1,  # only 1 tel
        "RA_pointing": obs_info.RA,
        "DEC_pointing": obs_info.DEC,
        "dl1_dir": str(night_path_structure.dl1_dir(obs_dir)),
        "dl2_dir": str(night_path_structure.dl2_dir(obs_dir)),
        "dl3_dir": str(night_path_structure.dl3_dir(obs_dir)),
        "log_dir": str(night_path_structure.log_dir(obs_dir)),
        "reco_manager_log_file": str(night_path_structure.log_dir(obs_dir) / "hiperta_stream_start.log"),
        # only do tel ID 1 for LST 1
        "data_stream_connections": auto_rta_config.tel_ids_to_data_servers[1],
        "slurm_nodelists": auto_rta_config.slurm_nodes,
    }
    validated_obs_config = ObservationParameters.model_validate(raw_obs_config)

    with open(output_path, "w") as out_file:
        out_file.write(validated_obs_config.model_dump_json(indent=4))

277 

278 

def _remove_copies_from_worker_nodes(config, connection_jobs_info, log_template):
    """Delete the environment and models copies from every worker node.

    Parameters
    ----------
    config : AutoRTAConfiguration
        Auto RTA configuration: `env_archive_extraction_path` and `models_archive_copy_path` are removed.
    connection_jobs_info : List[ConnectionJobInfo]
        Worker nodes on which the removal is run.
    log_template : str
        Log message with a single `{}` placeholder, filled with the path being removed.
    """
    for copied_path in (config.env_archive_extraction_path, config.models_archive_copy_path):
        logging.info(log_template.format(copied_path))
        srun_cmd_worker_nodes(
            connection_jobs_info,
            "rm -rf {}".format(copied_path),
            error_level=LOGGING_LEVELS_DICT["CRITICAL"],
        )


def _copy_environment_to_worker_nodes(config, connection_jobs_info):
    """Extract the environment archive and copy the reconstruction models on every worker node.

    Any previous copy is removed first.
    For all srun commands here: error level is critical because RTA can not run if environment can not be copied.

    Parameters
    ----------
    config : AutoRTAConfiguration
        Auto RTA configuration (archives and destination paths).
    connection_jobs_info : List[ConnectionJobInfo]
        Worker nodes to prepare.
    """
    logging.info("Loading the environment on worker nodes for the night")
    _remove_copies_from_worker_nodes(config, connection_jobs_info, "Cleaning previous content in {}")

    logging.info("Extracting environment from {} to {}".format(config.env_archive, config.env_archive_extraction_path))
    srun_cmd_worker_nodes(
        connection_jobs_info,
        "bash -c 'mkdir {} && tar -xzf {} -C {}'".format(
            config.env_archive_extraction_path, config.env_archive, config.env_archive_extraction_path
        ),
        ["--mem", "20G"],
        error_level=LOGGING_LEVELS_DICT["CRITICAL"],
    )

    logging.info(
        "Copying reconstruction models from {} to {}".format(
            config.models_archive_path, config.models_archive_copy_path
        )
    )
    srun_cmd_worker_nodes(
        connection_jobs_info,
        "cp -rf {} {}".format(config.models_archive_path, config.models_archive_copy_path),
        ["--mem", "20G"],
        error_level=LOGGING_LEVELS_DICT["CRITICAL"],
    )


def _check_worker_nodes_connected(connection_jobs_info):
    """Raise RuntimeError unless /sys/class/net/ib0/mode contains "connected" on every worker node.

    Parameters
    ----------
    connection_jobs_info : List[ConnectionJobInfo]
        Worker nodes to check.
    """
    comp_proc_ib0_cat = srun_cmd_worker_nodes(connection_jobs_info, "cat /sys/class/net/ib0/mode")
    node_modes = [process.stdout.strip() for process in comp_proc_ib0_cat]
    # If a node is not connected, raise error
    if any("connected" not in mode for mode in node_modes):
        raise RuntimeError(
            "Not all RTA slurm nodes are connected to ib0! Found {}".format(
                ", ".join(
                    "{}: {}".format(job_info.slurm_node, mode)
                    for job_info, mode in zip(connection_jobs_info, node_modes)
                )
            )
        )


def _query_observation(db_hostname):
    """Query the TCU DB for the current observation.

    Returns
    -------
    ObsInfo
        Queried observation, or an ObsInfo with all fields set to None if the DB query raised a PyMongoError.
    """
    try:
        return get_current_run_info(db_hostname, 10)
    except PyMongoError:
        logging.error("Error retrieving observation information from DB, ignoring...", exc_info=True)
        return ObsInfo(None, None, None, None, None, None)


def _observation_can_start_rta(obs_info, config):
    """Tell if RTA may be started for `obs_info`: it has RA and DEC, and it is not too old.

    The age is checked only if `config.ignore_old_observation` is set: observations whose
    camera start time is 4 hours old or more are then rejected.

    Returns
    -------
    bool
        True if RTA may be started for this observation.
    """
    if obs_info.RA is None or obs_info.DEC is None:
        logging.warning("Queried observation information had no RA DEC.")
        return False
    if not config.ignore_old_observation:
        return True

    obs_info_tstart_datetime = datetime.datetime.fromtimestamp(obs_info.time_start_camera, datetime.UTC)
    obs_info_time_delta = datetime.datetime.now(datetime.UTC) - obs_info_tstart_datetime
    if obs_info_time_delta < datetime.timedelta(hours=4):
        return True
    logging.info(
        "Queried observation {} has start time {}, timedelta wrt now: {}. Too old to start RTA".format(
            obs_info.obs_id, obs_info_tstart_datetime, obs_info_time_delta
        )
    )
    return False


def _start_reco_manager(config, connection_jobs_info, reco_manager_obs_config_path):
    """Start hiperta_stream_start with srun on the first worker node.

    Note: hiperta_stream_start has to be started on a worker node as well, because it
    will read the training r0dl1 configuration, which is in the model's archive copied to /dev/shm
    (it reads the r0dl1 configuration from the path set in the CDB config, that must point to /dev/shm)

    Parameters
    ----------
    config : AutoRTAConfiguration
        Auto RTA configuration.
    connection_jobs_info : List[ConnectionJobInfo]
        Worker nodes: only the first one is used.
    reco_manager_obs_config_path : Path
        Path of the observation configuration, passed with -d to hiperta_stream_start.

    Returns
    -------
    bool
        True if hiperta_stream_start was started, False if the srun raised a SubprocessError.
    """
    # Note: to run several commands with srun, need to wrap with bash -c '...'
    hiperta_stream_start_cmd = " ".join(
        [
            "bash -c 'export PATH={}/bin/:$PATH ; {}/bin/hiperta_stream_start".format(
                config.env_archive_extraction_path, config.env_archive_extraction_path
            ),
            "-c",
            config.hiperta_CDB_config_file,
            "-d",
            str(reco_manager_obs_config_path),
            "'",
        ]
    )
    try:
        # use 1st node to start hiperta_stream
        hiperta_stream_job_info = connection_jobs_info[0]
        hiperta_stream_srun_cmd = " ".join(
            [
                "srun",
                "--reservation={}".format(hiperta_stream_job_info.slurm_reservation),
                "--nodelist={}".format(hiperta_stream_job_info.slurm_node),
                hiperta_stream_start_cmd,
            ]
        )
        logging.info("Starting hiperta_stream_start with {}".format(hiperta_stream_srun_cmd))
        subprocess_run_and_raise_exception_on_error(
            shlex.split(hiperta_stream_srun_cmd),
            success_log_string="hiperta_stream started with {}".format(hiperta_stream_job_info.slurm_node),
            failure_log_string="Failed to start hiperta_stream on {} with {}".format(
                hiperta_stream_job_info.slurm_node, hiperta_stream_start_cmd
            ),
            # we might want to continue even if we can't start reco-manager
            error_level=LOGGING_LEVELS_DICT["ERROR"],
            log_level=LOGGING_LEVELS_DICT["DEBUG"],
        )
    except sp.SubprocessError:
        logging.error("Failed to start reco-manager with {}".format(hiperta_stream_start_cmd))
        return False
    return True


def _log_slurm_job_statistics(slurm_account):
    """Query squeue and log some statistics on the jobs of `slurm_account`.

    A SubprocessError raised by the squeue call is swallowed so that the main loop carries on.
    """
    try:
        running_jobs_info = subprocess_run_and_raise_exception_on_error(
            shlex.split('squeue -u {} --format="%T,%R"'.format(slurm_account)),
            failure_log_string="Could not parse slurm info while waiting for next observation",
            error_level=LOGGING_LEVELS_DICT["ERROR"],
            log_level=LOGGING_LEVELS_DICT["DEBUG"],
        ).stdout
        n_jobs, n_running, n_pending, n_request_node_not_available = job_statistics_from_squeue_output(
            running_jobs_info
        )
        logging.info(
            "{} job statistics: nb_jobs: {} - nb_running: {} - nb_pending: {} - nb_node_not_available: {}".format(
                slurm_account, n_jobs, n_running, n_pending, n_request_node_not_available
            )
        )
    except sp.SubprocessError:
        pass  # continue loop anyway


def main():
    """
    Entrypoint of Auto_RTA:
    - parse the available slurm nodes
    - copy conda environment to slurm nodes (usually in /dev/shm/ (RAM))
    - check that the slurm nodes are in "connected" network mode
    - query database for current RUN and for each run:
        - stops previous RTA reconstruction slurm jobs
        - starts new R0-DL1 jobs for the new RUN with static configuration (CDB config) and
          a newly written dynamic config
        - starts engineering GUI plotting scripts
    - stops at a fixed hour set in configuration, after cleaning slurm nodes memory.
    """

    start_time = datetime.datetime.now(datetime.UTC)

    # initially write log where the script is called (home if cron job)
    # so that errors can be logged if we can't parse the config
    init_logging(log_level="DEBUG", log_filename="LST_AUTO_RTA.log")

    # Load configuration
    parser = argparse.ArgumentParser(
        description="Automatic Starting of the RTA Reconstruction for an observation night",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", dest="config", type=str, required=True, help="LST auto RTA configuration file."
    )
    args = parser.parse_args()
    with open(args.config, "r") as config_file:
        config = AutoRTAConfiguration.model_validate_json(config_file.read())

    # Create night data directory
    path_structure = RecoPathStructure(config.data_dir, start_time)
    path_structure.create_night_data_dir()
    # now update logging to write log in night's directory
    init_logging(
        log_level=config.log_level,
        log_filename=path_structure.night_data_dir / "LST_AUTO_RTA.log",
    )

    # get the stop time during today
    stop_time = start_time.replace(
        hour=config.stop_time_UTC_hours, minute=config.stop_time_UTC_minutes, second=0, microsecond=0
    )
    # If we are past it, it is actually tomorrow
    if stop_time < start_time:
        stop_time += datetime.timedelta(days=1)

    logging.info("Start RTA at " + str(start_time))
    logging.info("Found worker nodes : {}".format(config.slurm_nodes))

    connection_jobs_info = assign_worker_to_data_connection(config.slurm_nodes, config.tel_ids_to_data_servers)
    for job_info in connection_jobs_info:
        logging.info(
            'Will run tel {} connection {{"hostname": {}, "port": {}}} r0->dl1 job on {} (reservation {})'.format(
                job_info.tel_id, job_info.hostname, job_info.port, job_info.slurm_node, job_info.slurm_reservation
            )
        )

    logging.info("Reading R0-DL1 jobnames from CDB configuration")
    # load CDB configuration, checking it exists, and parsing r0_dl1_job_name
    with open(config.hiperta_CDB_config_file, "r") as CDB_f:
        CDB_config = json.load(CDB_f)
    r0_dl1_job_name = CDB_config["r0_dl1_params"]["r0_dl1_job_name"]
    logging.info("Found job name: {}".format(r0_dl1_job_name))

    if config.copy_env:
        _copy_environment_to_worker_nodes(config, connection_jobs_info)

    if config.check_node_connection:
        _check_worker_nodes_connected(connection_jobs_info)

    logging.info("RTA ready for the night !")

    current_obs_info = ObsInfo(None, None, None, None, None, None)
    while not (datetime.datetime.now(datetime.UTC) > stop_time):
        # query TCU DB for observation, and check observation data
        obs_info = _query_observation(config.db_hostname)
        obs_can_start_rta = _observation_can_start_rta(obs_info, config)

        # Start RTA
        if current_obs_info.obs_id != obs_info.obs_id and obs_can_start_rta:
            # We got a new observation!
            logging.info(
                "Got new observation! ID: {} - RA: {} - DEC: {} - SOURCE.RA: {} - SOURCE.DEC: {}".format(
                    obs_info.obs_id, obs_info.RA, obs_info.DEC, obs_info.source_RA, obs_info.source_DEC
                )
            )

            try:
                logging.info("Stopping RTA")
                stop_rta(config.slurm_reservations, config.slurm_account, r0_dl1_job_name)
            except Exception:
                # Keep going, but record the traceback so the failure can be diagnosed afterwards.
                # Note: we could nuke_rta, but it would also kill DQ, SCI jobs, etc...
                logging.error(
                    "Could not stop RTA. NEXT RUN DATA MAY BE ACQUIRED BY PREVIOUS RUN DAEMONS !", exc_info=True
                )

            obs_dir = path_structure.create_observation_data_dirs(str(obs_info.obs_id), True)
            logging.info("Created directories for obs {} at {}".format(obs_info.obs_id, obs_dir))

            reco_manager_obs_config_path = (
                path_structure.log_dir(obs_dir) / "hiperta_stream_start_observation_config.json"
            )
            logging.info("Writing reco-manager observation configuration at {}".format(reco_manager_obs_config_path))
            write_reco_manager_observation_config(
                obs_info,
                obs_dir,
                path_structure,
                config,
                reco_manager_obs_config_path,
            )

            # Only if we could start RTA we update the current obs_info.
            # Otherwise it is left untouched, so the same observation is seen as new again
            # at the next iteration and the start is retried.
            if _start_reco_manager(config, connection_jobs_info, reco_manager_obs_config_path):
                current_obs_info = obs_info

        # If not starting RTA: Query squeue for some statistics on the running jobs
        else:
            _log_slurm_job_statistics(config.slurm_account)

        # sleep until we get an observation
        logging.info("RTA waiting for next observation, current observation is {}".format(obs_info.obs_id))
        time.sleep(1.0)

    logging.info("End of the night: Stop the RTA")
    try:
        logging.info("Stopping RTA")
        sigkill_thread = stop_rta(config.slurm_reservations, config.slurm_account, r0_dl1_job_name)
        if sigkill_thread is not None:  # it is None if there are no jobs to kill
            sigkill_thread.join(timeout=180.0)  # 3 minutes timeout but job should end right after delay of 10 sec
        else:
            logging.warning("Found no r0_dl1 jobs to stop !")
    except Exception:
        logging.error(
            "Could not stop RTA normally, nuking all {} jobs".format(config.slurm_account), exc_info=True
        )
        nuke_rta(config.slurm_account)

    # Clean up node environment
    logging.info("Cleaning nodes copied files")
    _remove_copies_from_worker_nodes(config, connection_jobs_info, "Cleaning content in {}")
    logging.info("Done cleaning nodes.")

    logging.info("RTA Done for the night. Good day !")

559 

560 

# Run the Auto RTA entrypoint only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()