Coverage for lst_auto_rta/config/configuration.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-17 14:47 +0000

1from functools import cached_property 

2from typing import Dict, List 

3 

4from annotated_types import Gt 

5from pydantic import BaseModel, Field, computed_field, model_validator 

6from typing_extensions import Annotated 

7 

8from lst_auto_rta.utils.slurm import parse_slurm_nodes 

9 

10 

class DataStreamConnectionConfiguration(BaseModel):
    """Network endpoint (hostname and port) used to connect to one data streamer."""

    hostname: str = Field(
        title="hostname",
        description="Hostname on the network of the data streamer",
        examples=["localhost", "tcs05-ib0"],
    )
    # Accepted range is 0 < port <= 65535.
    port: int = Field(
        gt=0,
        le=65535,
        title="Port",
        description="Port on the network of the data streamer",
        examples=[25000, 3391],
    )

22 

23 

class AutoRTAConfiguration(BaseModel):
    """Parameters of the autoRTA script.

    Besides the declared fields, the model exposes the computed field
    `slurm_nodes` and validates, once all fields are set, that there are at
    least as many slurm nodes as configured data server connections.
    """

    copy_env: bool = Field(
        title="RAM env copy",
        description="If true: copies the conda environment and model files to worker nodes RAM at the start of the night",
        examples=[True],
    )
    env_archive: str = Field(
        title="Environment archive",
        description="Environment archive extracted into `env_archive_extraction_path` if `copy_env` is true, "
        "not used otherwise.",
        examples=["/fefs/onsite/pipeline/rta/sag_reco_auto_rta/RTA_Dev_EVBv6_CDB_shm_2024_05_06.tar.gz"],
    )
    env_archive_extraction_path: str = Field(
        title="Environment archive extraction path",
        description="The directory where the environment archive will be extracted if `copy_env` is true, not used otherwise. "
        "The extraction is a simple tar -xzf archive.tar.gz -C env_archive_extraction_path command, so the directory must exist.",
        examples=["/dev/shm/env_folder"],
    )
    hiperta_CDB_config_file: str = Field(
        title="HiPeRTA configuration file",
        description="Path to hiperta_stream_start CDB configuration (static configuration)",
        examples=["/fefs/onsite/pipeline/rta/sag_reco_auto_rta/configuration/CDB_configuration_2024_01_31.json"],
    )
    ignore_old_observation: bool = Field(
        title="Ignore old observation",
        description="If the queried observation has a tstart that is more than 4 hours before current time, do not start r0-dl1. "
        "This should typically be True in production, and False for day tests.",
        examples=[True],
    )
    log_level: str = Field(
        title="LST_AUTO_RTA logging level",
        description="Logging level for the lst auto RTA script. Accept values from python's logging module",
        examples=["DEBUG", "INFO", "WARNING"],
    )
    models_archive_copy_path: str = Field(
        title="Reconstruction models copy destination",
        description="The directory where the reco models will be copied if `copy_env` is True, not used otherwise. "
        'Note: the copy is a simple "cp" command, so a trailing "/" will copy the environment in a subfolder of this path.',
        examples=["/dev/shm/models_archive/2024_05_17"],
    )
    models_archive_path: str = Field(
        title="Reconstruction models archive path",
        description='Path to the reconstruction models "archive" (folder containing model with special structure), '
        "it will be copied to `models_archive_copy_path` if `copy_env` is true.",
        examples=["/fefs/onsite/pipeline/rta/sag_reco_auto_rta/model_archives/2024_05_06"],
    )
    stop_time_UTC_hours: int = Field(
        ge=0,
        lt=24,
        title="Stop time hour for auto RTA, in UTC time zone.",
        description="Auto RTA will shut down when `stop_time_UTC_hours:stop_time_UTC_minutes` is reached",
        examples=[7],
    )
    stop_time_UTC_minutes: int = Field(
        ge=0,
        lt=60,
        title="Stop time minute for auto RTA, in UTC time zone",
        description="Auto RTA will shut down when stop_time_UTC_hours:stop_time_UTC_minutes is reached",
        examples=[30],
    )
    check_node_connection: bool = Field(
        title="Nodes connection check",
        description="If true: do NOT start the RTA if the worker nodes are not connected to InfiniBand network.",
        examples=[True],
    )
    db_hostname: str = Field(
        title="LST DB hostname", description="Hostname of the LST DB of observation service data", examples=["lst101"]
    )
    data_dir: str = Field(
        title="Data directory",
        description="Base directory for RTA output files."
        " Observations files and logs will be written in an appropriate subfolder",
        examples=["/fefs/onsite/pipeline/rta/data/"],
    )
    slurm_account: str = Field(
        title="Slurm account", description="Slurm user of the auto_rta slurm commands", examples=["lstrta"]
    )
    slurm_reservations: List[str] = Field(
        title="Slurm reservations",
        description="List of slurm reservation to use to start RTA jobs (all nodes of all reservations will be used)",
        examples=[["rta_one_node", "rta_3_nodes_nightly"]],
    )
    # Keys are telescope IDs (strictly positive); each maps to the list of
    # streamer connections configured for that telescope.
    tel_ids_to_data_servers: Dict[Annotated[int, Gt(0)], List[DataStreamConnectionConfiguration]] = Field(
        title="Streamers per tel ID",
        description="Map from telescope ID to data servers connections",
        examples=[{1: [{"hostname": "tcs05-ib0", "port": 25000}, {"hostname": "tcs05-ib0", "port": 25001}]}],
    )

    @computed_field
    @cached_property
    def slurm_nodes(self) -> Dict[str, List[str]]:
        """List of slurm nodes per reservation.

        Obtained by calling `parse_slurm_nodes` with `slurm_reservations` and
        `slurm_account`. Because this is a `cached_property`, the call is made
        once per instance and the result is reused afterwards.

        Returns
        -------
        Dict[str, List[str]]
            List of slurm nodes per reservation.
        """
        return parse_slurm_nodes(self.slurm_reservations, self.slurm_account)

    @model_validator(mode="after")
    def check_enough_slurm_nodes(self) -> "AutoRTAConfiguration":
        """Check if there are enough slurm nodes (nb_nodes) to start all r0->dl1 jobs (nb_jobs). IE nb_nodes >= nb_jobs

        One r0->dl1 job is counted per configured data server connection,
        summed over all telescope IDs.

        Returns
        -------
        AutoRTAConfiguration
            valid AutoRTAConfiguration

        Raises
        ------
        ValueError
            If the number of slurm nodes is less than the number of data server connections.
        """
        # Accessing `self.slurm_nodes` here triggers (and caches) the node lookup.
        nb_slurm_nodes = sum(len(nodes) for nodes in self.slurm_nodes.values())
        nb_connections = sum(len(connections) for connections in self.tel_ids_to_data_servers.values())
        if nb_slurm_nodes < nb_connections:
            raise ValueError(
                f"Number of slurm nodes {nb_slurm_nodes} less than number of r0->dl1 jobs {nb_connections} !\n"
                "Reminder: nodes in INACTIVE reservation are discarded !"
            )
        return self

