Coverage for lst_auto_rta/plot_line_delay.py: 0%
136 statements
1"""Script plotting event_ids differences and r0dl1 rates for all observations of a night."""
3import argparse
4import logging
5import pickle
6import re
7import subprocess as sp
8import sys
9import time
10from multiprocessing import Pool
11from pathlib import Path
13import numpy as np
14import pandas as pd
15import seaborn as sns
16import tables
17from matplotlib import pyplot as plt
18from scipy import ndimage
19from tqdm import tqdm


def log_uncaught_exceptions():
    """Make all uncaught exceptions be logged by the default logger.

    KeyboardInterrupt and its subclasses are not logged, so one can still kill the program with Ctrl+C.
    """

    def handle_exception(exc_type, exc_value, exc_traceback):
        if not issubclass(exc_type, KeyboardInterrupt):
            logging.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

        # Delegate to the default excepthook so the traceback is still printed to stderr.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

    sys.excepthook = handle_exception


def find_night_run_dirs(night_path):
    """Find the observation folders in a night's directory.

    In LST, a night directory contains a folder per observation, plus a calibration folder and a logs folder.

    Parameters
    ----------
    night_path : pathlib.Path
        Path to the night's data folder containing all subruns.

    Returns
    -------
    List of Paths to all the observation folders in the night's directory.
    """
    return [f for f in night_path.iterdir() if f.is_dir() and "calibration" not in f.stem and "logs" not in f.stem]


def parse_run_worker_nodes(run_data_path):
    """Parse the worker nodes of the R0->DL1 jobs of an observation from the hiperta_stream_start log file.

    The hiperta_stream_start log file is parsed to read the slurm job ids of the R0->DL1 jobs. The
    worker node names are then found by querying sacct.

    Parameters
    ----------
    run_data_path : pathlib.Path
        Path to the observation folder, containing the dl1, dl2 and dl3 directories and the hiperta_stream_start logs.

    Returns
    -------
    List of str: the names of the worker nodes, in the order of the lines.
    """
    # job id file template: "jobids_hiperta_stream_{hiperta_start_timestamp}.txt"
    # timestamp format: 2023-11-23_05-45-51
    # There is another one for the altaz job, which must not be selected.
    job_id_files = [f for f in run_data_path.iterdir() if f.is_file() and "jobids" in f.stem and "altaz" not in f.stem]
    if len(job_id_files) != 1:
        raise ValueError(
            "Couldn't find jobid file in run dir {}, got {}".format(str(run_data_path), str(job_id_files))
        )

    with open(job_id_files[0], "r") as job_file:
        job_ids = job_file.read().strip()

    sacct_process = sp.run(["sacct", "-j", job_ids, "--format", "NodeList"], capture_output=True, check=True)
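    # sacct prints a header line and a dashes line, then one NodeList row per job step
    # (assumption: each job contributes its allocation row plus a ".batch" step row, hence
    # every other row is kept). The final "\n" leaves a trailing empty string, dropped by [:-1].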
    return [node_str.strip() for node_str in sacct_process.stdout.decode().split("\n")[2:]][:-1:2]


def parse_line_subrun_dl1_events(
    line_subrun_dl1_filename, run_id, line_idx, worker_node, nb_retries, re_try_wait_time
):
    """Read a DL1 file to load the timestamps and event id information.

    The separate timestamp_s and timestamp_qns fields are put back together into a single datetime
    object (ns precision). The event id diffs are also computed here, as the difference between the
    event id of an event and that of the previous one. The first event is set to have an
    event_id_diff of 0. The "events from the future" are assigned an event_id_diff of 0, as is
    the first event following an event from the future (because the event ids of events from the
    future are weird).

    This function returns None if the file could not be read.

    Parameters
    ----------
    line_subrun_dl1_filename : pathlib.Path
        Path to the dl1 file.
    run_id : int
        Run ID of the events in the file. Written as a field in the df.
    line_idx : int
        Line number of the process that wrote this file. Written as a field in the df.
    worker_node : str
        Name of the worker node that processed the DL1 file.
    nb_retries : int
        Number of times to re-try opening a file in case of error.
    re_try_wait_time : float
        Amount of time, in seconds, to wait before attempting to open a file again.

    Returns
    -------
    Pandas dataframe with the subrun event ids and timestamps (and line index, worker node).
    """
    # Re-try mechanism because of fefs: sometimes we get errors even if the file is there.
    for _ in range(nb_retries):
        try:
            with tables.open_file(line_subrun_dl1_filename, "r") as dl1_f:
                event_ids = dl1_f.root.dl1.event.subarray.trigger.cols.event_id[:]
                timestamp_s = dl1_f.root.dl1.event.subarray.trigger.cols.timestamp_s[:]
                timestamp_qns = dl1_f.root.dl1.event.subarray.trigger.cols.timestamp_qns[:]
                break
        except Exception:
            time.sleep(re_try_wait_time)
    else:
        logging.warning("Could not open file {}!".format(line_subrun_dl1_filename))
        return None

    # Convert to a single datetime object.
    # timestamp_qns holds quarter-nanoseconds, hence the integer division by 4 to get nanoseconds.
    trigger_time = (pd.to_datetime(timestamp_s, unit="s") + pd.to_timedelta(timestamp_qns // 4, unit="ns")).to_series()
134 # "Future events" have weird event ids, so set their event id diffs to 0
135 # Also set the event id diff of the next event after a future event to 0 for same reason
136 event_id_diffs = np.diff(event_ids, prepend=event_ids[0])
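    # binary_dilation with structuring element [0, 1, 1] spreads each "future" flag one sample
    # forward, so both the future event and the event right after it get their diff zeroed.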
    event_id_diffs[ndimage.binary_dilation(trigger_time > pd.to_datetime("21000101"), np.array([0, 1, 1]))] = 0

    return pd.DataFrame(
        {
            "timestamp_s": timestamp_s,
            "timestamp_qns": timestamp_qns,
            "trigger_time": trigger_time,
            "event_id": event_ids,
            "event_id_diffs": event_id_diffs,
            "run_id": run_id,
            "line_idx": line_idx,
            "worker_node": worker_node,
        }
    )


def parse_run_dl1_events(dl1_dir_path, run_id, worker_nodes, nb_process, nb_retries, re_try_wait_time):
    """Parse an observation's DL1 files to load event ids and timestamps in a dataframe.

    Parameters
    ----------
    dl1_dir_path : pathlib.Path
        Path to the folder containing the night's DL1 files. (All runs are in the same folder.)
    run_id : int
        Run id of the run to analyse.
    worker_nodes : list of str
        List of the worker node names of the R0->DL1 jobs of this run.
    nb_process : int
        Number of processes to use to parse the run's files.
    nb_retries : int
        Number of times to re-try reading a DL1 file.
    re_try_wait_time : float
        Amount of time, in seconds, to wait before re-trying to open a file.

    Returns
    -------
    List of pandas dataframes with the event ids and timestamps data per subrun.
    """
    files_dfs = []
    for line_idx in [0, 1, 2, 3]:
        with Pool(nb_process) as pool:
            line_dfs = pool.starmap(
                parse_line_subrun_dl1_events,
                [
                    (
                        line_subrun_filename,
                        run_id,
                        line_idx,
                        worker_nodes[line_idx],
                        nb_retries,
                        re_try_wait_time,
                    )
                    # For files in LST created by LSTAutoRTA:
                    # DL1 filename template: "dl1_${obs_id}_${run_id}_${tel_id}_${processIndex}_${threadIndex}_${fileIndex}.h5"
                    # obs_id = 0, tel_id = 1, threadIndex = 0 in LST when started by AutoRTA.
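                    # Illustrative (made-up) match for run 17043, line 2: "dl1_0_17043_1_2_0_0001.h5"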
                    for line_subrun_filename in dl1_dir_path.glob("dl1_0_{}_1_{}*.h5".format(run_id, line_idx))
                ],
            )

        files_dfs.extend([df for df in line_dfs if df is not None])

    return files_dfs


def parse_run_log_files(logs_dir, run_id, worker_nodes):
    """Parse the log files of the R0->DL1 processes of a run to read the processing rates.

    Parameters
    ----------
    logs_dir : pathlib.Path
        Path to the folder containing the R0->DL1 jobs logs.
    run_id : int
        Run id of the run to parse.
    worker_nodes : list of str
        List of the worker node names of the R0->DL1 jobs of this run.

    Returns
    -------
    List of pandas dataframes with the R0->DL1 processing rates per line during the run.
    """
    pattern = re.compile(
        r"\[([0-9]+)\] : update_timer_message : "
        r"nbBadMessage = ([0-9]+) \(size in \[[^\]]*\]\), "
        r"nbEvent = ([0-9]+), "
        r"nbGoodEvent = ([0-9]+)"
    )
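    # Illustrative example of a log line this pattern matches (timestamp and counts are made up):
    # [1700712345] : update_timer_message : nbBadMessage = 0 (size in [0]), nbEvent = 3497, nbGoodEvent = 3201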
    logs_dfs = []
    for line_idx in [0, 1, 2, 3]:
        # log filename template: "r0_dl1_{run_id}_{line_id}.log"
        for log_file in logs_dir.glob("r0_dl1_{}_{}.log".format(run_id, line_idx)):
            timestamps = []
            nb_events = []
            nb_good_events = []
            with open(log_file, "r") as log_f:
                for log_line in log_f.readlines():
                    results = pattern.search(log_line)
                    if results:
                        timestamps.append(int(results.group(1)))
                        nb_events.append(int(results.group(3)))
                        nb_good_events.append(int(results.group(4)))

            logs_dfs.append(
                pd.DataFrame(
                    {
                        "timestamp": pd.to_datetime(timestamps, unit="s"),
                        "nbEvent": nb_events,
                        "run_id": run_id,
                        "line_idx": line_idx,
                        "worker_node": worker_nodes[line_idx],
                    }
                )
            )

    return logs_dfs


def save_line_delays_plot(event_df, log_df, fig_pickle_path, rate_aggregation_str):
    """Plot and save the event id diffs and the R0->DL1 processing rates with respect to time for an entire night.

    Parameters
    ----------
    event_df : pd.DataFrame
        DataFrame with events data.
    log_df : pd.DataFrame
        DataFrame with log files data (processing rates).
    fig_pickle_path : pathlib.Path
        Path to where the plot figure should be pickled.
    rate_aggregation_str : str
        Aggregation string in pandas time grouper syntax. For example, for 10 seconds: "10S".
    """
    sns.set_theme()
    worker_nodes = sorted(log_df["worker_node"].unique().tolist())
    run_ids = sorted(log_df["run_id"].unique())
    run_starts = [log_df.loc[log_df["run_id"] == run_id, "timestamp"].min() for run_id in run_ids]
    fig, ax = plt.subplots(
        2,
        1,
        figsize=(15, 10),
        sharex=True,
    )
    sns.scatterplot(
        x="trigger_time",
        y="event_id_diffs",
        hue="worker_node",
        style="worker_node",
        data=event_df[
            (event_df["event_id_diffs"] != 4)  # don't plot events with normal event id diffs (too many of them)
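            # (presumably: with 4 R0->DL1 lines each processing every 4th event, a diff of 4 is the nominal spacing)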
288 & (
289 event_df["event_id_diffs"] != 0
290 ) # those are: 1st events in subrun line file, or events after future events.
291 & (event_df["trigger_time"] < pd.to_datetime("21000101")) # don't plot events from the future
292 & (event_df["event_id_diffs"] < 1e18) # huge event ids when lines caught events of start of next run
293 ],
294 ax=ax[0],
295 s=12,
296 style_order=worker_nodes,
297 hue_order=worker_nodes,
298 )
299 ax[0].set_yscale("log")
300 sns.lineplot(
301 x="timestamp",
302 y="nbEvent",
303 data=log_df.groupby([pd.Grouper(key="timestamp", freq=rate_aggregation_str), "worker_node"]).mean(),
304 hue="worker_node",
305 style="worker_node",
306 units="run_id",
307 markers=True,
308 dashes=False,
309 markersize=4,
310 linewidth=1,
311 sort=False,
312 ax=ax[1],
313 hue_order=worker_nodes,
314 markeredgewidth=0.0,
315 )
316 for run_id, run_start in zip(run_ids, run_starts):
317 ax[1].annotate(
318 str(run_id),
319 (run_start, 0),
320 xytext=(0, 2),
321 textcoords="offset fontsize",
322 arrowprops=dict(arrowstyle="-|>", color="k"),
323 fontsize="xx-small",
324 )
325 fig.tight_layout()
327 with open(fig_pickle_path, "wb") as plot_fig_pickle:
328 pickle.dump(fig, plot_fig_pickle)


def process_night(
    night,
    nb_process,
    force_reload,
    data_base_path,
    obs_relative_dl1_folder,
    obs_relative_data_folder,
    relative_logs_folder,
    output_file_prefix,
    rate_aggregation_str,
    nb_retries,
    re_try_wait_time,
):
    """Parse the DL1 and log files of an entire night to plot the event ids and processing rates with respect to time.

    Parameters
    ----------
    night : str
        Night date in format "YYYYMMDD".
    nb_process : int
        Number of processes to use to parse the files.
    force_reload : bool
        If true, the data and log files are re-processed even if the dataframe is already saved
        at the output location.
    data_base_path : pathlib.Path
        Path to the folder containing the nights' directories.
    obs_relative_dl1_folder : pathlib.Path
        Relative path to the folder containing the dl1 files, from the observation directory.
    obs_relative_data_folder : pathlib.Path
        Relative path to the folder containing the data (dl1, dl2, dl3) subfolders, from the observation directory.
    relative_logs_folder : pathlib.Path
        Relative path to the folder containing the night's logs, from the night directory.
    output_file_prefix : str
        Prefix to prepend to the DataFrame and Figure files to save. The night str will be appended to
        make the full output file names.
    rate_aggregation_str : str
        Aggregation string in pandas time grouper syntax. For example, for 10 seconds: "10S".
    nb_retries : int
        Number of times to re-try reading a DL1 file.
    re_try_wait_time : float
        Amount of time, in seconds, to wait before re-trying to open a file.
    """
    night_dir = data_base_path / night

    night_df_path = night_dir / Path(f"{output_file_prefix}{night}.h5")
    if night_df_path.exists() and not force_reload:
        event_df = pd.read_hdf(night_df_path, key="event_df", mode="r")
        log_df = pd.read_hdf(night_df_path, key="log_df", mode="r")
    else:
        events_dfs = []
        logs_dfs = []

        for run_dir_data_path in tqdm(find_night_run_dirs(night_dir)):
            try:
                run_id = int(run_dir_data_path.stem)
                worker_nodes = parse_run_worker_nodes(run_dir_data_path / obs_relative_data_folder)

                events_dfs.extend(
                    parse_run_dl1_events(
                        run_dir_data_path / obs_relative_dl1_folder,
                        run_id,
                        worker_nodes,
                        nb_process,
                        nb_retries,
                        re_try_wait_time,
                    )
                )
                logs_dfs.extend(parse_run_log_files(night_dir / relative_logs_folder, run_id, worker_nodes))
            except Exception:
                logging.error(f"Cannot process run {run_dir_data_path}", exc_info=True)

        event_df = pd.concat(events_dfs, ignore_index=True)
        log_df = pd.concat(logs_dfs, ignore_index=True)

        event_df.to_hdf(night_df_path, key="event_df", mode="w")
        log_df.to_hdf(night_df_path, key="log_df", mode="a")

    save_line_delays_plot(event_df, log_df, night_dir / f"{output_file_prefix}{night}.pickle", rate_aggregation_str)


def main():
    log_uncaught_exceptions()

    parser = argparse.ArgumentParser(
        prog="plot_line_delays.py",
        description="Plotting script to visualize delays in events per line. "
        "This script will first build two pandas dataframes while loading the night's data, "
        "and save them to the night's data folder (unless a df is already saved there). "
        "It will then plot the lines' unwanted event ids and pickle the figure to the night's data folder as well. "
        "To view the figure: import pickle; figx = pickle.load(open('Figure.pickle', 'rb')); figx.show()",
        epilog="Suitable environment for this script: conda create -n visu -c conda-forge pytables pandas seaborn",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--night", "-n", type=str, required=True, help='Night to plot, in format "YYYYMMDD".')
    parser.add_argument(
        "--nb_process", "-p", type=int, default=1, help="Number of processes to use to aggregate the night's data."
    )
    parser.add_argument(
        "--force_reload",
        "-f",
        action="store_true",
        default=False,
        help="Forces the re-parsing of the night's data "
        "even if a visualization dataframe is already saved in the night folder.",
    )
    parser.add_argument(
        "--data_base_path",
        "-d",
        type=str,
        default="/fefs/onsite/pipeline/rta/data/",
        help="Path to the folder containing the reco nights outputs.",
    )
    parser.add_argument(
        "--obs_relative_dl1_folder",
        "-fdl1",
        type=str,
        default="reco/dl1/",
        help="Path to the folder containing the DL1 hdf5 files, relative to the observation folder.",
    )
    parser.add_argument(
        "--obs_relative_data_folder",
        "-fdata",
        type=str,
        default="reco/",
        help="Path to the top level data folder for an observation, relative to the observation folder. "
        "It is used to find the hiperta_stream_start log of the slurm jobs.",
    )
    parser.add_argument(
        "--relative_logs_folder",
        "-flogs",
        type=str,
        default="logs/RECO/r0_dl1",
        help="Path to the folder containing the r0->dl1 logs, relative to the night folder. "
        "(All observations' logs of a night are stored together in LST.)",
    )
    parser.add_argument(
        "--output_file_prefix",
        "-o",
        type=str,
        default="visu_delay_",
        help="Prefix string to prepend to the dataframe and figure files. "
        "The night string will be appended to this prefix.",
    )
    parser.add_argument(
        "--rate_aggregation_str",
        "-a",
        type=str,
        default="1S",
        help="Time range to use to aggregate rate values in the plot, in pandas grouper syntax. "
        "For instance, for 10 seconds: '10S'. '1S' means no grouping.",
    )
    parser.add_argument(
        "--nb_retries",
        "-r",
        type=int,
        default=3,
        help="Number of times to re-try opening a file (in case it fails due to fefs).",
    )
    parser.add_argument(
        "--re_try_wait_time",
        "-t",
        type=float,
        default=1.0,
        help="Amount of time to wait before re-trying to open a data file.",
    )
    parser.add_argument(
        "--log_file",
        "-l",
        type=str,
        help="Path to the file to write this script's logs to. If not provided, logs are printed to the console.",
    )
    args = parser.parse_args()

    if args.log_file is not None:
        logging.basicConfig(filename=args.log_file)

    data_base_path = Path(args.data_base_path)
    obs_relative_dl1_folder = Path(args.obs_relative_dl1_folder)
    obs_relative_data_folder = Path(args.obs_relative_data_folder)
    relative_logs_folder = Path(args.relative_logs_folder)

    nb_process = args.nb_process if args.nb_process >= 1 else 1

    process_night(
        night=args.night,
        nb_process=nb_process,
        force_reload=args.force_reload,
        data_base_path=data_base_path,
        obs_relative_dl1_folder=obs_relative_dl1_folder,
        obs_relative_data_folder=obs_relative_data_folder,
        relative_logs_folder=relative_logs_folder,
        output_file_prefix=args.output_file_prefix,
        rate_aggregation_str=args.rate_aggregation_str,
        nb_retries=args.nb_retries,
        re_try_wait_time=args.re_try_wait_time,
    )


if __name__ == "__main__":
    main()
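

# Example invocation (illustrative values; flags as defined above):
#   python plot_line_delay.py --night 20241102 --nb_process 8 --rate_aggregation_str 10S --log_file visu.log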