Coverage for lst_auto_rta/Auto_RTA.py: 24%
174 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-03 14:47 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-03 14:47 +0000
1#!/usr/bin/env python
3"""Automatically start/stop RTA reconstruction pipeline for new runs during a observation night
5New runs are found querying the TCU pymongo database regularly.
6This script:
7- copies the conda environment and reconstruction model to the RAM of the slurm nodes
8- queries the TCU pymongo database for new runs regularly and for each new run:
9 - stops the r0->dl1 daemons for the previous runs
10 - starts the r0->dl1 daemons for the new run, using a static configuration (CDB configuration)
11 from disk, and writing the dynamic configuration to disk as well.
12 - starts the "engineering gui" scripts allowing to monitor data processing for the run.
13- stops at a fixed hour according to its configuration file
14- cleans the RAM of slurm worker nodes
15"""
17import argparse
18import datetime
19import json
20import logging
21import shlex
22import signal
23import subprocess as sp
24import time
25from pathlib import Path
26from subprocess import CalledProcessError
27from threading import Thread
28from typing import Dict, List, NamedTuple
30from annotated_types import Gt, Le
31from pymongo.errors import PyMongoError
32from typing_extensions import Annotated
34from lst_auto_rta.config.configuration import (
35 AutoRTAConfiguration,
36 DataStreamConnectionConfiguration,
37 ObservationParameters,
38)
39from lst_auto_rta.observation_data import ObsInfo, get_current_run_info
40from lst_auto_rta.paths import RecoPathStructure
41from lst_auto_rta.utils.logging import LOGGING_LEVELS_DICT, init_logging
42from lst_auto_rta.utils.slurm import (
43 job_statistics_from_squeue_output,
44 parse_slurm_job_ID,
45 subprocess_run_and_raise_exception_on_error,
46)
49class ConnectionJobInfo(NamedTuple):
50 tel_id: Annotated[int, Gt(0)]
51 hostname: str
52 port: Annotated[int, Gt(0), Le(65535)]
53 slurm_reservation: str
54 slurm_node: str
57def assign_worker_to_data_connection(
58 slurm_nodes: Dict[str, List[str]],
59 tel_ids_to_data_servers: Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]],
60) -> List[ConnectionJobInfo]:
61 """Assign a worker node from `slurm_nodes` to each data server connection in `tel_ids_to_data_servers`
63 Parameters
64 ----------
65 slurm_nodes : Dict[str, List[str]]
66 Mapping from slurm reservation to slurm nodes, see AutoRTAConfiguration.slurm_nodes field.
67 tel_ids_to_data_servers : Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]]
68 Mapping from telescope ID to data servers connections, see AutoRTAConfiguration.tel_ids_to_data_servers
69 Returns
70 -------
71 Dict[Tuple[int, str, str], Tuple[str, str]]
72 Map from (tel_id, hostname, port) to (slurm_reservation, slurm_nodename)
73 """
74 # "Flatten" the slurm nodes information into a list of tuple (reservation, node)
75 node_list = [(reservation, node) for reservation, nodes in slurm_nodes.items() for node in nodes]
76 tel_to_node_map = []
77 node_idx = 0
78 # Now make the tuples from telescope connections and available nodes.
79 for tel_id, tel_data_server_connections in tel_ids_to_data_servers.items():
80 for connection_idx, connection in enumerate(tel_data_server_connections):
81 tel_to_node_map.append(
82 ConnectionJobInfo(
83 tel_id=tel_id,
84 hostname=connection.hostname,
85 port=connection.port,
86 slurm_reservation=node_list[node_idx + connection_idx][0],
87 slurm_node=node_list[node_idx + connection_idx][1],
88 )
89 )
91 node_idx += len(tel_data_server_connections)
92 return tel_to_node_map
95def srun_cmd_worker_nodes(
96 connection_jobs_info: List[ConnectionJobInfo],
97 cmd: str,
98 additional_slurm_params: List[str] | None = None,
99 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
100):
101 """Submit a slurm command with srun on all worker nodes in `connection_jobs_info`
103 Parameters
104 ----------
105 connection_jobs_info : List[ConnectionJobInfo]
106 List of connection job information: `cmd` will be executed for each node entry in this list.
107 cmd : str
108 Command to run with srun.
109 additional_slurm_params : List[str] or None
110 List of additional slurm jobs parameters, separated between args and values, for instance ["--mem", "20G"]
111 Optionnal, default is None.
113 Returns
114 -------
115 List[subprocess.CompletedProcess]
116 List of completed processes.
117 """
118 completed_processes = []
119 for job_info in connection_jobs_info:
120 srun_cmd = " ".join(
121 [
122 "srun",
123 *(additional_slurm_params if additional_slurm_params is not None else ""),
124 "--reservation={}".format(job_info.slurm_reservation),
125 "--nodelist={}".format(job_info.slurm_node),
126 cmd,
127 ]
128 )
129 logging.info("Running {}".format(srun_cmd))
130 completed_processes.append(
131 subprocess_run_and_raise_exception_on_error(
132 shlex.split(srun_cmd),
133 success_log_string="Success on node {}".format(job_info.slurm_node),
134 failure_log_string="Failure on node {} with {}".format(job_info.slurm_node, srun_cmd),
135 error_level=error_level,
136 log_level=LOGGING_LEVELS_DICT["DEBUG"],
137 )
138 )
139 return completed_processes
142def scancel_jobs(job_ids: List[int], signal: signal.Signals, delay_s: float = None, ignore_error: bool = False):
143 """Run `scancel on `job_ids`, sending `signal` after `delay_s` seconds.
145 Use the --quiet argument of scancel to not raise error if the jobs are already stopped.
147 Parameters
148 ----------
149 job_ids : List[str]
150 List of job ids to scancel
151 signal : signal.Signals
152 Signal to send with scancel
153 delay_s : float, optional
154 Amount of time in second to wait before performing the scancel
155 ignore_error : bool, optional
156 If True, the scancel is run directly with subprocess.srun, and any error happening in the subprocess
157 is simply ignored. This is usefull when running scancel -s KILL after a scancel -s INT: if the SIGINT
158 stopped the job already, the SIGKILL would have exit code 1 even with --quiet, but we want to ignore
159 the error in this case.
160 """
161 # note: --full or -f is required for r0_dl1 daemons to receive signal
162 scancel_cmd = " ".join(["scancel", "--full", "-s", str(signal), *[str(id) for id in job_ids]])
163 if delay_s is not None:
164 time.sleep(delay_s)
165 if ignore_error:
166 try:
167 sp.run(shlex.split(scancel_cmd), capture_output=True, text=True, check=True)
168 logging.debug("Stopping jobs with {}".format(scancel_cmd))
169 except CalledProcessError as error:
170 logging.info(
171 "Ignoring error of {} caused by jobs already been stopped. Error info:\nstdout: {}\nstderr: {}".format(
172 scancel_cmd, error.stdout, error.stderr
173 )
174 )
175 else:
176 subprocess_run_and_raise_exception_on_error(
177 shlex.split(scancel_cmd),
178 success_log_string="Stopping job with {}".format(scancel_cmd),
179 failure_log_string="FAILURE to stop job with {}".format(scancel_cmd),
180 error_level=LOGGING_LEVELS_DICT["ERROR"],
181 log_level=LOGGING_LEVELS_DICT["DEBUG"],
182 )
185def stop_rta(slurm_reservations: List[str], slurm_account: str, r0dl1_job_name: str) -> Thread:
186 """Stop the r0dl1 daemons
188 The r0dl1 jobs are immediately send a SIGINT signal, which should tell them to
189 gracefully shut down. A SIGKILL is also scheduled to run 10 seconds later to
190 ensure the jobs are indeed stopped.
192 Parameters
193 ----------
194 slurm_reservations : List[str]
195 List of slurm reservation to search for r0dl1 daemons.
196 slurm_account : str
197 Slurm account to use when searching for the r0dl1 daemons.
198 r0dl1_job_name : str
199 Name of the r0dl1 jobs in the CDB configuration
201 Returns
202 -------
203 stop_thread : Thread
204 Started thread that will SIGKILL the r0dl1 jobs after 10 secs.
205 """
206 job_ids = parse_slurm_job_ID(slurm_reservations, slurm_account, r0dl1_job_name)
207 logging.info("Found r0dl1 job ids {} to stop.".format(job_ids))
208 if job_ids:
209 # immediately send the SIGINT
210 scancel_jobs(job_ids, signal.SIGINT, None)
211 # start a detached thread to send the SIGKILL in 10 seconds.
212 # this allows autorta to continue and start new run immediately, while jobs are shuting down.
213 stop_thread = Thread(target=scancel_jobs, args=(job_ids, signal.SIGKILL, 10.0, True))
214 stop_thread.start()
215 return stop_thread
216 return None
219def nuke_rta(slurm_account: str):
220 """Hard stop of All RTA jobs: scancels all jobs of `slurm_account`
222 Parameters
223 ----------
224 slurm_account : str
225 slurm account
226 """
227 nuke_rta_cmd = " ".join(["scancel", "-u", slurm_account])
228 subprocess_run_and_raise_exception_on_error(
229 shlex.split(nuke_rta_cmd),
230 "Stopped RTA with {}".format(nuke_rta_cmd),
231 "Could not stop RTA with {}".format(nuke_rta_cmd),
232 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
233 log_level=LOGGING_LEVELS_DICT["WARNING"],
234 )
237def write_reco_manager_observation_config(
238 obs_info: ObsInfo,
239 obs_dir: Path,
240 night_path_structure: RecoPathStructure,
241 auto_rta_config: AutoRTAConfiguration,
242 output_path: Path,
243):
244 """Writes the observation configuration for the reco-manager
246 Parameters
247 ----------
248 obs_info : ObsInfo
249 Observation parameters from observation DB
250 obs_dir : Path
251 Path to the observation data directory
252 night_path_structure : RecoPathStructure
253 Path structure of the night
254 auto_rta_config : AutoRTAConfiguration
255 Configuration of Auto RTA
256 output_path: Path
257 Path where to write the hiperta_stream_start configuration
258 """
259 hiperta_obs_config = ObservationParameters.model_validate(
260 {
261 "sb_id": 1, # no scheduling block in LST
262 "obs_id": obs_info.obs_id,
263 "tel_id": 1, # only 1 tel
264 "RA_pointing": obs_info.RA,
265 "DEC_pointing": obs_info.DEC,
266 "dl1_dir": str(night_path_structure.dl1_dir(obs_dir)),
267 "dl2_dir": str(night_path_structure.dl2_dir(obs_dir)),
268 "dl3_dir": str(night_path_structure.dl3_dir(obs_dir)),
269 "log_dir": str(night_path_structure.log_dir(obs_dir)),
270 "reco_manager_log_file": str(night_path_structure.log_dir(obs_dir) / "hiperta_stream_start.log"),
271 "data_stream_connections": auto_rta_config.tel_ids_to_data_servers[1], # only do tel ID 1 for LST 1
272 "slurm_nodelists": auto_rta_config.slurm_nodes,
273 }
274 )
275 with open(output_path, "w") as obs_config_f:
276 obs_config_f.write(hiperta_obs_config.model_dump_json(indent=4))
279def main():
280 """
281 Entrypoint of Auto_RTA:
282 - parse the available slurm nodes
283 - check that the slurm nodes are in "connected" network mode
284 - copy conda environment to slurm nodes (usually in /dev/shm/ (RAM))
285 - query database for current RUN and for each run:
286 - stops previous RTA reconstruction slurm jobs
287 - starts new R0-DL1 jobs for the new RUN with static configuration (CDB config) and
288 a newly written dynamic config
289 - starts engineering GUI plotting scripts
290 - stops at a fixed hour set in configuration, after cleaning slurm nodes memory.
291 """
293 start_time = datetime.datetime.now(datetime.UTC)
295 # initially write log where the script is called (home if cron job)
296 # so that errors can be logged if we can't parse the config
297 init_logging(log_level="DEBUG", log_filename="LST_AUTO_RTA.log")
299 # Load configuration
300 parser = argparse.ArgumentParser(
301 description="Automatic Starting of the RTA Reconstruction for an observation night",
302 formatter_class=argparse.ArgumentDefaultsHelpFormatter,
303 )
304 parser.add_argument(
305 "-c", "--config", dest="config", type=str, required=True, help="LST auto RTA configuration file."
306 )
307 args = parser.parse_args()
308 with open(args.config, "r") as config_file:
309 config = AutoRTAConfiguration.model_validate_json(config_file.read())
311 # Create night data directory
312 path_structure = RecoPathStructure(config.data_dir, start_time)
313 path_structure.create_night_data_dir()
314 # now update logging to write log in night's directory
315 init_logging(
316 log_level=config.log_level,
317 log_filename=path_structure.night_data_dir / "LST_AUTO_RTA.log",
318 )
320 # get the stop time during today
321 stop_time = start_time.replace(
322 hour=config.stop_time_UTC_hours, minute=config.stop_time_UTC_minutes, second=0, microsecond=0
323 )
324 # If we are passed it, it is actually tomorrow
325 if stop_time < start_time:
326 stop_time += datetime.timedelta(days=1)
328 logging.info("Start RTA at " + str(start_time))
329 logging.info("Found worker nodes : {}".format(config.slurm_nodes))
331 connection_jobs_info = assign_worker_to_data_connection(config.slurm_nodes, config.tel_ids_to_data_servers)
332 for job_info in connection_jobs_info:
333 logging.info(
334 'Will run tel {} connection {{"hostname": {}, "port": {}}} r0->dl1 job on {} (reservation {})'.format(
335 job_info.tel_id, job_info.hostname, job_info.port, job_info.slurm_node, job_info.slurm_reservation
336 )
337 )
339 logging.info("Reading R0-DL1 jobnames from CDB configuration")
340 # load CDB configuration, checking it exists, and parsing r0_dl1_job_name
341 with open(config.hiperta_CDB_config_file, "r") as CDB_f:
342 CDB_config = json.load(CDB_f)
343 r0_dl1_job_name = CDB_config["r0_dl1_params"]["r0_dl1_job_name"]
344 logging.info("Found job name: {}".format(r0_dl1_job_name))
346 if config.copy_env:
347 # For all srun commands here: error level is critical because RTA can not run if environment can not be copied
348 logging.info("Loading the environment on worker nodes for the night")
349 logging.info("Cleaning previous content in {}".format(config.env_archive_extraction_path))
350 srun_cmd_worker_nodes(
351 connection_jobs_info,
352 "rm -rf {}".format(config.env_archive_extraction_path),
353 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
354 )
355 logging.info("Cleaning previous content in {}".format(config.models_archive_copy_path))
356 srun_cmd_worker_nodes(
357 connection_jobs_info,
358 "rm -rf {}".format(config.models_archive_copy_path),
359 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
360 )
361 logging.info(
362 "Extracting environment from {} to {}".format(config.env_archive, config.env_archive_extraction_path)
363 )
364 srun_cmd_worker_nodes(
365 connection_jobs_info,
366 "bash -c 'mkdir {} && tar -xzf {} -C {}'".format(
367 config.env_archive_extraction_path, config.env_archive, config.env_archive_extraction_path
368 ),
369 ["--mem", "20G"],
370 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
371 )
372 logging.info(
373 "Copying reconstruction models from {} to {}".format(
374 config.models_archive_path, config.models_archive_copy_path
375 )
376 )
377 srun_cmd_worker_nodes(
378 connection_jobs_info,
379 "cp -rf {} {}".format(config.models_archive_path, config.models_archive_copy_path),
380 ["--mem", "20G"],
381 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
382 )
384 if config.check_node_connection:
385 comp_proc_ib0_cat = srun_cmd_worker_nodes(connection_jobs_info, "cat /sys/class/net/ib0/mode")
386 # If a node is not connected, raise error
387 if not all("connected" in process.stdout.strip() for process in comp_proc_ib0_cat):
388 raise RuntimeError(
389 "Not all RTA slurm nodes are connected to ib0! Found {}".format(
390 ", ".join(
391 [
392 "{}: {}".format(job_info.slurm_node, process.stdout.strip())
393 for job_info, process in zip(connection_jobs_info, comp_proc_ib0_cat)
394 ]
395 )
396 )
397 )
399 logging.info("RTA ready for the night !")
401 current_obs_info = ObsInfo(None, None, None, None, None, None)
402 while not (datetime.datetime.now(datetime.UTC) > stop_time):
403 # query TCU DB for observation
404 try:
405 obs_info = get_current_run_info(config.db_hostname, 10)
406 except PyMongoError:
407 logging.error("Error retrieving observation information from DB, ignoring...", exc_info=True)
408 obs_info = ObsInfo(None, None, None, None, None, None)
410 # Check observation data
411 obs_info_is_none = obs_info.RA is None or obs_info.DEC is None
412 obs_recent_enough = True # default value if we couldn't get an obs_info
413 if obs_info_is_none:
414 logging.warning("Queried observation information had no RA DEC.")
415 elif config.ignore_old_observation:
416 obs_info_tstart_datetime = datetime.datetime.fromtimestamp(obs_info.time_start_camera, datetime.UTC)
417 obs_info_time_delta = datetime.datetime.now(datetime.UTC) - obs_info_tstart_datetime
418 obs_recent_enough = obs_info_time_delta < datetime.timedelta(hours=4)
419 if not obs_recent_enough:
420 logging.info(
421 "Queried observation {} has start time {}, timedelta wrt now: {}. Too old to start RTA".format(
422 obs_info.obs_id, obs_info_tstart_datetime, obs_info_time_delta
423 )
424 )
426 # Start RTA
427 if current_obs_info.obs_id != obs_info.obs_id and (not obs_info_is_none) and obs_recent_enough:
428 # We got a new observation!
429 logging.info(
430 "Got new observation! ID: {} - RA: {} - DEC: {} - SOURCE.RA: {} - SOURCE.DEC: {}".format(
431 obs_info.obs_id, obs_info.RA, obs_info.DEC, obs_info.source_RA, obs_info.source_DEC
432 )
433 )
435 try:
436 logging.info("Stopping RTA")
437 stop_rta(config.slurm_reservations, config.slurm_account, r0_dl1_job_name)
438 except Exception:
439 logging.error("Could not stop RTA. NEXT RUN DATA MAY BE ACQUIRED BY PREVIOUS RUN DAEMONS !")
440 # Note: we could nuke_rta, but it would also kill DQ, SCI jobs, etc...
442 obs_dir = path_structure.create_observation_data_dirs(str(obs_info.obs_id), True)
443 logging.info("Created directories for obs {} at {}".format(obs_info.obs_id, obs_dir))
445 reco_manager_obs_config_path = (
446 path_structure.log_dir(obs_dir) / "hiperta_stream_start_observation_config.json"
447 )
448 logging.info("Writing reco-manager observation configuration at {}".format(reco_manager_obs_config_path))
449 write_reco_manager_observation_config(
450 obs_info,
451 obs_dir,
452 path_structure,
453 config,
454 reco_manager_obs_config_path,
455 )
457 # Note: hiperta_stream_start has to be started on a worker node as well, because it
458 # will read the training r0dl1 configuration, which is in the model's archive copied to /dev/shm
459 # (it reads the r0dl1 configuration from the path set in the CDB config, that must point to /dev/shm)
460 # Note2: to run several commands with srun, need to wrap with bash -c '...'
461 hiperta_stream_start_cmd = " ".join(
462 [
463 "bash -c '" "export PATH={}/bin/:$PATH ; {}/bin/hiperta_stream_start".format(
464 config.env_archive_extraction_path, config.env_archive_extraction_path
465 ),
466 "-c",
467 config.hiperta_CDB_config_file,
468 "-d",
469 str(reco_manager_obs_config_path),
470 "'",
471 ]
472 )
473 try:
474 # use 1st node to start hiperta_stream
475 # Note: hiperta_stream must run on a worker node as well, because it needs access to
476 # the r0dl1 training configuration, which is found form the path in the CDB configuration,
477 # which points to /dev/shm/model_archive/...
478 hiperta_stream_job_info = connection_jobs_info[0]
479 hiperta_stream_srun_cmd = " ".join(
480 [
481 "srun",
482 "--reservation={}".format(hiperta_stream_job_info.slurm_reservation),
483 "--nodelist={}".format(hiperta_stream_job_info.slurm_node),
484 hiperta_stream_start_cmd,
485 ]
486 )
487 logging.info("Starting hiperta_stream_start with {}".format(hiperta_stream_srun_cmd))
488 subprocess_run_and_raise_exception_on_error(
489 shlex.split(hiperta_stream_srun_cmd),
490 success_log_string="hiperta_stream started with {}".format(hiperta_stream_job_info.slurm_node),
491 failure_log_string="Failed to start hiperta_stream on {} with {}".format(
492 hiperta_stream_job_info.slurm_node, hiperta_stream_start_cmd
493 ),
494 error_level=LOGGING_LEVELS_DICT[
495 "ERROR"
496 ], # we might want to continue even if we can't start reco-manager
497 log_level=LOGGING_LEVELS_DICT["DEBUG"],
498 )
499 # Only if we could start RTA we update the current obs_info
500 # Otherwise, we will loop, see a new observation again, stop RTA and re-start, etc ...
501 current_obs_info = obs_info
502 except sp.SubprocessError:
503 logging.error("Failed to start reco-manager with {}".format(hiperta_stream_start_cmd))
504 pass # continue anyway, obs_info is not updated so we will try again to start
506 # If not starting RTA: Query squeue for some statistics on the running jobs
507 else:
508 try:
509 running_jobs_info = subprocess_run_and_raise_exception_on_error(
510 shlex.split('squeue -u {} --format="%T,%R"'.format(config.slurm_account)),
511 failure_log_string="Could not parse slurm info while waiting for next observation",
512 error_level=LOGGING_LEVELS_DICT["ERROR"],
513 log_level=LOGGING_LEVELS_DICT["DEBUG"],
514 ).stdout
515 n_jobs, n_running, n_pending, n_request_node_not_available = job_statistics_from_squeue_output(
516 running_jobs_info
517 )
518 logging.info(
519 "{} job statistics: nb_jobs: {} - nb_running: {} - nb_pending: {} - nb_node_not_available: {}".format(
520 config.slurm_account, n_jobs, n_running, n_pending, n_request_node_not_available
521 )
522 )
523 except sp.SubprocessError:
524 pass # continue loop anyway
526 # sleep until we get an observation
527 logging.info("RTA waiting for next observation, current observation is {}".format(obs_info.obs_id))
528 time.sleep(1.0)
530 logging.info("End of the night: Stop the RTA")
531 try:
532 logging.info("Stopping RTA")
533 sigkill_thread = stop_rta(config.slurm_reservations, config.slurm_account, r0_dl1_job_name)
534 if sigkill_thread is not None: # it is None if there are no jobs to kill
535 sigkill_thread.join(timeout=180.0) # 3 minutes timeout but job should end right after delay of 10 sec
536 else:
537 logging.warning("Found no r0_dl1 jobs to stop !")
538 except Exception:
539 logging.error("Could not stop RTA normally, nuking all {} jobs".format(config.slurm_account))
540 nuke_rta(config.slurm_account)
542 # Clean up node environment
543 logging.info("Cleaning nodes copied files")
544 logging.info("Cleaning content in {}".format(config.env_archive_extraction_path))
545 srun_cmd_worker_nodes(
546 connection_jobs_info,
547 "rm -rf {}".format(config.env_archive_extraction_path),
548 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
549 )
550 logging.info("Cleaning content in {}".format(config.models_archive_copy_path))
551 srun_cmd_worker_nodes(
552 connection_jobs_info,
553 "rm -rf {}".format(config.models_archive_copy_path),
554 error_level=LOGGING_LEVELS_DICT["CRITICAL"],
555 )
556 logging.info("Done cleaning nodes.")
558 logging.info("RTA Done for the night. Good day !")
561if __name__ == "__main__": 561 ↛ 562line 561 didn't jump to line 562 because the condition on line 561 was never true
562 main()