Coverage for lst_auto_rta/hdf5_data_check.py: 0%

161 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 14:47 +0000

1import datetime 

2import time 

3from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser 

4from pathlib import Path 

5from threading import Lock 

6from typing import Annotated, Dict, List, NamedTuple 

7 

8import matplotlib.dates 

9import numpy as np 

10import pandas as pd 

11import seaborn as sns 

12from annotated_types import Ge, Gt 

13from matplotlib import pyplot as plt 

14from matplotlib.backends.backend_pdf import PdfPages 

15from pandas.api.types import is_datetime64_any_dtype 

16from scipy import ndimage 

17from tqdm import tqdm 

18 

19from lst_auto_rta.config.hdf5_data_check import HDFDataCheckConfiguration 

20from lst_auto_rta.utils.hdf5 import pd_read_hdf_with_retry 

21 

22 

23class FileProvenance(NamedTuple): 

24 obs_id: Annotated[int, Ge(0)] 

25 line_idx: Annotated[int, Ge(0)] 

26 worker_node: str 

27 

28 

29# def parse_file_provenance(cta_data_file_path: str, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance: 

30# # get the string after "..._id_", and split on "_" to get only this part of the file path. 

31# obs_id = int(cta_data_file_path.split("obs_id_", 1)[1].split("_", 1)[0]) 

32# line_idx = int(cta_data_file_path.split("line_idx_", 1)[1].split("_", 1)[0]) 

33# return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx]) 

34 

35 

36def parse_file_provenance(cta_data_file_path: Path, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance: 

37 # dl1_v06_${obs_id}_${run_id}_${tel_id}_${processIndex}_${threadIndex}_${fileIndex}.h5 

38 split_ = cta_data_file_path.stem.split("_") 

39 obs_id = int(split_[3]) 

40 line_idx = int(split_[5]) 

41 return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx]) 

42 

43 

44def add_debuging_fields_dl1_df( 

45 dl1_df: pd.DataFrame, 

46 obs_id: int, 

47 line_idx: int, 

48 worker_node: str, 

49 worker_node_categories: List[str], 

50 start=None, 

51 end=None, 

52 step=None, 

53) -> None: 

54 # Some events have a trigger time "from the future" that mess with event_id_diff 

55 # so set the event id diff of all of these future events and the following event as well to 0 

56 # prepend 1st value to get diff = 0 for 1st elem 

57 event_id_diffs = np.diff(dl1_df["event_id"], prepend=dl1_df["event_id"][0]) 

58 event_id_diffs[ 

59 ndimage.binary_dilation(dl1_df["trigger_time"] > pd.to_datetime("21000101"), np.array([0, 1, 1])) 

60 ] = 0 

61 dl1_df.loc[slice(start, end, step), "event_id_diff"] = event_id_diffs 

62 dl1_df.loc[slice(start, end, step), "obs_id"] = obs_id 

63 dl1_df.loc[slice(start, end, step), "line_idx"] = line_idx 

64 dl1_df.loc[slice(start, end, step), "worker_node"] = pd.Categorical( 

65 [worker_node] * dl1_df.shape[0], categories=worker_node_categories 

66 ) 

67 

68 

69def timestamp_to_datetime(df: pd.DataFrame, columns: List[str]): 

70 for col in columns: 

71 df[col] = pd.to_datetime(df[col], unit="s") 

72 

73 

74class HDFDataCheck: 

75 def __init__( 

76 self, 

77 data_check_config: HDFDataCheckConfiguration, 

78 process_idx_to_nodelist: Dict[int, str], 

79 ): 

80 self.conf = data_check_config 

81 

82 self.last_plot_datetime = datetime.datetime.now() 

83 self.plot_refresh_interval_timedelta = datetime.timedelta(seconds=data_check_config.plot_refresh_interval_s) 

84 

85 self.process_idx_to_nodelist = process_idx_to_nodelist 

86 

87 self.data_lock = Lock() 

88 self.hdf_data = None 

