Coverage for lst_auto_rta/config/configuration.py: 100%
50 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-17 14:47 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-17 14:47 +0000
1from functools import cached_property
2from typing import Dict, List
4from annotated_types import Gt
5from pydantic import BaseModel, Field, computed_field, model_validator
6from typing_extensions import Annotated
8from lst_auto_rta.utils.slurm import parse_slurm_nodes
11class DataStreamConnectionConfiguration(BaseModel):
12 """Parameters to connect to data streamers"""
14 hostname: str = Field(
15 title="hostname",
16 description="Hostname on the network of the data streamer",
17 examples=["localhost", "tcs05-ib0"],
18 )
19 port: int = Field(
20 gt=0, le=65535, title="Port", description="Port on the network of the data streamer", examples=[25000, 3391]
21 )
24class AutoRTAConfiguration(BaseModel):
25 """Parameters of the autoRTA script."""
27 copy_env: bool = Field(
28 title="RAM env copy",
29 description="If true: copies the conda environment and model files to worker nodes RAM at the start of the night",
30 examples=[True],
31 )
32 env_archive: str = Field(
33 title="Environment archive",
34 description="Environment archive to copy to `copy_destination_dir` if `copy_env` is true, not used otherwise.",
35 examples=["/fefs/onsite/pipeline/rta/sag_reco_auto_rta/RTA_Dev_EVBv6_CDB_shm_2024_05_06.tar.gz"],
36 )
37 env_archive_extraction_path: str = Field(
38 title="Environment archive extraction path",
39 description="The directory where the environment archive will be extracted if `copy_env` is true, not used otherwise. "
40 "The extraction is a simple tar -xzf archive.tar.gz -C env_archive_extraction_path command, so the directory must exist.",
41 examples=["/dev/shm/env_folder"],
42 )
43 hiperta_CDB_config_file: str = Field(
44 title="HiPeRTA configuration file",
45 description="Path to hiperta_stream_start CDB configuration (static configuration)",
46 examples=["/fefs/onsite/pipeline/rta/sag_reco_auto_rta/configuration/CDB_configuration_2024_01_31.json"],
47 )
48 ignore_old_observation: bool = Field(
49 title="Ignore old observation",
50 description="If the queried observation has a tstart that is more than 4 hours before current time, do not start r0-dl1. "
51 "This should typically be True in production, and False for day tests.",
52 examples=[True],
53 )
54 log_level: str = Field(
55 title="LST_AUTO_RTA logging level",
56 description="Logging level for the lst auto RTA script. Accept values from python's logging module",
57 examples=["DEBUG", "INFO", "WARNING"],
58 )
59 models_archive_copy_path: str = Field(
60 title="Reconstruction models copy destination",
61 description="The directory where the reco models will be copied if `copy_env` is True, not used otherwise. "
62 'Note: the copy is a simple "cp" command, so a trailing "/" will copy the environment in a subfolder of this path.',
63 examples=["/dev/shm/models_archive/2024_05_17"],
64 )
65 models_archive_path: str = Field(
66 title="Reconstruction models archive path",
67 description='Path to the reconstruction models "archive" (folder containing model with special structure), '
68 "it will be copied to `copy_destination_dir` if `copy_env` is true.",
69 examples=["/fefs/onsite/pipeline/rta/sag_reco_auto_rta/model_archives/2024_05_06"],
70 )
71 stop_time_UTC_hours: int = Field(
72 ge=0,
73 lt=24,
74 title="Stop time hour for auto RTA, in UTC time zone.",
75 description="Auto RTA will shut down when `stop_time_UTC_hours:stop_time_UTC_minutes` is reached",
76 examples=[7],
77 )
78 stop_time_UTC_minutes: int = Field(
79 ge=0,
80 lt=60,
81 title="Stop time minute for auto RTA, in UTC time zone",
82 description="Auto RTA will shut down when stop_time_UTC_hours:stop_time_UTC_minutes is reached",
83 examples=[30],
84 )
85 check_node_connection: bool = Field(
86 title="Nodes connection check",
87 description="If true: do NOT start the RTA if the worker nodes are not connected to infinyband network.",
88 examples=[True],
89 )
90 db_hostname: str = Field(
91 title="LST DB hostname", description="Hostname of the LST DB of observation service data", examples=["lst101"]
92 )
93 data_dir: str = Field(
94 title="Data directory",
95 description="Base directory for RTA output files."
96 " Observations files and logs will be written in an appropriate subfolder",
97 examples=["/fefs/onsite/pipeline/rta/data/"],
98 )
99 slurm_account: str = Field(
100 title="Slurm account", description="Slurm user of the auto_rta slurm commands", examples=["lstrta"]
101 )
102 slurm_reservations: List[str] = Field(
103 title="Slurm reservations",
104 description="List of slurm reservation to use to start RTA jobs (all nodes of all reservations will be used)",
105 examples=[["rta_one_node", "rta_3_nodes_nightly"]],
106 )
107 tel_ids_to_data_servers: Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]] = Field(
108 title="Streamers per tel ID",
109 description="Map from telescope ID to data servers connections",
110 examples=[{1: [{"hostname": "tcs05-ib0", "port": 25000}, {"hostname": "tcs05-ib0", "port": 25001}]}],
111 )
113 @computed_field
114 @cached_property
115 def slurm_nodes(self) -> Dict[str, List[str]]:
116 """List of slurm nodes per reservation.
118 Returns
119 -------
120 Dict[str, List[str]]
121 List of slurm nodes per reservation.
122 """
123 return parse_slurm_nodes(self.slurm_reservations, self.slurm_account)
125 @model_validator(mode="after")
126 def check_enough_slurm_nodes(self) -> "AutoRTAConfiguration":
127 """Check if there are enough slurm nodes (nb_nodes) to start all r0->dl1 jobs (nb_jobs). IE nb_nodes >= nb_jobs
129 Returns
130 -------
131 AutoRTAConfiguration
132 valid AutoRTAConfiguration
134 Raises
135 ------
136 ValueError
137 If the number of slurm nodes is less than the number of data server connections.
138 """
140 nb_slurm_nodes = sum([len(nodes) for nodes in self.slurm_nodes.values()])
141 nb_connections = sum([len(connections) for connections in self.tel_ids_to_data_servers.values()])
142 if nb_slurm_nodes < nb_connections:
143 raise ValueError(
144 "Number of slurm nodes {} less than number of r0->dl1 jobs {} !\n"
145 "Reminder: nodes in INACTIVE reservation are discarded !".format(nb_slurm_nodes, nb_connections)
146 )
147 return self
150class ObservationParameters(BaseModel):
151 """Observation run-time parameters to pass to hiperta_stream_start (dynamic configuration)"""
153 sb_id: int = Field(
154 ge=0,
155 title="Scheduling Block ID",
156 description="ID of the observation's scheduling block.",
157 examples=[12345],
158 )
159 obs_id: int = Field(
160 ge=0,
161 title="Observation ID",
162 description="Id of the observation",
163 examples=[12345],
164 )
165 tel_id: int = Field(
166 ge=0,
167 title="Telescope ID",
168 description="Telescope ID",
169 examples=[1],
170 )
171 RA_pointing: float = Field(
172 ge=0.0,
173 le=360.0,
174 title="Rate Ascension",
175 description="Pointing Rate Ascension during the observation.",
176 examples=[20.0],
177 )
178 DEC_pointing: float = Field(
179 ge=-90.0,
180 le=90.0,
181 title="Declination",
182 description="Pointing declination during the observation",
183 examples=[60.0],
184 )
185 dl1_dir: str = Field(
186 title="DL1 directory",
187 description="Path to the directory where to write the DL1 files.",
188 examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/DL1"],
189 )
190 dl2_dir: str = Field(
191 title="DL2 directory",
192 description="Path to the directory where to write the DL2 files.",
193 examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/DL2"],
194 )
195 dl3_dir: str = Field(
196 title="DL3 directory",
197 description="Path to the directory where to write the DL3 files.",
198 examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/DL3"],
199 )
200 log_dir: str = Field(
201 title="Log directory",
202 description="Path to the directory where to write the log files.",
203 examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/logs"],
204 )
205 reco_manager_log_file: str = Field(
206 title="hiperta_stream_start log file",
207 description="Log file for reco manager entrypoint (hiperta_stream_start). "
208 "This path must NOT contain any @{} string substitution"
209 "(opening the log file is the first thing reco-manager will do, before substituting strings)",
210 examples=[
211 "/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/hiperta_stream.log",
212 ],
213 )
214 data_stream_connections: List[DataStreamConnectionConfiguration] = Field(
215 title="Stream Connections",
216 description="Parameters of the connections to data streamers",
217 examples=[[{"hostname": "tcs06", "port": 25000}]],
218 )
219 slurm_nodelists: Dict[str, List[str]] = Field(
220 title="Slurm Node List",
221 description="List of the slurm nodes to use, per slurm reservation",
222 examples=[{"rta_3_nodes_nightly": ["cp15", "cp16"], "rta-one-node": ["cp19"]}],
223 )