148 

149 

class ObservationParameters(BaseModel):
    """Observation run-time parameters to pass to hiperta_stream_start (dynamic configuration)"""

    sb_id: int = Field(
        ge=0,
        title="Scheduling Block ID",
        description="ID of the observation's scheduling block.",
        examples=[12345],
    )
    obs_id: int = Field(
        ge=0,
        title="Observation ID",
        description="Id of the observation",
        examples=[12345],
    )
    # NOTE(review): 0 is accepted here, while AutoRTAConfiguration requires
    # telescope IDs > 0 in `tel_ids_to_data_servers` - confirm this is intended.
    tel_id: int = Field(
        ge=0,
        title="Telescope ID",
        description="Telescope ID",
        examples=[1],
    )
    RA_pointing: float = Field(
        ge=0.0,
        le=360.0,
        title="Right Ascension",
        description="Pointing Right Ascension during the observation.",
        examples=[20.0],
    )
    DEC_pointing: float = Field(
        ge=-90.0,
        le=90.0,
        title="Declination",
        description="Pointing declination during the observation",
        examples=[60.0],
    )
    dl1_dir: str = Field(
        title="DL1 directory",
        description="Path to the directory where to write the DL1 files.",
        examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/DL1"],
    )
    dl2_dir: str = Field(
        title="DL2 directory",
        description="Path to the directory where to write the DL2 files.",
        examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/DL2"],
    )
    dl3_dir: str = Field(
        title="DL3 directory",
        description="Path to the directory where to write the DL3 files.",
        examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/DL3"],
    )
    log_dir: str = Field(
        title="Log directory",
        description="Path to the directory where to write the log files.",
        examples=["/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/logs"],
    )
    reco_manager_log_file: str = Field(
        title="hiperta_stream_start log file",
        description="Log file for reco manager entrypoint (hiperta_stream_start). "
        "This path must NOT contain any @{} string substitution "
        "(opening the log file is the first thing reco-manager will do, before substituting strings)",
        examples=[
            "/fefs/onsite/pipeline/rta/data/YYYY/MM/DD/hiperta_stream.log",
        ],
    )
    data_stream_connections: List[DataStreamConnectionConfiguration] = Field(
        title="Stream Connections",
        description="Parameters of the connections to data streamers",
        examples=[[{"hostname": "tcs06", "port": 25000}]],
    )
    slurm_nodelists: Dict[str, List[str]] = Field(
        title="Slurm Node List",
        description="List of the slurm nodes to use, per slurm reservation",
        examples=[{"rta_3_nodes_nightly": ["cp15", "cp16"], "rta-one-node": ["cp19"]}],
    )