89 self.current_length = 0 

90 

91 def init_db(self, dtypes: pd.Series): 

92 # Allocate the big df that will accumulate all the data to plot. 

93 # size should be more than enough to store an entire run data 

94 # we first create an empty df with the right length 

95 # then we create the columns with a default value, according to the dtype 

96 # (pandas automatically fills missing values with NaNs, which changes the type of the column to float 

97 # if we were to try to create the df with only the dtype and an example data.) 

98 self.hdf_data = pd.DataFrame(index=np.arange(self.conf.size_storage_dataframe)) 

99 for col in dtypes.index: 

100 col_dtype = dtypes[col] 

101 if isinstance(col_dtype, pd.CategoricalDtype): 

102 self.hdf_data[col] = pd.Categorical( 

103 [col_dtype.categories[0]] * self.hdf_data.shape[0], categories=col_dtype.categories 

104 ) 

105 elif np.issubdtype(col_dtype, np.datetime64): 

106 self.hdf_data[col] = np.datetime64(0, "ns") 

107 elif np.issubdtype(col_dtype, bool): 

108 self.hdf_data[col] = False 

109 elif np.issubdtype(col_dtype, np.integer): 

110 self.hdf_data[col] = col_dtype.type(np.iinfo(col_dtype).max) 

111 elif np.issubdtype(col_dtype, np.inexact): 

112 self.hdf_data[col] = col_dtype.type(np.nan) 

113 else: 

114 raise ValueError( 

115 "Can not set default value of column {} with dtype {}! Unsupported dtype!".format(col, col_dtype) 

116 ) 

117 

118 def read_hdf5_data(self, hdf5_file_path: Path): 

119 file_provenance = parse_file_provenance(hdf5_file_path, self.process_idx_to_nodelist) 

120 # Read the hdf5 data directly in pandas DataFrame 

121 # Alternatively, we could read with pytables and then copy the columns in the big df with table.col("col_name") 

122 # but moving to pandas straight away allows to perform all operations on df 

123 file_df = pd_read_hdf_with_retry( 

124 hdf5_file_path, 

125 key=self.conf.data_path_in_hdf5, 

126 mode="r", 

127 nb_tries=self.conf.hdf5_open_nb_retries, 

128 retry_wait_time_s=self.conf.hdf5_open_wait_time_s, 

129 retry_on_os_error=True, 

130 ) 

131 if self.conf.column_to_datetime is not None: 

132 timestamp_to_datetime(file_df, self.conf.column_to_datetime) 

133 add_debuging_fields_dl1_df( 

134 file_df, 

135 file_provenance.obs_id, 

136 file_provenance.line_idx, 

137 file_provenance.worker_node, 

138 list(self.process_idx_to_nodelist.values()), 

139 ) 

140 

141 file_nb_events = file_df.shape[0] 

142 

143 with self.data_lock: 

144 # If the big df is not initialized: do it now that we know the dtype 

145 if self.hdf_data is None: 

146 self.init_db(file_df.dtypes) 

147 

148 # copy the file df in the big df and update length. Do not use loc but iloc !! 

149 self.hdf_data.iloc[self.current_length : self.current_length + file_nb_events] = file_df 

150 

151 self.current_length += file_nb_events 

152 

153 def data_check_new_file(self, hdf5_file_path: Path, png_dir_path: Path, plots_pdf_path: Path): 

154 self.read_hdf5_data(hdf5_file_path) 

155 

156 if datetime.datetime.now() - self.last_plot_datetime > self.plot_refresh_interval_timedelta: 

157 self.last_plot_datetime = datetime.datetime.now() 

158 with self.data_lock: 

159 self.plot(png_dir_path, plots_pdf_path) 

160 

161 def plot(self, png_dir_path: Path, plots_pdf_path: Path): 

162 sns.set_theme() 

163 

164 png_dir_path.mkdir(exist_ok=True) 

165 

166 # legend order 

167 legend_order = sorted(self.hdf_data.loc[:, "worker_node"].dtype.categories) 

