"""Script plotting event_ids differences and r0dl1 rates for all observations of a night."""
import argparse
import logging
import pickle
import re
import subprocess as sp
import sys
import time
from multiprocessing import Pool
from pathlib import Path
import numpy as np
import pandas as pd
import seaborn as sns
import tables
from matplotlib import pyplot as plt
from scipy import ndimage
from tqdm import tqdm
def log_uncaught_exceptions():
"""Makes all uncaught exception to be logged by the default logger.
Keyboard exceptions and children classes are not logged so one can kill the program with ctr+C.
"""
def handle_exception(exc_type, exc_value, exc_traceback):
if not issubclass(exc_type, KeyboardInterrupt):
logging.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
sys.excepthook = handle_exception
def find_nigth_run_dirs(night_path):
"""Find the observation folders in a night's directory.
In LST, a night directory contains folders for each observation, plus a calibration and a logs folder.
Parameters
----------
    night_path : pathlib.Path
Path to the night's data folder containing all subruns.
Returns
-------
    List of Paths to all the observation folders in the night's directory.
"""
return [f for f in night_path.iterdir() if f.is_dir() and "calibration" not in f.stem and "logs" not in f.stem]
def parse_run_worker_nodes(run_data_path):
"""Parse the worker nodes of the R0->DL1 jobs during in an observation from hiperta_stream_start log file.
hiperta_stream_start log file is parsed to read the slurm job ids of the R0->DL1 jobs. Then
the worker nodes names are found by interogating sacct.
Paramters
---------
run_data_path : pathlib.Path
Path to the observation folder, containing the dl1, dl2 and dl3 directories and the hiperta_stream_start logs.
Returns
-------
    List of str: the names of the worker nodes, in the order of the processing lines (0-3).
"""
# job id file template: "jobids_hiperta_stream_{hiperta_start_timestamp}.txt"
# timestamp format: 2023-11-23_05-45-51
# There is another one for altaz job, not to be selected.
job_id_files = [f for f in run_data_path.iterdir() if f.is_file() and "jobids" in f.stem and "altaz" not in f.stem]
if len(job_id_files) != 1:
raise ValueError(
"Couldn't find jobid file in run dir {}, got {}".format(str(run_data_path), str(job_id_files))
)
with open(job_id_files[0], "r") as job_file:
job_ids = job_file.read().strip()
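    # The slicing below assumes the typical "sacct -j <ids> --format NodeList" layout
    # (values are illustrative, not from a real job):
    #      NodeList
    #     ---------
    #         cp01
    #         cp01
    #         cp02
    #         cp02
    # i.e. two header lines, then a job line plus a batch-step line per R0->DL1 job, and a
    # trailing empty entry after the final newline. Skipping the header ([2:]), then dropping the
    # last empty entry and keeping every other line ([:-1:2]), yields one node name per line.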
sacct_process = sp.run(["sacct", "-j", job_ids, "--format", "NodeList"], capture_output=True, check=True)
return [node_str.strip() for node_str in sacct_process.stdout.decode().split("\n")[2:]][:-1:2]
def parse_line_subrun_dl1_events(
line_subrun_dl1_filename, run_id, line_idx, worker_node, nb_retries, re_try_wait_time
):
"""Read a DL1 file, to load the timestamps and event id information.
    The separate timestamp_s and timestamp_qns columns are put back together into a single datetime object (ns precision).
    The event_id diffs are also computed here, as the difference between the event id of an event with respect to
    the previous one. The first event is set to have an event_id_diff of 0. The "events from the future" are
    assigned an event_id_diff of 0, as well as the first event following an event from the future (because the event ids
    of the events from the future are weird).
This function returns None if the file could not be read.
Parameters
----------
line_subrun_dl1_filename : pathlib.Path
Path to the dl1 file.
run_id : int
Run ID of the events in the file. Written as field in the df.
line_idx : int
Line number of the process that wrote this file. Written as field in the df.
worker_node : str
        Name of the worker node that produced the DL1 file.
nb_retries : int
Number of times to re-try opening a file in case of error.
re_try_wait_time : float
Amount of time, in seconds, to wait before attempting to open a file again.
Returns
-------
Pandas dataframe with the subrun event ids and timestamps (and line index, worker node).
"""
    # Re-try mechanism because of fefs: sometimes we get read errors even though the file is there.
    # The for/else construct runs the else branch only if no attempt succeeded (the loop never hit a break).
for _ in range(nb_retries):
try:
with tables.open_file(line_subrun_dl1_filename, "r") as dl1_f:
event_ids = dl1_f.root.dl1.event.subarray.trigger.cols.event_id[:]
timestamp_s = dl1_f.root.dl1.event.subarray.trigger.cols.timestamp_s[:]
timestamp_qns = dl1_f.root.dl1.event.subarray.trigger.cols.timestamp_qns[:]
break
except Exception:
time.sleep(re_try_wait_time)
else:
logging.warning("Could not open file {} !".format(line_subrun_dl1_filename))
return None
    # Convert the separate second / quarter-nanosecond timestamps into a single datetime object
    # (timestamp_qns appears to be in quarter-nanoseconds, hence the integer division by 4 to get ns).
trigger_time = (pd.to_datetime(timestamp_s, unit="s") + pd.to_timedelta(timestamp_qns // 4, unit="ns")).to_series()
# "Future events" have weird event ids, so set their event id diffs to 0
# Also set the event id diff of the next event after a future event to 0 for same reason
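    # Illustrative sketch (made-up numbers): with event_ids = [0, 4, 8, 9e18, 16] and the
    # fourth trigger_time beyond year 2100, the raw diffs would be [0, 4, 4, huge, -huge];
    # dilating the "future" mask with the [0, 1, 1] structure also flags the event right
    # after the future one, so both bogus diffs get reset to 0.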
event_id_diffs = np.diff(event_ids, prepend=event_ids[0])
event_id_diffs[ndimage.binary_dilation(trigger_time > pd.to_datetime("21000101"), np.array([0, 1, 1]))] = 0
return pd.DataFrame(
{
"timestamp_s": timestamp_s,
"timestamp_qns": timestamp_qns,
"trigger_time": trigger_time,
"event_id": event_ids,
"event_id_diffs": event_id_diffs,
"run_id": run_id,
"line_idx": line_idx,
"worker_node": worker_node,
}
)
def parse_run_dl1_events(dl1_dir_path, run_id, worker_nodes, nb_process, nb_retries, re_try_wait_time):
"""Parse an observation DL1 files to load event ids and timestamps in a dataframe
Paramters
---------
dl1_dir_path : pathlib.Path
Path to the folder containing the night's DL1 files. (all runs are in the same folder)
run_id : int
Run id of the run to analyse
worker_nodes : list of str
List of the worker nodes names of the R0->DL1 jobs of this run
nb_process : int
        Number of processes to use to parse the run's files.
nb_retries : int
Number of times to re-try reading a DL1 file.
re_try_wait_time : float
Amount of time, in seconds, to wait before re-trying to open a file.
Returns
-------
    List of pandas DataFrames with the event id and timestamp data per subrun.
"""
files_dfs = []
for line_idx in [0, 1, 2, 3]:
with Pool(nb_process) as pool:
line_dfs = pool.starmap(
parse_line_subrun_dl1_events,
[
(
line_subrun_filename,
run_id,
line_idx,
worker_nodes[line_idx],
nb_retries,
re_try_wait_time,
)
# For files in LST created by LSTAutoRTA:
# DL1 filename template: "dl1_${obs_id}_${run_id}_${tel_id}_${processIndex}_${threadIndex}_${fileIndex}.h5"
# obs_id = 0, tel_id = 1, threadIndex = 0 in LST when started by autoRTA.
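                    # For a hypothetical run 12345 and line 2, this glob would match names like
                    # "dl1_0_12345_1_2_0_0.h5", "dl1_0_12345_1_2_0_1.h5", ... (one file per subrun).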
for line_subrun_filename in dl1_dir_path.glob("dl1_0_{}_1_{}*.h5".format(run_id, line_idx))
],
)
files_dfs.extend([df for df in line_dfs if df is not None])
return files_dfs
def parse_run_log_files(logs_dir, run_id, worker_nodes):
"""Parse the log files of the R0->DL1 process of a run to read the processing rates.
Parameters
----------
logs_dir : pathlib.Path
Path to the folder containing the R0->DL1 jobs logs.
run_id : int
Run id of the run to parse
worker_nodes : list of str
List of the worker nodes names of the R0->DL1 jobs of this run
Returns
-------
    List of pandas DataFrames with the R0->DL1 processing rates per line during the run.
"""
pattern = re.compile(
r"\[([0-9]+)\] : update_timer_message : "
r"nbBadMessage = ([0-9]+) \(size in \[[^\]]*\]\), "
r"nbEvent = ([0-9]+), "
r"nbGoodEvent = ([0-9]+)"
)
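    # A log line that this pattern matches looks roughly like (reconstructed from the regex,
    # all values made up):
    #   [1700712345] : update_timer_message : nbBadMessage = 0 (size in []), nbEvent = 3500, nbGoodEvent = 3450
    # group(1) is the unix timestamp, group(3) the nbEvent counter and group(4) the nbGoodEvent counter.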
logs_dfs = []
for line_idx in [0, 1, 2, 3]:
# log filename template: "r0_dl1_{run_id}_{line_id}.log"
for log_file in logs_dir.glob("r0_dl1_{}_{}.log".format(run_id, line_idx)):
timestamps = []
nb_events = []
nb_good_events = []
event_per_sec = []
good_event_per_s = []
with open(log_file, "r") as log_f:
for log_line in log_f.readlines():
results = pattern.search(log_line)
if results:
timestamps.append(int(results.group(1)))
nb_events.append(int(results.group(3)))
nb_good_events.append(int(results.group(4)))
logs_dfs.append(
pd.DataFrame(
{
"timestamp": pd.to_datetime(timestamps, unit="s"),
"nbEvent": nb_events,
"run_id": run_id,
"line_idx": line_idx,
"worker_node": worker_nodes[line_idx],
}
)
)
return logs_dfs
def save_line_delays_plot(event_df, log_df, fig_pickle_path, rate_aggregation_str):
"""Plot and save the event id and the R0->DL1 processing rates wrt to time for an entire night
Paramters
---------
event_df : pd.DataFrame
DataFrame with events data.
log_df : pd.DataFrame
DataFrame with log files data (processing rate)
fig_pickle_path : pathlib.Path
Path to where the plot figure should be pickled.
rate_aggregation_str : str
Aggregation string in pandas time grouper syntax. For example for 10 seconds: "10S"
"""
sns.set_theme()
worker_nodes = sorted(log_df["worker_node"].unique().tolist())
run_ids = sorted(log_df["run_id"].unique())
run_starts = [log_df.loc[log_df["run_id"] == run_id, "timestamp"].min() for run_id in run_ids]
fig, ax = plt.subplots(
2,
1,
figsize=(15, 10),
sharex=True,
)
sns.scatterplot(
x="trigger_time",
y="event_id_diffs",
hue="worker_node",
style="worker_node",
data=event_df[
(event_df["event_id_diffs"] != 4) # don't plot events with normal event id diffs (too many of them)
& (
event_df["event_id_diffs"] != 0
) # those are: 1st events in subrun line file, or events after future events.
& (event_df["trigger_time"] < pd.to_datetime("21000101")) # don't plot events from the future
& (event_df["event_id_diffs"] < 1e18) # huge event ids when lines caught events of start of next run
],
ax=ax[0],
s=12,
style_order=worker_nodes,
hue_order=worker_nodes,
)
ax[0].set_yscale("log")
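    # Average the nbEvent samples into time bins of width rate_aggregation_str (e.g. "10S" = 10-second
    # bins), separately per worker node, before plotting the rate curves.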
sns.lineplot(
x="timestamp",
y="nbEvent",
data=log_df.groupby([pd.Grouper(key="timestamp", freq=rate_aggregation_str), "worker_node"]).mean(),
hue="worker_node",
style="worker_node",
units="run_id",
markers=True,
dashes=False,
markersize=4,
linewidth=1,
sort=False,
ax=ax[1],
hue_order=worker_nodes,
markeredgewidth=0.0,
)
for run_id, run_start in zip(run_ids, run_starts):
ax[1].annotate(
str(run_id),
(run_start, 0),
xytext=(0, 2),
textcoords="offset fontsize",
arrowprops=dict(arrowstyle="-|>", color="k"),
fontsize="xx-small",
)
fig.tight_layout()
with open(fig_pickle_path, "wb") as plot_fig_pickle:
pickle.dump(fig, plot_fig_pickle)
def process_night(
night,
nb_process,
force_reload,
data_base_path,
obs_relative_dl1_folder,
obs_relative_data_folder,
relative_logs_folder,
output_file_prefix,
rate_aggregation_str,
nb_retries,
re_try_wait_time,
):
"""Parses DL1 and log files of an entire night to plot the event ids and processing rates wrt time
Parameters
----------
night : str
        Night date in format "YYYYMMDD".
nb_process : int
        Number of processes to use to parse the files.
force_reload : bool
If true, the data and logs files are re-processed even if the dataframe is already saved
at the output location.
data_base_path : pathlib.Path
        Path to the folder containing the night directories.
obs_relative_dl1_folder : pathlib.Path
Relative path to the folder containing dl1 files from the night directory
obs_relative_data_folder : pathlib.Path
Relative path to the folder containing the data (dl1, dl2, dl3) subfolder from the night directory.
relative_logs_folder : pathlib.Path
        Relative path to the folder containing the night's logs from the night directory.
output_file_prefix : str
Prefix to prepend to the DataFrame and Figure to save. The night str will be appended to
make the full output file names.
rate_aggregation_str : str
Aggregation string in pandas time grouper syntax. For example for 10 seconds: "10S"
nb_retries : int
Number of times to re-try reading a DL1 file.
re_try_wait_time : float
Amount of time, in seconds, to wait before re-trying to open a file.
"""
night_dir = data_base_path / night
night_df_path = night_dir / Path(f"{output_file_prefix}{night}.h5")
if night_df_path.exists() and not force_reload:
event_df = pd.read_hdf(night_df_path, key="event_df", mode="r")
log_df = pd.read_hdf(night_df_path, key="log_df", mode="r")
else:
events_dfs = []
logs_dfs = []
for run_dir_data_path in tqdm(find_nigth_run_dirs(night_dir)):
try:
run_id = int(run_dir_data_path.stem)
worker_nodes = parse_run_worker_nodes(run_dir_data_path / obs_relative_data_folder)
events_dfs.extend(
parse_run_dl1_events(
run_dir_data_path / obs_relative_dl1_folder,
run_id,
worker_nodes,
nb_process,
nb_retries,
re_try_wait_time,
)
)
logs_dfs.extend(parse_run_log_files(night_dir / relative_logs_folder, run_id, worker_nodes))
except Exception:
logging.error(f"Can not process run {run_dir_data_path}", exc_info=True)
event_df = pd.concat(events_dfs, ignore_index=True)
log_df = pd.concat(logs_dfs, ignore_index=True)
event_df.to_hdf(night_df_path, key="event_df", mode="w")
log_df.to_hdf(night_df_path, key="log_df", mode="a")
save_line_delays_plot(event_df, log_df, night_dir / f"{output_file_prefix}{night}.pickle", rate_aggregation_str)
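# A minimal usage sketch of process_night() above (hypothetical night, values mirror the CLI defaults below):
#   process_night(
#       night="20231123",
#       nb_process=8,
#       force_reload=False,
#       data_base_path=Path("/fefs/onsite/pipeline/rta/data/"),
#       obs_relative_dl1_folder=Path("reco/dl1/"),
#       obs_relative_data_folder=Path("reco/"),
#       relative_logs_folder=Path("logs/RECO/r0_dl1"),
#       output_file_prefix="visu_delay_",
#       rate_aggregation_str="1S",
#       nb_retries=3,
#       re_try_wait_time=1.0,
#   )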
def main():
log_uncaught_exceptions()
parser = argparse.ArgumentParser(
prog="plot_line_delays.py",
description="Plotting script to visualize delays in events per line. "
"This script will first build two pandas dataframes while loading the night's data, "
"and save it to the night's data folder (unless a df is already saved there). "
"It will then plot the lines unwanted event ids and pickle the figure to the night's data as well. "
"To view the figure: import pickle; figx = pickle.load(open('Figure.pickle', 'rb')); figx.show()",
epilog="Suitable environment for this script: conda create -n visu -c conda-forge pytables pandas seaborn",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("--night", "-n", type=str, required=True, help='Night to plot, in format "YYYYMMDD".')
parser.add_argument(
"--nb_process", "-p", type=int, default=1, help="Number of process to use to aggregate the night's data."
)
parser.add_argument(
"--force_reload",
"-f",
action="store_true",
default=False,
help="Forces the re-parsing of the night's data "
"even if a visualization dataframe is already saved in the night folder.",
)
parser.add_argument(
"--data_base_path",
"-d",
type=str,
default="/fefs/onsite/pipeline/rta/data/",
help="Path to the folder containing the reco nights outputs.",
)
parser.add_argument(
"--obs_relative_dl1_folder",
"-fdl1",
type=str,
default="reco/dl1/",
help="Path to the folder containing the DL1 hdf5 files, relative to the observation folder.",
)
parser.add_argument(
"--obs_relative_data_folder",
"-fdata",
type=str,
default="reco/",
help="Path to the top level data folder for an observation, relative to the observation folder. "
"It is used to find the hiperta_stream_start log of the slurm jobs.",
)
parser.add_argument(
"--relative_logs_folder",
"-flogs",
type=str,
default="logs/RECO/r0_dl1",
help="Path to the folder containing the r0->dl1 logs, relative to the night folder."
"(All observations logs of a night are stored together in LST.)",
)
parser.add_argument(
"--output_file_prefix",
"-o",
type=str,
default="visu_delay_",
help="Prefix string to prepend to the dataframe and figure file. "
"To this prefix will be appended the night string.",
)
parser.add_argument(
"--rate_aggregation_str",
"-a",
type=str,
default="1S",
help="Time range to use to aggregate rate values in plot, in pandas grouper syntax. "
"For instance for 10 seconds: '10S'. No grouping is '1S'. This can be ",
)
parser.add_argument(
"--nb_retries",
"-r",
type=int,
default=3,
help="Number of time to re-try opening a file (in case it fails due to fefs).",
)
parser.add_argument(
"--re_try_wait_time",
"-t",
type=float,
default=1.0,
help="Amount of time to wait before re-trying to open a data file.",
)
parser.add_argument(
"--log_file",
"-l",
type=str,
help="Path to the file to write this scipt logs. If not provided, logs are printed to the console.",
)
args = parser.parse_args()
if args.log_file is not None:
logging.basicConfig(filename=args.log_file)
data_base_path = Path(args.data_base_path)
obs_relative_dl1_folder = Path(args.obs_relative_dl1_folder)
obs_relative_data_folder = Path(args.obs_relative_data_folder)
relative_logs_folder = Path(args.relative_logs_folder)
nb_process = args.nb_process if args.nb_process >= 1 else 1
process_night(
night=args.night,
nb_process=nb_process,
force_reload=args.force_reload,
data_base_path=data_base_path,
obs_relative_dl1_folder=obs_relative_dl1_folder,
obs_relative_data_folder=obs_relative_data_folder,
relative_logs_folder=relative_logs_folder,
output_file_prefix=args.output_file_prefix,
rate_aggregation_str=args.rate_aggregation_str,
nb_retries=args.nb_retries,
re_try_wait_time=args.re_try_wait_time,
)
if __name__ == "__main__":
main()