Coverage for lst_auto_rta/dl1_data_check.py: 0% (50 statements)
import datetime
from pathlib import Path
from typing import Annotated, Dict, NamedTuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from annotated_types import Ge, Gt

from lst_auto_rta.utils.hdf5 import open_pytable_file_with_retry

class FileProvenance(NamedTuple):
    obs_id: Annotated[int, Gt(0)]
    line_idx: Annotated[int, Ge(0)]
    worker_node: str

def parse_file_provenance(dl1_file_path: str, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance:
    # get the substring after "obs_id_" (resp. "line_idx_") and split on "_" to keep only the numeric part of the file path
    obs_id = int(dl1_file_path.split("obs_id_", 1)[1].split("_", 1)[0])
    line_idx = int(dl1_file_path.split("line_idx_", 1)[1].split("_", 1)[0])
    return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx])

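# Illustrative example (not from the original module): assuming a DL1 file name that
# follows the "..._obs_id_<N>_..._line_idx_<M>_..." convention expected above, and a
# hypothetical line-index-to-node mapping, parse_file_provenance would yield:
#
#     parse_file_provenance(
#         "dl1_obs_id_12345_line_idx_2_stream.h5", {0: "cp10", 1: "cp11", 2: "cp12"}
#     )
#     # FileProvenance(obs_id=12345, line_idx=2, worker_node="cp12")
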
def read_dl1_data(dl1_file_path: Path, dl1_params_path: str, nb_tries: int, retry_wait_time_s: float) -> pd.DataFrame:
    with open_pytable_file_with_retry(
        input_file=dl1_file_path,
        mode="r",
        nb_tries=nb_tries,
        retry_wait_time=retry_wait_time_s,
        retry_on_os_error=True,
    ) as dl1_f:
        params = dl1_f.get_node(dl1_params_path)[:]

    return pd.DataFrame.from_records(params)

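# Illustrative call (hypothetical paths and values): the node path below follows the
# usual lstchain DL1 parameters table layout, but the actual value is configuration-dependent.
#
#     df = read_dl1_data(
#         Path("dl1_obs_id_12345_line_idx_2_stream.h5"),
#         dl1_params_path="/dl1/event/telescope/parameters/LST_LSTCam",
#         nb_tries=3,
#         retry_wait_time_s=1.0,
#     )
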
def format_dl1_data(dl1_df: pd.DataFrame, obs_id: int, line_idx: int, worker_node: str) -> pd.DataFrame:
    # Some events have a trigger time "from the future" that messes with event_id_diff:
    # set the event id diff of all of these future events, and of the following event as well, to 0
    # event_id_diffs[ndimage.binary_dilation(dl1_df["event_id"] > pd.to_datetime("21000101"), np.array([0, 1, 1]))] = 0
    # Compute event id diffs:
    # prepend 1st value to get diff = 0 for 1st elem
    dl1_df["event_id_diff"] = np.diff(dl1_df["event_id"], prepend=dl1_df["event_id"][0])
    dl1_df["obs_id"] = obs_id
    dl1_df["line_idx"] = line_idx
    dl1_df["worker_node"] = worker_node
    return dl1_df

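# Minimal sketch (illustrative data, not from the module) of the event_id_diff
# computation above: np.diff with prepend=<first value> yields 0 for the first event
# and the consecutive event-id difference for the others.
#
#     df = pd.DataFrame({"event_id": [10, 11, 13, 14]})
#     np.diff(df["event_id"], prepend=df["event_id"][0])  # -> array([0, 1, 2, 1])
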
# TODO:
# read table -> write data in big df. Load data in tables.Table, then copy col by col, then fill-in copied cols
# https://stackoverflow.com/questions/25427197/numpy-how-to-add-a-column-to-an-existing-structured-array
# - at each obs:
#   - where to write plots in pdf/png
#   - where to write df on dir change

class DL1DataCheck:
    def __init__(
        self,
        dl1_params_path: str,
        plot_refresh_rate_s: float,
        process_idx_to_nodelist: Dict[int, str],
        hdf5_open_nb_retries: int,
        hdf5_open_wait_time_s: float,
    ):
        self._dl1_params_path = dl1_params_path
        self._last_plot_datetime = datetime.datetime.now()
        self._plot_refresh_timedelta_threshold = datetime.timedelta(seconds=plot_refresh_rate_s)
        self._process_idx_to_nodelist = process_idx_to_nodelist
        self._hdf5_open_nb_retries = hdf5_open_nb_retries
        self._hdf5_open_wait_time_s = hdf5_open_wait_time_s
        self._cumulative_dl1_data = []
        self._plot_list = [
            "event_id_vs_trigger_time",
            "event_id_diff",
            "trigger_time",
            "is_good_event",
            "event_type",
            "event_quality_vs_total_intensity",
            "event_quality_vs_intensity",
            "event_quality",
            "az_tel_vs_trigger_time",
            "alt_tel_vs_trigger_time",
            "x",
            "y",
            "phi",
            "width",
            "length",
            "intensity",
            "n_pixels",
            "skewness",
            "r",
            "kurtosis",
            "psi",
            "time_gradient",
            "intercept",
            "leakage_pixels_width_1",
            "leakage_pixels_width_2",
            "leakage_intensity_width_1",
            "leakage_intensity_width_2",
            "alt_tel",
            "az_tel",
            "trigger_time",
            "wl",
        ]
        self._fig_axs = {plot_id: plt.subplots(1, 1) for plot_id in self._plot_list}

    def data_check_new_file(self, dl1_file_path: Path, dl1_plots_png_path: Path, dl1_plots_pdf: Path):
        file_provenance = parse_file_provenance(str(dl1_file_path), self._process_idx_to_nodelist)
        self._cumulative_dl1_data.append(
            format_dl1_data(
                read_dl1_data(
                    dl1_file_path=dl1_file_path,
                    dl1_params_path=self._dl1_params_path,
                    nb_tries=self._hdf5_open_nb_retries,
                    retry_wait_time_s=self._hdf5_open_wait_time_s,
                ),
                obs_id=file_provenance.obs_id,
                line_idx=file_provenance.line_idx,
                worker_node=file_provenance.worker_node,
            )
        )
        if datetime.datetime.now() - self._last_plot_datetime > self._plot_refresh_timedelta_threshold:
            self._last_plot_datetime = datetime.datetime.now()
            self.update_plots(self._cumulative_dl1_data[-1], dl1_plots_png_path, dl1_plots_pdf)

    def update_plots(self, dl1_df: pd.DataFrame, dl1_plots_png_path: Path, dl1_plots_pdf: Path):
        sns.set_theme()
        if not self._fig_axs:
            # (re)create the figures if needed, mirroring the setup in __init__
            self._fig_axs = {plot_id: plt.subplots(1, 1) for plot_id in self._plot_list}

        # these have special plots
        special_plots = [
            "event_id_diff",
            "trigger_time",
            "event_id",
            "event_type",
            "total_intensity",
            "intensity",
            "event_quality",
            "az_tel",
            "alt_tel",
        ]

    def write_df_and_reset_data(self, data_dir: Path, df_write_path: Path):
        # TODO: write df
        # self._cumulative_dl1_data = [pd.concat(self._cumulative_dl1_data, axis=0, ignore_index=True)]
        self._cumulative_dl1_data = []
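
# Hedged usage sketch (illustrative only; the paths, node mapping and parameter
# values below are assumptions, and the plot writing above is still a stub):
#
#     checker = DL1DataCheck(
#         dl1_params_path="/dl1/event/telescope/parameters/LST_LSTCam",
#         plot_refresh_rate_s=30.0,
#         process_idx_to_nodelist={0: "cp10", 1: "cp11"},
#         hdf5_open_nb_retries=3,
#         hdf5_open_wait_time_s=1.0,
#     )
#     for dl1_file in sorted(Path("./dl1_stream").glob("*.h5")):
#         checker.data_check_new_file(dl1_file, Path("plots.png"), Path("plots.pdf"))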