Coverage for lst_auto_rta/dl1_data_check.py: 0%

50 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-03 14:47 +0000

1import datetime 

2from pathlib import Path 

3from typing import Annotated, Dict, NamedTuple 

4 

5import matplotlib.pyplot as plt 

6import numpy as np 

7import pandas as pd 

8import seaborn as sns 

9from annotated_types import Ge, Gt 

10 

11from lst_auto_rta.utils.hdf5 import open_pytable_file_with_retry 

12 

13 

14class FileProvenance(NamedTuple): 

15 obs_id: Annotated[int, Gt(0)] 

16 line_idx: Annotated[int, Ge(0)] 

17 worker_node: str 

18 

19 

20def parse_file_provenance(dl1_file_path: str, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance: 

21 # get the string after "..._id_", and split on "_" to get only this part of the file path. 

22 obs_id = int(dl1_file_path.split("obs_id_", 1)[1].split("_", 1)[0]) 

23 line_idx = int(dl1_file_path.split("line_idx_", 1)[1].split("_", 1)[0]) 

24 return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx]) 

25 

26 

27def read_dl1_data(dl1_file_path: Path, dl1_params_path: str, nb_tries: int, retry_wait_time_s: float) -> pd.DataFrame: 

28 with open_pytable_file_with_retry( 

29 input_file=dl1_file_path, 

30 mode="r", 

31 nb_tries=nb_tries, 

32 retry_wait_time=retry_wait_time_s, 

33 retry_on_os_error=True, 

34 ) as dl1_f: 

35 params = dl1_f.get_node(dl1_params_path)[:] 

36 

37 return pd.DataFrame.from_records(params) 

38 

39 

40def format_dl1_data(dl1_df: pd.DataFrame, obs_id: int, line_idx: int, worker_node: str) -> None: 

41 # Some events have a trigger time "from the future" that mess with event_id_diff: 

42 # set the event id diff of all of these future events and the following event as well to 0 

43 # event_id_diffs[ndimage.binary_dilation(dl1_df["event_id"] > pd.to_datetime("21000101"), np.array([0, 1, 1]))] = 0 

44 # Compute event id diffs: 

45 # prepend 1st value to get diff = 0 for 1st elem 

46 dl1_df["event_id_diff"] = np.diff(dl1_df["event_id"], prepend=dl1_df["event_id"][0]) 

47 dl1_df["obs_id"] = obs_id 

48 dl1_df["line_idx"] = line_idx 

49 dl1_df["worker_node"] = worker_node 

50 

51 

52# TODO: 

53# read table -> write data in big df. Load data in tables.Table, then copy col by col, then fill-in copied cols 

54# https://stackoverflow.com/questions/25427197/numpy-how-to-add-a-column-to-an-existing-structured-array 

55 

56# - at each obs: 

57# - where to write plots in pdf/png 

58# - where to write df on dir change 

59 

60 

61class DL1DataCheck: 

62 def __init__( 

63 self, 

64 dl1_params_path: str, 

65 plot_refresh_rate_s: float, 

66 process_idx_to_nodelist: Dict[int, str], 

67 hdf5_open_nb_retries: int, 

68 hdf5_open_wait_time_s: float, 

69 ): 

70 self._dl1_params_path = dl1_params_path 

71 self._last_plot_datetime = datetime.datetime.now() 

72 self._plot_refresh_timedelta_threshold = datetime.timedelta(seconds=plot_refresh_rate_s) 

73 self._process_idx_to_nodelist = process_idx_to_nodelist 

74 self._hdf5_open_nb_retries = hdf5_open_nb_retries 

75 self._hdf5_open_nb_retries = hdf5_open_wait_time_s 

76 self._cumulative_dl1_data = [] 

77 

78 self._plot_list = [ 

79 "event_id_vs_trigger_time", 

80 "event_id_diff", 

81 "trigger_time", 

82 "is_good_event", 

83 "event_type", 

84 "event_quality_vs_total_intensity", 

85 "event_quality_vs_intensity", 

86 "event_quality", 

87 "az_tel_vs_trigger_time", 

88 "alt_tel_vs_trigger_time", 

89 "x" 

90 "y" 

91 "phi" 

92 "width" 

93 "length" 

94 "intensity" 

95 "n_pixels" 

96 "skewness" 

97 "r" 

98 "kurtosis" 

99 "psi" 

100 "time_gradient" 

101 "intercept" 

102 "leakage_pixels_width_1" 

103 "leakage_pixels_width_2" 

104 "leakage_intensity_width_1" 

105 "leakage_intensity_width_2" 

106 "alt_tel" 

107 "az_tel" 

108 "trigger_time" 

109 "wl", 

110 ] 

111 self._fig_axs = {plot_id: plt.subplots(1, 1) for plot_id in self._plot_list} 

112 

113 def data_check_new_file(self, dl1_file_path: Path, dl1_plots_png_path: Path, dl1_plots_pdf: Path): 

114 file_provenance = parse_file_provenance(str(dl1_file_path)) 

115 self._cumulative_dl1_data.append( 

116 format_dl1_data( 

117 read_dl1_data( 

118 sl1_file_path=dl1_file_path, 

119 dl1_params_path=self._dl1_params_path, 

120 nb_tries=self._hdf5_open_nb_retries, 

121 retry_wait_time_s=self._hdf5_open_nb_retries, 

122 ), 

123 obs_id=file_provenance.obs_id, 

124 line_idx=file_provenance.line_idx, 

125 worker_node=file_provenance.worker_node, 

126 ) 

127 ) 

128 if datetime.datetime.now() - self._last_plot_datetime > self._plot_refresh_timedelta_threshold: 

129 self._last_plot_datetime = datetime.datetime.now() 

130 self.update_plots(self._cumulative_dl1_data[-1]) 

131 

132 def update_plots(self, dl1_df: pd.DataFrame, dl1_plots_png_path: Path, dl1_plots_pdf: Path): 

133 sns.set_theme() 

134 if not self._figures: 

135 self.init_figures(dl1_df) 

136 

137 # these have special plots 

138 special_plots = [ 

139 "event_id_diff", 

140 "trigger_time", 

141 "event_id", 

142 "event_type", 

143 "total_intensity", 

144 "intensity", 

145 "event_quality", 

146 "az_tel", 

147 "alt_tel", 

148 ] 

149 

150 def write_df_and_reset_data(self, data_dir: Path, df_write_path: Path): 

151 # TODO: write df 

152 # self._cumulative_dl1_data = [pd.concat(self._cumulative_dl1_data, axis=0, ignore_index=True)] 

153 self._dl1_data = []