import datetime
import time
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from pathlib import Path
from threading import Lock
from typing import Annotated, Dict, List, NamedTuple
import numpy as np
import pandas as pd
import seaborn as sns
from annotated_types import Ge
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from scipy import ndimage
from tqdm import tqdm
from lst_auto_rta.config.hdf5_data_check import HDFDataCheckConfiguration
from lst_auto_rta.utils.hdf5 import pd_read_hdf_with_retry
class FileProvenance(NamedTuple):
obs_id: Annotated[int, Ge(0)]
line_idx: Annotated[int, Ge(0)]
worker_node: str
def parse_file_provenance(cta_data_file_path: Path, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance:
# dl1_v06_${obs_id}_${run_id}_${tel_id}_${processIndex}_${threadIndex}_${fileIndex}.h5
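    # Example (hypothetical filename; token indices as used in the code below):
    #   parse_file_provenance(Path("dl1_v06_1_12345_2_0_0_0.h5"), {0: "cp12"})
    #   -> FileProvenance(obs_id=12345, line_idx=0, worker_node="cp12")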
split_ = cta_data_file_path.stem.split("_")
obs_id = int(split_[3])
line_idx = int(split_[5])
return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx])
def add_debuging_fields_dl1_df(
dl1_df: pd.DataFrame,
obs_id: int,
line_idx: int,
worker_node: str,
worker_node_categories: List[str],
start=None,
end=None,
step=None,
) -> None:
    # Some events have a trigger time "from the future" that messes up event_id_diff,
    # so zero out the event id diff of each such future event and of the following event.
    # Prepend the 1st value so that the diff of the 1st element is 0.
    event_id_diffs = np.diff(dl1_df["event_id"], prepend=dl1_df["event_id"].iloc[0])
event_id_diffs[
ndimage.binary_dilation(dl1_df["trigger_time"] > pd.to_datetime("21000101"), np.array([0, 1, 1]))
] = 0
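    # Sketch of the dilation step above (hypothetical input): with structure [0, 1, 1],
    # each True entry is spread onto itself and its right neighbour, e.g.
    #   ndimage.binary_dilation([False, False, True, False], np.array([0, 1, 1]))
    #   -> array([False, False, True, True])
    # so both a future event and the event following it get their diff zeroed.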
dl1_df.loc[slice(start, end, step), "event_id_diff"] = event_id_diffs
dl1_df.loc[slice(start, end, step), "obs_id"] = obs_id
dl1_df.loc[slice(start, end, step), "line_idx"] = line_idx
dl1_df.loc[slice(start, end, step), "worker_node"] = pd.Categorical(
[worker_node] * dl1_df.shape[0], categories=worker_node_categories
)
def timestamp_to_datetime(df: pd.DataFrame, columns: List[str]) -> None:
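    # e.g. (hypothetical value): pd.to_datetime(1700000000, unit="s")
    #      -> Timestamp('2023-11-14 22:13:20')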
for col in columns:
df[col] = pd.to_datetime(df[col], unit="s")
class HDFDataCheck:
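    """Accumulate per-file DL1 dataframes into one preallocated storage dataframe
    and periodically render the configured data-check plots.

    Thread-safe: file ingestion and plotting are serialized with a lock.
    """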
def __init__(
self,
data_check_config: HDFDataCheckConfiguration,
process_idx_to_nodelist: Dict[int, str],
):
self.conf = data_check_config
self.last_plot_datetime = datetime.datetime.now()
self.plot_refresh_interval_timedelta = datetime.timedelta(seconds=data_check_config.plot_refresh_interval_s)
self.process_idx_to_nodelist = process_idx_to_nodelist
self.data_lock = Lock()
self.hdf_data = None
self.current_length = 0
def init_db(self, dtypes: pd.Series):
        # Allocate the big df that will accumulate all the data to plot.
        # Its size should be more than enough to store an entire run's data.
        # We first create an empty df of the right length, then create each column
        # with a default value chosen according to its dtype. (If we created the df
        # from the dtypes and a single example row, pandas would fill the missing
        # values with NaNs, which silently upcasts e.g. integer columns to float.)
self.hdf_data = pd.DataFrame(index=np.arange(self.conf.size_storage_dataframe))
for col in dtypes.index:
col_dtype = dtypes[col]
if isinstance(col_dtype, pd.CategoricalDtype):
self.hdf_data[col] = pd.Categorical(
[col_dtype.categories[0]] * self.hdf_data.shape[0], categories=col_dtype.categories
)
elif np.issubdtype(col_dtype, np.datetime64):
self.hdf_data[col] = np.datetime64(0, "ns")
elif np.issubdtype(col_dtype, bool):
self.hdf_data[col] = False
elif np.issubdtype(col_dtype, np.integer):
self.hdf_data[col] = col_dtype.type(np.iinfo(col_dtype).max)
elif np.issubdtype(col_dtype, np.inexact):
self.hdf_data[col] = col_dtype.type(np.nan)
else:
                raise ValueError(
                    "Cannot set default value of column {} with dtype {}: unsupported dtype!".format(col, col_dtype)
                )
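    # Illustration of the upcast that init_db avoids (hypothetical, standalone):
    #   >>> pd.Series([1], index=[0]).reindex(range(3)).dtype
    #   dtype('float64')  # the missing rows become NaN, so int64 is lost
    # Preallocating full-length columns with a per-dtype sentinel keeps dtypes stable.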
def read_hdf5_data(self, hdf5_file_path: Path):
file_provenance = parse_file_provenance(hdf5_file_path, self.process_idx_to_nodelist)
        # Read the hdf5 data directly into a pandas DataFrame.
        # Alternatively, we could read with pytables and then copy the columns into the
        # big df with table.col("col_name"), but moving to pandas straight away lets us
        # perform all operations on the df.
file_df = pd_read_hdf_with_retry(
hdf5_file_path,
key=self.conf.data_path_in_hdf5,
mode="r",
nb_tries=self.conf.hdf5_open_nb_retries,
retry_wait_time_s=self.conf.hdf5_open_wait_time_s,
retry_on_os_error=True,
)
if self.conf.column_to_datetime is not None:
timestamp_to_datetime(file_df, self.conf.column_to_datetime)
add_debuging_fields_dl1_df(
file_df,
file_provenance.obs_id,
file_provenance.line_idx,
file_provenance.worker_node,
list(self.process_idx_to_nodelist.values()),
)
file_nb_events = file_df.shape[0]
with self.data_lock:
# If the big df is not initialized: do it now that we know the dtype
if self.hdf_data is None:
self.init_db(file_df.dtypes)
# copy the file df in the big df and update length. Do not use loc but iloc !!
self.hdf_data.iloc[self.current_length : self.current_length + file_nb_events] = file_df
self.current_length += file_nb_events
def data_check_new_file(self, hdf5_file_path: Path, png_dir_path: Path, plots_pdf_path: Path):
self.read_hdf5_data(hdf5_file_path)
if datetime.datetime.now() - self.last_plot_datetime > self.plot_refresh_interval_timedelta:
self.last_plot_datetime = datetime.datetime.now()
with self.data_lock:
self.plot(png_dir_path, plots_pdf_path)
def plot(self, png_dir_path: Path, plots_pdf_path: Path):
sns.set_theme()
png_dir_path.mkdir(exist_ok=True)
# legend order
legend_order = sorted(self.hdf_data.loc[:, "worker_node"].dtype.categories)
# masks
mask_event_quality = self.hdf_data["event_quality"] == 0
mask_is_good_event = self.hdf_data["is_good_event"] == 1
mask_future_events = self.hdf_data["trigger_time"] <= pd.to_datetime("21000101")
        # TODO:
        # - repair histogram plots with the np_function_x / np_function_y functions
        # - apply masks on plots that have e31 values ...
        # - find out why seaborn is so slow (do only histograms ?)
        # - implement tests
with PdfPages(plots_pdf_path) as pdf_pages:
for plt_png_path, plt_conf in self.conf.plots_config.items():
fig, ax = plt.subplots(1, 1, figsize=(12, 8))
# select data based on masks and current length
fig_mask = np.ones((self.hdf_data.shape[0],), dtype=bool)
fig_mask[self.current_length :] = 0 # do not select anything over current length
if plt_conf.mask_event_quality:
fig_mask &= mask_event_quality
if plt_conf.mask_future_events:
fig_mask &= mask_future_events
if plt_conf.mask_is_good_event:
fig_mask &= mask_is_good_event
data = self.hdf_data.loc[fig_mask, :]
                # Apply the groupby operation if requested. Note: the groupby kwargs are
                # forwarded to DataFrame.groupby(); getattr() accepts no keyword arguments.
                if plt_conf.groupby is not None:
                    data = getattr(
                        data.groupby(
                            [
                                item if isinstance(item, str) else pd.Grouper(**item)
                                for item in plt_conf.groupby.groupby
                            ],
                            **(plt_conf.groupby.kwargs if plt_conf.groupby.kwargs is not None else {}),
                        ),
                        plt_conf.groupby.computation[0],
                    )(**(plt_conf.groupby.computation[1] if plt_conf.groupby.computation[1] is not None else {}))
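                    # Hypothetical groupby configuration, to illustrate the call above:
                    #   groupby=["worker_node", {"key": "trigger_time", "freq": "10s"}],
                    #   computation=("mean", None)
                    # would resolve to:
                    #   data.groupby(["worker_node", pd.Grouper(key="trigger_time", freq="10s")]).mean()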
# Apply function on x or y data if requested
for field, fct_conf in zip(["x", "y"], [plt_conf.np_function_x, plt_conf.np_function_y]):
if fct_conf is not None:
try:
plt_conf.kwargs[field] = getattr(np, fct_conf[0])(
data.loc[:, plt_conf.kwargs[field]],
**(fct_conf[1] if fct_conf[1] is not None else {}),
)
except AttributeError as e:
raise ValueError(
"Could not apply function {} with kwargs {} on {}".format(
fct_conf[0], fct_conf[1], plt_conf.kwargs[field]
)
) from e
if plt_conf.kind == "plot":
plot_fct = sns.lineplot
# do not sort unless user requested it (lineplot sorted is quite confusing)
plt_conf.kwargs["sort"] = plt_conf.kwargs.get("sort", False)
plt_conf.kwargs["style"] = plt_conf.kwargs.get("style", "worker_node")
plt_conf.kwargs["style_order"] = plt_conf.kwargs.get("style_order", legend_order)
elif plt_conf.kind == "hist":
plot_fct = sns.histplot
plt_conf.kwargs["multiple"] = plt_conf.kwargs.get("multiple", "dodge")
# plt_conf.kwargs["align"] = plt_conf.kwargs.get("align", "center") # not supported by seaborn which already supplies the 'align' arg
plt_conf.kwargs["shrink"] = plt_conf.kwargs.get("shrink", 0.8)
else:
raise ValueError("Plot kind {} not supported".format(plt_conf.kind))
# todo: fix histo intensity
# fix xticks time: show second
# fix histogram column placement
plot_fct(
**plt_conf.kwargs,
data=data,
hue="worker_node",
hue_order=legend_order,
ax=ax,
)
if plt_conf.title is not None:
ax.set_title(plt_conf.title)
if plt_conf.xlabel is not None:
ax.set_xlabel(plt_conf.xlabel)
if plt_conf.ylabel is not None:
ax.set_ylabel(plt_conf.ylabel)
                xtick_labels = ax.get_xticklabels()
                if xtick_labels and len(xtick_labels[0].get_text()) >= 6:
                    # long tick labels (e.g. timestamps): rotate them to avoid overlap
                    ax.set_xticks(ax.get_xticks(), xtick_labels, rotation=45, ha="right")
fig.tight_layout()
fig.savefig(png_dir_path / plt_png_path, bbox_inches="tight")
pdf_pages.savefig(fig)
plt.close(fig)
def write_df_and_reset_data(self, data_dir: Path, df_write_path: Path):
        # iloc, not loc: the storage df has a RangeIndex and label-based slices are
        # inclusive, so loc would also write the (unfilled) row at current_length.
        self.hdf_data.iloc[: self.current_length].to_hdf(df_write_path, key=self.conf.write_df_store_key, mode="w")
self.current_length = 0
def main():
parser = ArgumentParser(
description="Data check plotting util of LST AUTO RTA: "
"run this entrypoint to reproduce offline the plots generated during the night",
formatter_class=ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-c",
"--data_check_config",
type=str,
required=True,
dest="config",
help="Path to the data check configuration file",
)
parser.add_argument(
"-d",
"--data_dir",
type=str,
required=True,
dest="data_dir",
help="Path to the directory containing the data files, to run the data check on",
)
parser.add_argument(
"-n",
"--night_nodes",
type=str,
nargs="+",
dest="nodes",
help="List of worker nodes used during the night. Eg: cp12 cp15 cp42 cp58",
)
parser.add_argument(
"-p",
"--png_dir_path",
type=str,
required=True,
dest="png_dir_path",
help="Path to the directory where to write the plots as png images.",
)
parser.add_argument(
"-f", "--pdf_path", type=str, required=True, dest="pdf_path", help="Path to the output pdf with the plots."
)
args = parser.parse_args()
    # perf_counter measures wall-clock time (process_time would exclude I/O waits)
    start_time = time.perf_counter()
with open(Path(args.config), "r") as config_f:
config = HDFDataCheckConfiguration.model_validate_json(config_f.read())
    config_loaded_time = time.perf_counter()
print("Configuration loading time: {:.2f}s".format(config_loaded_time - start_time))
data_checker = HDFDataCheck(config, {i: n for i, n in enumerate(args.nodes)})
    instantiation_time = time.perf_counter()
print("Instantiation time: {:.2f}s".format(instantiation_time - config_loaded_time))
file_list = [
p for p in Path(args.data_dir).glob("*") if p.is_file() and p.suffix == ".h5" and p.stem.startswith("dl1_v06")
]
data_checker.read_hdf5_data(file_list[0])
    init_and_load_first_file_time = time.perf_counter()
print("Big df init and load 1st file time: {:.2f}s".format(init_and_load_first_file_time - instantiation_time))
for data_file in tqdm(file_list[1:]):
data_checker.read_hdf5_data(data_file)
    load_all_files_time = time.perf_counter()
print("Load remaining files: {:.2f}s".format(load_all_files_time - init_and_load_first_file_time))
data_checker.plot(Path(args.png_dir_path), Path(args.pdf_path))
    plot_time = time.perf_counter()
print("Plot time: {:.2f}s".format(plot_time - load_all_files_time))
if __name__ == "__main__":
main()