168 # masks 

169 mask_event_quality = self.hdf_data["event_quality"] == 0 

170 mask_is_good_event = self.hdf_data["is_good_event"] == 1 

171 mask_future_events = self.hdf_data["trigger_time"] <= pd.to_datetime("21000101") 

172 

173 # TODO: 

174 # - repair histogram plots with numpy_x fcts or numpy_y 

175 # - apply masks on plots that have e31 values ... 

176 # - find out why seaborn is so slow (do only histograms ?) 

177 # - implement tests 

178 

179 with PdfPages(plots_pdf_path) as pdf_pages: 

180 for plt_png_path, plt_conf in self.conf.plots_config.items(): 

181 fig, ax = plt.subplots(1, 1, figsize=(12, 8)) 

182 

183 # select data based on masks and current length 

184 fig_mask = np.ones((self.hdf_data.shape[0],), dtype=bool) 

185 fig_mask[self.current_length :] = 0 # do not select anything over current length 

186 if plt_conf.mask_event_quality: 

187 fig_mask &= mask_event_quality 

188 if plt_conf.mask_future_events: 

189 fig_mask &= mask_future_events 

190 if plt_conf.mask_is_good_event: 

191 fig_mask &= mask_is_good_event 

192 data = self.hdf_data.loc[fig_mask, :] 

193 # Apply groupby operation if requested 

194 if plt_conf.groupby is not None: 

195 data = getattr( 

196 data.groupby( 

197 [ 

198 item if isinstance(item, str) else pd.Grouper(**item) 

199 for item in plt_conf.groupby.groupby 

200 ] 

201 ), 

202 plt_conf.groupby.computation[0], 

203 **(plt_conf.groupby.kwargs if plt_conf.groupby.kwargs is not None else {}), 

204 )(**(plt_conf.groupby.computation[1] if plt_conf.groupby.computation[1] is not None else {})) 

205 

206 # Apply function on x or y data if requested 

207 for field, fct_conf in zip(["x", "y"], [plt_conf.np_function_x, plt_conf.np_function_y]): 

208 if fct_conf is not None: 

209 try: 

210 plt_conf.kwargs[field] = getattr(np, fct_conf[0])( 

211 data.loc[:, plt_conf.kwargs[field]], 

212 **(fct_conf[1] if fct_conf[1] is not None else {}), 

213 ) 

214 except AttributeError as e: 

215 raise ValueError( 

216 "Could not apply function {} with kwargs {} on {}".format( 

217 fct_conf[0], fct_conf[1], plt_conf.kwargs[field] 

218 ) 

219 ) from e 

220 

221 if plt_conf.kind == "plot": 

222 plot_fct = sns.lineplot 

223 # do not sort unless user requested it (lineplot sorted is quite confusing) 

224 plt_conf.kwargs["sort"] = plt_conf.kwargs.get("sort", False) 

225 plt_conf.kwargs["style"] = plt_conf.kwargs.get("style", "worker_node") 

226 plt_conf.kwargs["style_order"] = plt_conf.kwargs.get("style_order", legend_order) 

227 elif plt_conf.kind == "hist": 

228 plot_fct = sns.histplot 

229 plt_conf.kwargs["multiple"] = plt_conf.kwargs.get("multiple", "dodge") 

230 # plt_conf.kwargs["align"] = plt_conf.kwargs.get("align", "center") # not supported by seaborn which already supplies the 'align' arg 

231 plt_conf.kwargs["shrink"] = plt_conf.kwargs.get("shrink", 0.8) 

232 else: 

233 raise ValueError("Plot kind {} not supported".format(plt_conf.kind)) 

234 # todo: fix histo intensity 

235 # fix xticks time: show second 

236 # fix histogram column placement 

237 plot_fct( 

238 **plt_conf.kwargs, 

239 data=data, 

240 hue="worker_node", 

241 hue_order=legend_order, 

242 ax=ax, 

243 ) 

