import datetime
import time
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from pathlib import Path
from threading import Lock
from typing import Annotated, Dict, List, NamedTuple

import matplotlib.dates
import numpy as np
import pandas as pd
import seaborn as sns
from annotated_types import Ge, Gt
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from pandas.api.types import is_datetime64_any_dtype
from scipy import ndimage
from tqdm import tqdm

from lst_auto_rta.config.hdf5_data_check import HDFDataCheckConfiguration
from lst_auto_rta.utils.hdf5 import pd_read_hdf_with_retry


class FileProvenance(NamedTuple):
    obs_id: Annotated[int, Ge(0)]
    line_idx: Annotated[int, Ge(0)]
    worker_node: str


# def parse_file_provenance(cta_data_file_path: str, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance:
#     # get the string after "..._id_", and split on "_" to get only this part of the file path.
#     obs_id = int(cta_data_file_path.split("obs_id_", 1)[1].split("_", 1)[0])
#     line_idx = int(cta_data_file_path.split("line_idx_", 1)[1].split("_", 1)[0])
#     return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx])


def parse_file_provenance(cta_data_file_path: Path, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance:
    # dl1_v06_${obs_id}_${run_id}_${tel_id}_${processIndex}_${threadIndex}_${fileIndex}.h5
    split_ = cta_data_file_path.stem.split("_")
    obs_id = int(split_[3])
    line_idx = int(split_[5])
    return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx])
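

# Illustrative parse (hypothetical stem): "dl1_v06_a_12345_b_3_0_0".split("_") gives
# split_[3] == "12345" -> obs_id and split_[5] == "3" -> line_idx; the worker node is
# then looked up as process_idx_to_nodelist[3].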


def add_debugging_fields_dl1_df(
    dl1_df: pd.DataFrame,
    obs_id: int,
    line_idx: int,
    worker_node: str,
    worker_node_categories: List[str],
    start=None,
    end=None,
    step=None,
) -> None:
    # Some events have a trigger time "from the future" that messes with event_id_diff,
    # so set the event id diff of all of these future events, and of the following event as well, to 0.
    # Prepend the 1st value to get diff = 0 for the 1st element.
    event_id_diffs = np.diff(dl1_df["event_id"], prepend=dl1_df["event_id"][0])
    event_id_diffs[
        ndimage.binary_dilation(dl1_df["trigger_time"] > pd.to_datetime("21000101"), np.array([0, 1, 1]))
    ] = 0
    dl1_df.loc[slice(start, end, step), "event_id_diff"] = event_id_diffs
    dl1_df.loc[slice(start, end, step), "obs_id"] = obs_id
    dl1_df.loc[slice(start, end, step), "line_idx"] = line_idx
    dl1_df.loc[slice(start, end, step), "worker_node"] = pd.Categorical(
        [worker_node] * dl1_df.shape[0], categories=worker_node_categories
    )
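

# Sketch of the dilation above (illustrative values): with structure [0, 1, 1],
#   future-event mask: [0, 0, 1, 0, 0]
#   dilated mask:      [0, 0, 1, 1, 0]
# i.e. each "future" event also flags the event immediately following it, and both
# get their event_id_diff set to 0.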


def timestamp_to_datetime(df: pd.DataFrame, columns: List[str]):
    """Convert the given epoch-seconds columns of `df` to datetime64, in place."""
    for col in columns:
        df[col] = pd.to_datetime(df[col], unit="s")
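

# e.g. a raw timestamp of 1700000000 s becomes pd.Timestamp("2023-11-14 22:13:20").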


class HDFDataCheck:
    def __init__(
        self,
        data_check_config: HDFDataCheckConfiguration,
        process_idx_to_nodelist: Dict[int, str],
    ):
        self.conf = data_check_config

        self.last_plot_datetime = datetime.datetime.now()
        self.plot_refresh_interval_timedelta = datetime.timedelta(seconds=data_check_config.plot_refresh_interval_s)

        self.process_idx_to_nodelist = process_idx_to_nodelist

        self.data_lock = Lock()
        self.hdf_data = None
        self.current_length = 0

    def init_db(self, dtypes: pd.Series):
        # Allocate the big df that will accumulate all the data to plot.
        # Its size should be more than enough to store an entire run's data.
        # We first create an empty df with the right length,
        # then create each column with a default value according to its dtype.
        # (Pandas automatically fills missing values with NaNs, which changes the type of the column
        # to float if we were to try to create the df with only the dtype and some example data.)
        self.hdf_data = pd.DataFrame(index=np.arange(self.conf.size_storage_dataframe))
        for col in dtypes.index:
            col_dtype = dtypes[col]
            if isinstance(col_dtype, pd.CategoricalDtype):
                self.hdf_data[col] = pd.Categorical(
                    [col_dtype.categories[0]] * self.hdf_data.shape[0], categories=col_dtype.categories
                )
            elif np.issubdtype(col_dtype, np.datetime64):
                self.hdf_data[col] = np.datetime64(0, "ns")
            elif np.issubdtype(col_dtype, bool):
                self.hdf_data[col] = False
            elif np.issubdtype(col_dtype, np.integer):
                self.hdf_data[col] = col_dtype.type(np.iinfo(col_dtype).max)
            elif np.issubdtype(col_dtype, np.inexact):
                self.hdf_data[col] = col_dtype.type(np.nan)
            else:
                raise ValueError(
                    "Can not set default value of column {} with dtype {}! Unsupported dtype!".format(col, col_dtype)
                )
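
    # Illustrative defaults produced by init_db above (hypothetical dtypes):
    #   int64    -> np.iinfo(np.int64).max (a sentinel for "no data yet")
    #   float32  -> NaN
    #   category -> the first category, with the full category list preserved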

    def read_hdf5_data(self, hdf5_file_path: Path):
        file_provenance = parse_file_provenance(hdf5_file_path, self.process_idx_to_nodelist)
        # Read the hdf5 data directly into a pandas DataFrame.
        # Alternatively, we could read with pytables and then copy the columns into the big df
        # with table.col("col_name"), but moving to pandas straight away allows us to perform
        # all operations on the df.
        file_df = pd_read_hdf_with_retry(
            hdf5_file_path,
            key=self.conf.data_path_in_hdf5,
            mode="r",
            nb_tries=self.conf.hdf5_open_nb_retries,
            retry_wait_time_s=self.conf.hdf5_open_wait_time_s,
            retry_on_os_error=True,
        )
        if self.conf.column_to_datetime is not None:
            timestamp_to_datetime(file_df, self.conf.column_to_datetime)
        add_debugging_fields_dl1_df(
            file_df,
            file_provenance.obs_id,
            file_provenance.line_idx,
            file_provenance.worker_node,
            list(self.process_idx_to_nodelist.values()),
        )

        file_nb_events = file_df.shape[0]

        with self.data_lock:
            # If the big df is not initialized: do it now that we know the dtypes
            if self.hdf_data is None:
                self.init_db(file_df.dtypes)

            # Copy the file df into the big df and update the length. Do not use loc but iloc!!
            self.hdf_data.iloc[self.current_length : self.current_length + file_nb_events] = file_df

            self.current_length += file_nb_events
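
    # Note on the iloc slice above: .iloc[a:b] is positional and end-exclusive (like
    # numpy slicing), whereas .loc[a:b] is label-based and end-INCLUSIVE, so it would
    # write one row too many here.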

    def data_check_new_file(self, hdf5_file_path: Path, png_dir_path: Path, plots_pdf_path: Path):
        self.read_hdf5_data(hdf5_file_path)

        if datetime.datetime.now() - self.last_plot_datetime > self.plot_refresh_interval_timedelta:
            self.last_plot_datetime = datetime.datetime.now()
            with self.data_lock:
                self.plot(png_dir_path, plots_pdf_path)

    def plot(self, png_dir_path: Path, plots_pdf_path: Path):
        sns.set_theme()

        png_dir_path.mkdir(exist_ok=True)

        # legend order
        legend_order = sorted(self.hdf_data.loc[:, "worker_node"].dtype.categories)
        # masks
        mask_event_quality = self.hdf_data["event_quality"] == 0
        mask_is_good_event = self.hdf_data["is_good_event"] == 1
        mask_future_events = self.hdf_data["trigger_time"] <= pd.to_datetime("21000101")

        # TODO:
        # - repair histogram plots with numpy_x fcts or numpy_y
        # - apply masks on plots that have e31 values ...
        # - find out why seaborn is so slow (do only histograms?)
        # - implement tests

        with PdfPages(plots_pdf_path) as pdf_pages:
            for plt_png_path, plt_conf in self.conf.plots_config.items():
                fig, ax = plt.subplots(1, 1, figsize=(12, 8))

                # Select data based on the masks and the current length
                fig_mask = np.ones((self.hdf_data.shape[0],), dtype=bool)
                fig_mask[self.current_length :] = 0  # do not select anything beyond the current length
                if plt_conf.mask_event_quality:
                    fig_mask &= mask_event_quality
                if plt_conf.mask_future_events:
                    fig_mask &= mask_future_events
                if plt_conf.mask_is_good_event:
                    fig_mask &= mask_is_good_event
                data = self.hdf_data.loc[fig_mask, :]
                # Apply a groupby operation if requested. Note: the optional groupby kwargs are
                # keyword arguments of the DataFrame.groupby() call itself (built-in getattr
                # accepts no keyword arguments).
                if plt_conf.groupby is not None:
                    data = getattr(
                        data.groupby(
                            [
                                item if isinstance(item, str) else pd.Grouper(**item)
                                for item in plt_conf.groupby.groupby
                            ],
                            **(plt_conf.groupby.kwargs if plt_conf.groupby.kwargs is not None else {}),
                        ),
                        plt_conf.groupby.computation[0],
                    )(**(plt_conf.groupby.computation[1] if plt_conf.groupby.computation[1] is not None else {}))
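
                # Illustrative groupby configuration (hypothetical values):
                #   groupby.groupby = ["worker_node", {"key": "trigger_time", "freq": "2s"}]
                #   groupby.computation = ("mean", None)
                # resolving to:
                #   data.groupby(["worker_node", pd.Grouper(key="trigger_time", freq="2s")]).mean()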

                # Apply a numpy function to the x or y data if requested
                for field, fct_conf in zip(["x", "y"], [plt_conf.np_function_x, plt_conf.np_function_y]):
                    if fct_conf is not None:
                        try:
                            plt_conf.kwargs[field] = getattr(np, fct_conf[0])(
                                data.loc[:, plt_conf.kwargs[field]],
                                **(fct_conf[1] if fct_conf[1] is not None else {}),
                            )
                        except AttributeError as e:
                            raise ValueError(
                                "Could not apply function {} with kwargs {} on {}".format(
                                    fct_conf[0], fct_conf[1], plt_conf.kwargs[field]
                                )
                            ) from e

                if plt_conf.kind == "plot":
                    plot_fct = sns.lineplot
                    # Do not sort unless the user requested it (a sorted lineplot is quite confusing here)
                    plt_conf.kwargs["sort"] = plt_conf.kwargs.get("sort", False)
                    plt_conf.kwargs["style"] = plt_conf.kwargs.get("style", "worker_node")
                    plt_conf.kwargs["style_order"] = plt_conf.kwargs.get("style_order", legend_order)
                elif plt_conf.kind == "hist":
                    plot_fct = sns.histplot
                    plt_conf.kwargs["multiple"] = plt_conf.kwargs.get("multiple", "dodge")
                    # plt_conf.kwargs["align"] = plt_conf.kwargs.get("align", "center")  # not supported: seaborn already supplies the 'align' arg
                    plt_conf.kwargs["shrink"] = plt_conf.kwargs.get("shrink", 0.8)
                else:
                    raise ValueError("Plot kind {} not supported".format(plt_conf.kind))
                # TODO: fix histo intensity
                #       fix xticks time: show seconds
                #       fix histogram column placement
                plot_fct(
                    **plt_conf.kwargs,
                    data=data,
                    hue="worker_node",
                    hue_order=legend_order,
                    ax=ax,
                )
                if plt_conf.title is not None:
                    ax.set_title(plt_conf.title)
                if plt_conf.xlabel is not None:
                    ax.set_xlabel(plt_conf.xlabel)
                if plt_conf.ylabel is not None:
                    ax.set_ylabel(plt_conf.ylabel)

                if len(ax.get_xticklabels()[0].get_text()) >= 6:
                    ax.set_xticks(ax.get_xticks(), ax.get_xticklabels(), rotation=45, ha="right")

                fig.tight_layout()
                fig.savefig(png_dir_path / plt_png_path, bbox_inches="tight")
                pdf_pages.savefig(fig)
                plt.close(fig)
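
    # Illustrative shape of one self.conf.plots_config entry, as consumed by plot() above
    # (field names follow the attribute accesses in the loop; values are hypothetical):
    #   "event_id_diff.png": kind="plot", kwargs={"x": "trigger_time", "y": "event_id_diff"},
    #                        mask_event_quality=True, mask_future_events=True, mask_is_good_event=False,
    #                        groupby=None, np_function_x=None, np_function_y=None,
    #                        title="Event id diff vs trigger time", xlabel=None, ylabel=None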

    def write_df_and_reset_data(self, data_dir: Path, df_write_path: Path):
        # iloc is positional and end-exclusive; loc would also include the row at index current_length.
        self.hdf_data.iloc[: self.current_length].to_hdf(df_write_path, key=self.conf.write_df_store_key, mode="w")
        self.current_length = 0


def main():
    parser = ArgumentParser(
        description="Data check plotting util of LST AUTO RTA: "
        "run this entrypoint to reproduce offline the plots generated during the night",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "-c",
        "--data_check_config",
        type=str,
        required=True,
        dest="config",
        help="Path to the data check configuration file",
    )
    parser.add_argument(
        "-d",
        "--data_dir",
        type=str,
        required=True,
        dest="data_dir",
        help="Path to the directory containing the data files to run the data check on",
    )
    parser.add_argument(
        "-n",
        "--night_nodes",
        type=str,
        nargs="+",
        required=True,  # the nodes are used unconditionally below, so make them mandatory
        dest="nodes",
        help="List of worker nodes used during the night, e.g.: cp12 cp15 cp42 cp58",
    )
    parser.add_argument(
        "-p",
        "--png_dir_path",
        type=str,
        required=True,
        dest="png_dir_path",
        help="Path to the directory where to write the plots as png images.",
    )
    parser.add_argument(
        "-f", "--pdf_path", type=str, required=True, dest="pdf_path", help="Path to the output pdf with the plots."
    )
    args = parser.parse_args()
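
    # Example invocation (paths and node names hypothetical):
    #   python -m lst_auto_rta.hdf5_data_check -c data_check_config.json -d /path/to/dl1_dir \
    #       -n cp12 cp15 cp42 cp58 -p ./datacheck_pngs -f ./datacheck_plots.pdf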

    start_time = time.process_time()
    with open(Path(args.config), "r") as config_f:
        config = HDFDataCheckConfiguration.model_validate_json(config_f.read())
    config_loaded_time = time.process_time()
    print("Configuration loading time: {:.2f}s".format(config_loaded_time - start_time))

    data_checker = HDFDataCheck(config, {i: n for i, n in enumerate(args.nodes)})
    instantiation_time = time.process_time()
    print("Instantiation time: {:.2f}s".format(instantiation_time - config_loaded_time))

    file_list = [
        p for p in Path(args.data_dir).glob("*") if p.is_file() and p.suffix == ".h5" and p.stem.startswith("dl1_v06")
    ]
    data_checker.read_hdf5_data(file_list[0])
    init_and_load_first_file_time = time.process_time()
    print("Big df init and load 1st file time: {:.2f}s".format(init_and_load_first_file_time - instantiation_time))

    for data_file in tqdm(file_list[1:]):
        data_checker.read_hdf5_data(data_file)
    load_all_files_time = time.process_time()
    print("Load remaining files: {:.2f}s".format(load_all_files_time - init_and_load_first_file_time))

    data_checker.plot(Path(args.png_dir_path), Path(args.pdf_path))
    plot_time = time.process_time()
    print("Plot time: {:.2f}s".format(plot_time - load_all_files_time))


if __name__ == "__main__":
    main()