#!/usr/bin/env python
"""Automatically start/stop RTA reconstruction pipeline for new runs during a observation night
New runs are found querying the TCU pymongo database regularly.
This script:
- copies the conda environment and reconstruction model to the RAM of the slurm nodes
- regularly queries the TCU pymongo database for new runs and, for each new run:
- stops the r0->dl1 daemons for the previous runs
- starts the r0->dl1 daemons for the new run, using a static configuration (CDB configuration)
from disk, and writing the dynamic configuration to disk as well.
- starts the "engineering gui" scripts allowing to monitor data processing for the run.
- stops at a fixed hour according to its configuration file
- cleans the RAM of slurm worker nodes
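
Typical invocation (the script path and configuration path are illustrative):

    python auto_rta.py --config /path/to/auto_rta_config.json

The configuration file is a JSON document parsed into an `AutoRTAConfiguration` model (see `main`).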
"""
import argparse
import datetime
import json
import logging
import shlex
import signal
import subprocess as sp
import time
from pathlib import Path
from subprocess import CalledProcessError
from threading import Thread
from typing import Dict, List, NamedTuple
from annotated_types import Gt, Le
from pymongo.errors import PyMongoError
from typing_extensions import Annotated
from lst_auto_rta.config.configuration import (
AutoRTAConfiguration,
DataStreamConnectionConfiguration,
ObservationParameters,
)
from lst_auto_rta.observation_data import ObsInfo, get_current_run_info
from lst_auto_rta.paths import RecoPathStructure
from lst_auto_rta.utils.logging import LOGGING_LEVELS_DICT, init_logging
from lst_auto_rta.utils.slurm import (
job_statistics_from_squeue_output,
parse_slurm_job_ID,
subprocess_run_and_raise_exception_on_error,
)
class ConnectionJobInfo(NamedTuple):
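    """Assignment of a single data-server connection (tel_id, hostname, port) to the slurm reservation and node that will run its r0->dl1 job."""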
tel_id: Annotated[int, Gt(0)]
hostname: str
port: Annotated[int, Gt(0), Le(65535)]
slurm_reservation: str
slurm_node: str
def assign_worker_to_data_connection(
slurm_nodes: Dict[str, List[str]],
tel_ids_to_data_servers: Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]],
) -> List[ConnectionJobInfo]:
"""Assign a worker node from `slurm_nodes` to each data server connection in `tel_ids_to_data_servers`
Parameters
----------
slurm_nodes : Dict[str, List[str]]
Mapping from slurm reservation to slurm nodes, see AutoRTAConfiguration.slurm_nodes field.
tel_ids_to_data_servers : Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]]
Mapping from telescope ID to data servers connections, see AutoRTAConfiguration.tel_ids_to_data_servers
Returns
-------
    List[ConnectionJobInfo]
        List of ConnectionJobInfo tuples (tel_id, hostname, port, slurm_reservation, slurm_node), one per data server connection.
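
    Examples
    --------
    Illustrative sketch (node names and connection values are made up, and
    `DataStreamConnectionConfiguration` is assumed to be constructible from its
    `hostname` and `port` fields)::

        nodes = {"rta-reservation": ["cp01", "cp02"]}
        connections = {1: [DataStreamConnectionConfiguration(hostname="tcs05", port=25000)]}
        assign_worker_to_data_connection(nodes, connections)
        # -> [ConnectionJobInfo(tel_id=1, hostname="tcs05", port=25000,
        #                       slurm_reservation="rta-reservation", slurm_node="cp01")]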
"""
# "Flatten" the slurm nodes information into a list of tuple (reservation, node)
node_list = [(reservation, node) for reservation, nodes in slurm_nodes.items() for node in nodes]
tel_to_node_map = []
node_idx = 0
# Now make the tuples from telescope connections and available nodes.
for tel_id, tel_data_server_connections in tel_ids_to_data_servers.items():
for connection_idx, connection in enumerate(tel_data_server_connections):
tel_to_node_map.append(
ConnectionJobInfo(
tel_id=tel_id,
hostname=connection.hostname,
port=connection.port,
slurm_reservation=node_list[node_idx + connection_idx][0],
slurm_node=node_list[node_idx + connection_idx][1],
)
)
node_idx += len(tel_data_server_connections)
return tel_to_node_map
def srun_cmd_worker_nodes(
connection_jobs_info: List[ConnectionJobInfo],
cmd: str,
additional_slurm_params: List[str] | None = None,
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
):
"""Submit a slurm command with srun on all worker nodes in `connection_jobs_info`
Parameters
----------
connection_jobs_info : List[ConnectionJobInfo]
List of connection job information: `cmd` will be executed for each node entry in this list.
cmd : str
Command to run with srun.
additional_slurm_params : List[str] or None
        List of additional slurm job parameters, with arguments and values as separate items, for instance ["--mem", "20G"].
        Optional, default is None.
    error_level : optional
        Logging level at which a failure of the srun command is reported by `subprocess_run_and_raise_exception_on_error`.
        Default is LOGGING_LEVELS_DICT["CRITICAL"].
Returns
-------
List[subprocess.CompletedProcess]
List of completed processes.
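
    Examples
    --------
    Illustrative sketch (job info values are made up): with
    ``jobs_info = [ConnectionJobInfo(1, "tcs05", 25000, "rta-reservation", "cp01")]``,
    ``srun_cmd_worker_nodes(jobs_info, "hostname", ["--mem", "2G"])`` submits::

        srun --mem 2G --reservation=rta-reservation --nodelist=cp01 hostname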
"""
completed_processes = []
for job_info in connection_jobs_info:
srun_cmd = " ".join(
[
"srun",
                *(additional_slurm_params if additional_slurm_params is not None else []),
"--reservation={}".format(job_info.slurm_reservation),
"--nodelist={}".format(job_info.slurm_node),
cmd,
]
)
logging.info("Running {}".format(srun_cmd))
completed_processes.append(
subprocess_run_and_raise_exception_on_error(
shlex.split(srun_cmd),
success_log_string="Success on node {}".format(job_info.slurm_node),
failure_log_string="Failure on node {} with {}".format(job_info.slurm_node, srun_cmd),
error_level=error_level,
log_level=LOGGING_LEVELS_DICT["DEBUG"],
)
)
return completed_processes
def scancel_jobs(job_ids: List[int], signal: signal.Signals, delay_s: float | None = None, ignore_error: bool = False):
"""Run `scancel on `job_ids`, sending `signal` after `delay_s` seconds.
Use the --quiet argument of scancel to not raise error if the jobs are already stopped.
Parameters
----------
    job_ids : List[int]
List of job ids to scancel
signal : signal.Signals
Signal to send with scancel
delay_s : float, optional
        Amount of time in seconds to wait before performing the scancel.
    ignore_error : bool, optional
        If True, the scancel is run directly with subprocess.run, and any error happening in the subprocess
        is simply ignored. This is useful when running scancel -s KILL after a scancel -s INT: if the SIGINT
        already stopped the job, the SIGKILL would exit with code 1 even with --quiet, but we want to ignore
        the error in this case.
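
    Examples
    --------
    Illustrative sketch (job ids are made up)::

        # send SIGINT immediately
        scancel_jobs([1234, 1235], signal.SIGINT)
        # send SIGKILL 10 seconds later, ignoring errors if the jobs already exited
        scancel_jobs([1234, 1235], signal.SIGKILL, delay_s=10.0, ignore_error=True)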
"""
# note: --full or -f is required for r0_dl1 daemons to receive signal
scancel_cmd = " ".join(["scancel", "--full", "-s", str(signal), *[str(id) for id in job_ids]])
if delay_s is not None:
time.sleep(delay_s)
if ignore_error:
try:
sp.run(shlex.split(scancel_cmd), capture_output=True, text=True, check=True)
logging.debug("Stopping jobs with {}".format(scancel_cmd))
except CalledProcessError as error:
logging.info(
"Ignoring error of {} caused by jobs already been stopped. Error info:\nstdout: {}\nstderr: {}".format(
scancel_cmd, error.stdout, error.stderr
)
)
else:
subprocess_run_and_raise_exception_on_error(
shlex.split(scancel_cmd),
success_log_string="Stopping job with {}".format(scancel_cmd),
failure_log_string="FAILURE to stop job with {}".format(scancel_cmd),
error_level=LOGGING_LEVELS_DICT["ERROR"],
log_level=LOGGING_LEVELS_DICT["DEBUG"],
)
def stop_rta(slurm_reservations: List[str], slurm_account: str, r0dl1_job_name: str) -> Thread:
"""Stop the r0dl1 daemons
    The r0dl1 jobs are immediately sent a SIGINT signal, which should tell them to
gracefully shut down. A SIGKILL is also scheduled to run 10 seconds later to
ensure the jobs are indeed stopped.
Parameters
----------
slurm_reservations : List[str]
List of slurm reservation to search for r0dl1 daemons.
slurm_account : str
Slurm account to use when searching for the r0dl1 daemons.
r0dl1_job_name : str
Name of the r0dl1 jobs in the CDB configuration
Returns
-------
    stop_thread : Thread or None
        Started thread that will SIGKILL the r0dl1 jobs after 10 seconds, or None if no r0dl1 jobs were found.
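
    Examples
    --------
    Illustrative sketch (reservation, account and job names are made up), mirroring the
    end-of-night shutdown in `main`::

        sigkill_thread = stop_rta(["rta-reservation"], "rta_account", "r0_dl1_worker")
        if sigkill_thread is not None:
            sigkill_thread.join(timeout=180.0)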
"""
job_ids = parse_slurm_job_ID(slurm_reservations, slurm_account, r0dl1_job_name)
logging.info("Found r0dl1 job ids {} to stop.".format(job_ids))
if job_ids:
# immediately send the SIGINT
scancel_jobs(job_ids, signal.SIGINT, None)
# start a detached thread to send the SIGKILL in 10 seconds.
        # this allows Auto RTA to continue and start the new run immediately, while the old jobs are shutting down.
stop_thread = Thread(target=scancel_jobs, args=(job_ids, signal.SIGKILL, 10.0, True))
stop_thread.start()
return stop_thread
return None
def nuke_rta(slurm_account: str):
"""Hard stop of All RTA jobs: scancels all jobs of `slurm_account`
Parameters
----------
slurm_account : str
slurm account
"""
nuke_rta_cmd = " ".join(["scancel", "-u", slurm_account])
subprocess_run_and_raise_exception_on_error(
shlex.split(nuke_rta_cmd),
"Stopped RTA with {}".format(nuke_rta_cmd),
"Could not stop RTA with {}".format(nuke_rta_cmd),
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
log_level=LOGGING_LEVELS_DICT["WARNING"],
)
def write_reco_manager_observation_config(
obs_info: ObsInfo,
obs_dir: Path,
night_path_structure: RecoPathStructure,
auto_rta_config: AutoRTAConfiguration,
output_path: Path,
):
"""Writes the observation configuration for the reco-manager
Parameters
----------
obs_info : ObsInfo
Observation parameters from observation DB
obs_dir : Path
Path to the observation data directory
night_path_structure : RecoPathStructure
Path structure of the night
auto_rta_config : AutoRTAConfiguration
Configuration of Auto RTA
    output_path : Path
        Path where the hiperta_stream_start observation configuration is written.
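
    Notes
    -----
    The file is the JSON dump of the validated `ObservationParameters` model, with `sb_id` and
    `tel_id` hard-coded to 1 (no scheduling block and a single telescope for LST-1). Illustrative
    sketch of the written content (values are made up, some fields omitted)::

        {
            "sb_id": 1,
            "obs_id": 12345,
            "tel_id": 1,
            "RA_pointing": 83.63,
            "DEC_pointing": 22.01,
            ...
        }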
"""
hiperta_obs_config = ObservationParameters.model_validate(
{
"sb_id": 1, # no scheduling block in LST
"obs_id": obs_info.obs_id,
"tel_id": 1, # only 1 tel
"RA_pointing": obs_info.RA,
"DEC_pointing": obs_info.DEC,
"dl1_dir": str(night_path_structure.dl1_dir(obs_dir)),
"dl2_dir": str(night_path_structure.dl2_dir(obs_dir)),
"dl3_dir": str(night_path_structure.dl3_dir(obs_dir)),
"log_dir": str(night_path_structure.log_dir(obs_dir)),
"reco_manager_log_file": str(night_path_structure.log_dir(obs_dir) / "hiperta_stream_start.log"),
"data_stream_connections": auto_rta_config.tel_ids_to_data_servers[1], # only do tel ID 1 for LST 1
"slurm_nodelists": auto_rta_config.slurm_nodes,
}
)
with open(output_path, "w") as obs_config_f:
obs_config_f.write(hiperta_obs_config.model_dump_json(indent=4))
def main():
"""
Entrypoint of Auto_RTA:
- parse the available slurm nodes
- check that the slurm nodes are in "connected" network mode
- copy conda environment to slurm nodes (usually in /dev/shm/ (RAM))
- query database for current RUN and for each run:
- stops previous RTA reconstruction slurm jobs
- starts new R0-DL1 jobs for the new RUN with static configuration (CDB config) and
a newly written dynamic config
- starts engineering GUI plotting scripts
- stops at a fixed hour set in configuration, after cleaning slurm nodes memory.
"""
start_time = datetime.datetime.now(datetime.UTC)
# initially write log where the script is called (home if cron job)
# so that errors can be logged if we can't parse the config
init_logging(log_level="DEBUG", log_filename="LST_AUTO_RTA.log")
# Load configuration
parser = argparse.ArgumentParser(
description="Automatic Starting of the RTA Reconstruction for an observation night",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-c", "--config", dest="config", type=str, required=True, help="LST auto RTA configuration file."
)
args = parser.parse_args()
with open(args.config, "r") as config_file:
config = AutoRTAConfiguration.model_validate_json(config_file.read())
# Create night data directory
path_structure = RecoPathStructure(config.data_dir, start_time)
path_structure.create_night_data_dir()
# now update logging to write log in night's directory
init_logging(
log_level=config.log_level,
log_filename=path_structure.night_data_dir / "LST_AUTO_RTA.log",
)
# get the stop time during today
stop_time = start_time.replace(
hour=config.stop_time_UTC_hours, minute=config.stop_time_UTC_minutes, second=0, microsecond=0
)
    # If we are past it, it is actually tomorrow
if stop_time < start_time:
stop_time += datetime.timedelta(days=1)
logging.info("Start RTA at " + str(start_time))
logging.info("Found worker nodes : {}".format(config.slurm_nodes))
connection_jobs_info = assign_worker_to_data_connection(config.slurm_nodes, config.tel_ids_to_data_servers)
for job_info in connection_jobs_info:
logging.info(
'Will run tel {} connection {{"hostname": {}, "port": {}}} r0->dl1 job on {} (reservation {})'.format(
job_info.tel_id, job_info.hostname, job_info.port, job_info.slurm_node, job_info.slurm_reservation
)
)
logging.info("Reading R0-DL1 jobnames from CDB configuration")
# load CDB configuration, checking it exists, and parsing r0_dl1_job_name
with open(config.hiperta_CDB_config_file, "r") as CDB_f:
CDB_config = json.load(CDB_f)
r0_dl1_job_name = CDB_config["r0_dl1_params"]["r0_dl1_job_name"]
logging.info("Found job name: {}".format(r0_dl1_job_name))
if config.copy_env:
# For all srun commands here: error level is critical because RTA can not run if environment can not be copied
logging.info("Loading the environment on worker nodes for the night")
logging.info("Cleaning previous content in {}".format(config.env_archive_extraction_path))
srun_cmd_worker_nodes(
connection_jobs_info,
"rm -rf {}".format(config.env_archive_extraction_path),
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
)
logging.info("Cleaning previous content in {}".format(config.models_archive_copy_path))
srun_cmd_worker_nodes(
connection_jobs_info,
"rm -rf {}".format(config.models_archive_copy_path),
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
)
logging.info(
"Extracting environment from {} to {}".format(config.env_archive, config.env_archive_extraction_path)
)
srun_cmd_worker_nodes(
connection_jobs_info,
"bash -c 'mkdir {} && tar -xzf {} -C {}'".format(
config.env_archive_extraction_path, config.env_archive, config.env_archive_extraction_path
),
["--mem", "20G"],
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
)
logging.info(
"Copying reconstruction models from {} to {}".format(
config.models_archive_path, config.models_archive_copy_path
)
)
srun_cmd_worker_nodes(
connection_jobs_info,
"cp -rf {} {}".format(config.models_archive_path, config.models_archive_copy_path),
["--mem", "20G"],
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
)
if config.check_node_connection:
comp_proc_ib0_cat = srun_cmd_worker_nodes(connection_jobs_info, "cat /sys/class/net/ib0/mode")
# If a node is not connected, raise error
if not all("connected" in process.stdout.strip() for process in comp_proc_ib0_cat):
raise RuntimeError(
"Not all RTA slurm nodes are connected to ib0! Found {}".format(
", ".join(
[
"{}: {}".format(job_info.slurm_node, process.stdout.strip())
for job_info, process in zip(connection_jobs_info, comp_proc_ib0_cat)
]
)
)
)
logging.info("RTA ready for the night !")
current_obs_info = ObsInfo(None, None, None, None, None, None)
while not (datetime.datetime.now(datetime.UTC) > stop_time):
# query TCU DB for observation
try:
obs_info = get_current_run_info(config.db_hostname, 10)
except PyMongoError:
logging.error("Error retrieving observation information from DB, ignoring...", exc_info=True)
obs_info = ObsInfo(None, None, None, None, None, None)
# Check observation data
obs_info_is_none = obs_info.RA is None or obs_info.DEC is None
obs_recent_enough = True # default value if we couldn't get an obs_info
if obs_info_is_none:
logging.warning("Queried observation information had no RA DEC.")
elif config.ignore_old_observation:
obs_info_tstart_datetime = datetime.datetime.fromtimestamp(obs_info.time_start_camera, datetime.UTC)
obs_info_time_delta = datetime.datetime.now(datetime.UTC) - obs_info_tstart_datetime
obs_recent_enough = obs_info_time_delta < datetime.timedelta(hours=4)
if not obs_recent_enough:
logging.info(
"Queried observation {} has start time {}, timedelta wrt now: {}. Too old to start RTA".format(
obs_info.obs_id, obs_info_tstart_datetime, obs_info_time_delta
)
)
# Start RTA
if current_obs_info.obs_id != obs_info.obs_id and (not obs_info_is_none) and obs_recent_enough:
# We got a new observation!
logging.info(
"Got new observation! ID: {} - RA: {} - DEC: {} - SOURCE.RA: {} - SOURCE.DEC: {}".format(
obs_info.obs_id, obs_info.RA, obs_info.DEC, obs_info.source_RA, obs_info.source_DEC
)
)
try:
logging.info("Stopping RTA")
stop_rta(config.slurm_reservations, config.slurm_account, r0_dl1_job_name)
except Exception:
logging.error("Could not stop RTA. NEXT RUN DATA MAY BE ACQUIRED BY PREVIOUS RUN DAEMONS !")
# Note: we could nuke_rta, but it would also kill DQ, SCI jobs, etc...
obs_dir = path_structure.create_observation_data_dirs(str(obs_info.obs_id), True)
logging.info("Created directories for obs {} at {}".format(obs_info.obs_id, obs_dir))
reco_manager_obs_config_path = (
path_structure.log_dir(obs_dir) / "hiperta_stream_start_observation_config.json"
)
logging.info("Writing reco-manager observation configuration at {}".format(reco_manager_obs_config_path))
write_reco_manager_observation_config(
obs_info,
obs_dir,
path_structure,
config,
reco_manager_obs_config_path,
)
# Note: hiperta_stream_start has to be started on a worker node as well, because it
# will read the training r0dl1 configuration, which is in the model's archive copied to /dev/shm
# (it reads the r0dl1 configuration from the path set in the CDB config, that must point to /dev/shm)
# Note2: to run several commands with srun, need to wrap with bash -c '...'
hiperta_stream_start_cmd = " ".join(
[
"bash -c '" "export PATH={}/bin/:$PATH ; {}/bin/hiperta_stream_start".format(
config.env_archive_extraction_path, config.env_archive_extraction_path
),
"-c",
config.hiperta_CDB_config_file,
"-d",
str(reco_manager_obs_config_path),
"'",
]
)
try:
# use 1st node to start hiperta_stream
# Note: hiperta_stream must run on a worker node as well, because it needs access to
                # the r0dl1 training configuration, which is found from the path in the CDB configuration,
# which points to /dev/shm/model_archive/...
hiperta_stream_job_info = connection_jobs_info[0]
hiperta_stream_srun_cmd = " ".join(
[
"srun",
"--reservation={}".format(hiperta_stream_job_info.slurm_reservation),
"--nodelist={}".format(hiperta_stream_job_info.slurm_node),
hiperta_stream_start_cmd,
]
)
logging.info("Starting hiperta_stream_start with {}".format(hiperta_stream_srun_cmd))
subprocess_run_and_raise_exception_on_error(
shlex.split(hiperta_stream_srun_cmd),
success_log_string="hiperta_stream started with {}".format(hiperta_stream_job_info.slurm_node),
failure_log_string="Failed to start hiperta_stream on {} with {}".format(
hiperta_stream_job_info.slurm_node, hiperta_stream_start_cmd
),
error_level=LOGGING_LEVELS_DICT[
"ERROR"
], # we might want to continue even if we can't start reco-manager
log_level=LOGGING_LEVELS_DICT["DEBUG"],
)
# Only if we could start RTA we update the current obs_info
# Otherwise, we will loop, see a new observation again, stop RTA and re-start, etc ...
current_obs_info = obs_info
except sp.SubprocessError:
logging.error("Failed to start reco-manager with {}".format(hiperta_stream_start_cmd))
pass # continue anyway, obs_info is not updated so we will try again to start
# If not starting RTA: Query squeue for some statistics on the running jobs
else:
try:
running_jobs_info = subprocess_run_and_raise_exception_on_error(
shlex.split('squeue -u {} --format="%T,%R"'.format(config.slurm_account)),
failure_log_string="Could not parse slurm info while waiting for next observation",
error_level=LOGGING_LEVELS_DICT["ERROR"],
log_level=LOGGING_LEVELS_DICT["DEBUG"],
).stdout
n_jobs, n_running, n_pending, n_request_node_not_available = job_statistics_from_squeue_output(
running_jobs_info
)
logging.info(
"{} job statistics: nb_jobs: {} - nb_running: {} - nb_pending: {} - nb_node_not_available: {}".format(
config.slurm_account, n_jobs, n_running, n_pending, n_request_node_not_available
)
)
except sp.SubprocessError:
pass # continue loop anyway
# sleep until we get an observation
logging.info("RTA waiting for next observation, current observation is {}".format(obs_info.obs_id))
time.sleep(1.0)
logging.info("End of the night: Stop the RTA")
try:
logging.info("Stopping RTA")
sigkill_thread = stop_rta(config.slurm_reservations, config.slurm_account, r0_dl1_job_name)
if sigkill_thread is not None: # it is None if there are no jobs to kill
sigkill_thread.join(timeout=180.0) # 3 minutes timeout but job should end right after delay of 10 sec
else:
logging.warning("Found no r0_dl1 jobs to stop !")
except Exception:
logging.error("Could not stop RTA normally, nuking all {} jobs".format(config.slurm_account))
nuke_rta(config.slurm_account)
# Clean up node environment
logging.info("Cleaning nodes copied files")
logging.info("Cleaning content in {}".format(config.env_archive_extraction_path))
srun_cmd_worker_nodes(
connection_jobs_info,
"rm -rf {}".format(config.env_archive_extraction_path),
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
)
logging.info("Cleaning content in {}".format(config.models_archive_copy_path))
srun_cmd_worker_nodes(
connection_jobs_info,
"rm -rf {}".format(config.models_archive_copy_path),
error_level=LOGGING_LEVELS_DICT["CRITICAL"],
)
logging.info("Done cleaning nodes.")
logging.info("RTA Done for the night. Good day !")
if __name__ == "__main__":
main()