244 if plt_conf.title is not None: 

245 ax.set_title(plt_conf.title) 

246 if plt_conf.xlabel is not None: 

247 ax.set_xlabel(plt_conf.xlabel) 

248 if plt_conf.ylabel is not None: 

249 ax.set_ylabel(plt_conf.ylabel) 

250 

251 if len(ax.get_xticklabels()[0].get_text()) >= 6: 

252 ax.set_xticks(ax.get_xticks(), ax.get_xticklabels(), rotation=45, ha="right") 

253 

254 fig.tight_layout() 

255 fig.savefig(png_dir_path / plt_png_path, bbox_inches="tight") 

256 pdf_pages.savefig(fig) 

257 plt.close(fig) 

258 

259 def write_df_and_reset_data(self, data_dir: Path, df_write_path: Path): 

260 self.hdf_data.loc[: self.current_length].to_hdf(df_write_path, key=self.conf.write_df_store_key, mode="w") 

261 self.current_length = 0 

262 

263 

264def main(): 

265 parser = ArgumentParser( 

266 description="Data check plotting util of LST AUTO RTA: " 

267 "run this entrypoint to reproduce offline the plots generated during the night", 

268 formatter_class=ArgumentDefaultsHelpFormatter, 

269 ) 

270 parser.add_argument( 

271 "-c", 

272 "--data_check_config", 

273 type=str, 

274 required=True, 

275 dest="config", 

276 help="Path to the data check configuration file", 

277 ) 

278 parser.add_argument( 

279 "-d", 

280 "--data_dir", 

281 type=str, 

282 required=True, 

283 dest="data_dir", 

284 help="Path to the directory containing the data files, to run the data check on", 

285 ) 

286 parser.add_argument( 

287 "-n", 

288 "--night_nodes", 

289 type=str, 

290 nargs="+", 

291 dest="nodes", 

292 help="List of worker nodes used during the night. Eg: cp12 cp15 cp42 cp58", 

293 ) 

294 parser.add_argument( 

295 "-p", 

296 "--png_dir_path", 

297 type=str, 

298 required=True, 

299 dest="png_dir_path", 

300 help="Path to the directory where to write the plots as png images.", 

301 ) 

302 parser.add_argument( 

303 "-f", "--pdf_path", type=str, required=True, dest="pdf_path", help="Path to the output pdf with the plots." 

304 ) 

305 args = parser.parse_args() 

306 

307 start_time = time.process_time() 

308 with open(Path(args.config), "r") as config_f: 

309 config = HDFDataCheckConfiguration.model_validate_json(config_f.read()) 

310 config_loaded_time = time.process_time() 

311 print("Configuration loading time: {:.2f}s".format(config_loaded_time - start_time)) 

312 

313 data_checker = HDFDataCheck(config, {i: n for i, n in enumerate(args.nodes)}) 

314 instantiation_time = time.process_time() 

315 print("Instantiation time: {:.2f}s".format(instantiation_time - config_loaded_time)) 

316 

317 file_list = [ 

318 p for p in Path(args.data_dir).glob("*") if p.is_file() and p.suffix == ".h5" and p.stem.startswith("dl1_v06") 

319 ] 

320 data_checker.read_hdf5_data(file_list[0]) 

321 init_and_load_first_file_time = time.process_time() 

322 print("Big df init and load 1st file time: {:.2f}s".format(init_and_load_first_file_time - instantiation_time)) 

323 

324 for data_file in tqdm(file_list[1:]): 

325 data_checker.read_hdf5_data(data_file) 

326 load_all_files_time = time.process_time() 

327 print("Load remaining files: {:.2f}s".format(load_all_files_time - init_and_load_first_file_time)) 

328 

329 data_checker.plot(Path(args.png_dir_path), Path(args.pdf_path)) 

330 plot_time = time.process_time() 

331 print("Plot time: {:.2f}s".format(plot_time - load_all_files_time)) 

332 

333 

334if __name__ == "__main__": 

335 main()