Source code for lst_auto_rta.hdf5_data_check

import datetime
import time
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from pathlib import Path
from threading import Lock
from typing import Annotated, Dict, List, NamedTuple

import matplotlib.dates
import numpy as np
import pandas as pd
import seaborn as sns
from annotated_types import Ge, Gt
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from pandas.api.types import is_datetime64_any_dtype
from scipy import ndimage
from tqdm import tqdm

from lst_auto_rta.config.hdf5_data_check import HDFDataCheckConfiguration
from lst_auto_rta.utils.hdf5 import pd_read_hdf_with_retry


class FileProvenance(NamedTuple):
    obs_id: Annotated[int, Ge(0)]
    line_idx: Annotated[int, Ge(0)]
    worker_node: str
# Previous implementation, kept for reference: parsed the provenance from
# "obs_id_..." and "line_idx_..." markers in the file path.
# def parse_file_provenance(cta_data_file_path: str, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance:
#     # get the string after "..._id_", and split on "_" to get only this part of the file path.
#     obs_id = int(cta_data_file_path.split("obs_id_", 1)[1].split("_", 1)[0])
#     line_idx = int(cta_data_file_path.split("line_idx_", 1)[1].split("_", 1)[0])
#     return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx])
def parse_file_provenance(cta_data_file_path: Path, process_idx_to_nodelist: Dict[int, str]) -> FileProvenance:
    # File names follow: dl1_v06_${obs_id}_${run_id}_${tel_id}_${processIndex}_${threadIndex}_${fileIndex}.h5
    split_ = cta_data_file_path.stem.split("_")
    obs_id = int(split_[3])
    line_idx = int(split_[5])
    return FileProvenance(obs_id=obs_id, line_idx=line_idx, worker_node=process_idx_to_nodelist[line_idx])
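
# Illustrative usage sketch (hypothetical file name and node mapping, not part of
# the module): line_idx is the process index, which selects the worker node.
# >>> parse_file_provenance(Path("dl1_v06_2970_2970_1_3_0_52.h5"), {3: "cp12"})
# FileProvenance(obs_id=2970, line_idx=3, worker_node='cp12')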
def add_debugging_fields_dl1_df(
    dl1_df: pd.DataFrame,
    obs_id: int,
    line_idx: int,
    worker_node: str,
    worker_node_categories: List[str],
    start=None,
    end=None,
    step=None,
) -> None:
    # Some events have a trigger time "from the future" that messes with event_id_diff,
    # so set the event id diff of all of these future events, and of the following event
    # as well, to 0.
    # Prepend the 1st value to get diff = 0 for the 1st element.
    event_id_diffs = np.diff(dl1_df["event_id"], prepend=dl1_df["event_id"][0])
    event_id_diffs[
        ndimage.binary_dilation(dl1_df["trigger_time"] > pd.to_datetime("21000101"), np.array([0, 1, 1]))
    ] = 0
    dl1_df.loc[slice(start, end, step), "event_id_diff"] = event_id_diffs
    dl1_df.loc[slice(start, end, step), "obs_id"] = obs_id
    dl1_df.loc[slice(start, end, step), "line_idx"] = line_idx
    dl1_df.loc[slice(start, end, step), "worker_node"] = pd.Categorical(
        [worker_node] * dl1_df.shape[0], categories=worker_node_categories
    )
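
# Minimal sketch of the dilation trick above (standalone, illustrative values):
# with structuring element [0, 1, 1], a True at position i marks positions i
# and i + 1, so both the "future" event and the event right after it are zeroed.
# >>> ndimage.binary_dilation([False, True, False, False], np.array([0, 1, 1]))
# array([False,  True,  True, False])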
def timestamp_to_datetime(df: pd.DataFrame, columns: List[str]) -> None:
    # Convert Unix-timestamp columns (seconds since epoch) to datetime64, in place.
    for col in columns:
        df[col] = pd.to_datetime(df[col], unit="s")
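
# For instance (illustrative value): pd.to_datetime(1700000000.5, unit="s")
# yields Timestamp('2023-11-14 22:13:20.500000').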
class HDFDataCheck:
    def __init__(
        self,
        data_check_config: HDFDataCheckConfiguration,
        process_idx_to_nodelist: Dict[int, str],
    ):
        self.conf = data_check_config
        self.last_plot_datetime = datetime.datetime.now()
        self.plot_refresh_interval_timedelta = datetime.timedelta(seconds=data_check_config.plot_refresh_interval_s)
        self.process_idx_to_nodelist = process_idx_to_nodelist
        self.data_lock = Lock()
        self.hdf_data = None
        self.current_length = 0
    def init_db(self, dtypes: pd.Series):
        # Allocate the big df that will accumulate all the data to plot.
        # Its size should be more than enough to store an entire run's data.
        # We first create an empty df with the right length, then create the columns
        # with a default value according to the dtype.
        # (pandas automatically fills missing values with NaNs, which changes the type of the
        # column to float if we were to try to create the df with only the dtype and an
        # example data.)
        self.hdf_data = pd.DataFrame(index=np.arange(self.conf.size_storage_dataframe))
        for col in dtypes.index:
            col_dtype = dtypes[col]
            if isinstance(col_dtype, pd.CategoricalDtype):
                self.hdf_data[col] = pd.Categorical(
                    [col_dtype.categories[0]] * self.hdf_data.shape[0], categories=col_dtype.categories
                )
            elif np.issubdtype(col_dtype, np.datetime64):
                self.hdf_data[col] = np.datetime64(0, "ns")
            elif np.issubdtype(col_dtype, bool):
                self.hdf_data[col] = False
            elif np.issubdtype(col_dtype, np.integer):
                self.hdf_data[col] = col_dtype.type(np.iinfo(col_dtype).max)
            elif np.issubdtype(col_dtype, np.inexact):
                self.hdf_data[col] = col_dtype.type(np.nan)
            else:
                raise ValueError(
                    "Can not set default value of column {} with dtype {}! Unsupported dtype!".format(col, col_dtype)
                )

    def read_hdf5_data(self, hdf5_file_path: Path):
        file_provenance = parse_file_provenance(hdf5_file_path, self.process_idx_to_nodelist)
        # Read the hdf5 data directly into a pandas DataFrame.
        # Alternatively, we could read with pytables and then copy the columns into the big df
        # with table.col("col_name"), but moving to pandas straight away allows us to perform
        # all operations on the df.
        file_df = pd_read_hdf_with_retry(
            hdf5_file_path,
            key=self.conf.data_path_in_hdf5,
            mode="r",
            nb_tries=self.conf.hdf5_open_nb_retries,
            retry_wait_time_s=self.conf.hdf5_open_wait_time_s,
            retry_on_os_error=True,
        )
        if self.conf.column_to_datetime is not None:
            timestamp_to_datetime(file_df, self.conf.column_to_datetime)
        add_debugging_fields_dl1_df(
            file_df,
            file_provenance.obs_id,
            file_provenance.line_idx,
            file_provenance.worker_node,
            list(self.process_idx_to_nodelist.values()),
        )
        file_nb_events = file_df.shape[0]
        with self.data_lock:
            # If the big df is not initialized: do it now that we know the dtypes
            if self.hdf_data is None:
                self.init_db(file_df.dtypes)
            # Copy the file df into the big df and update the length.
            # Use iloc, not loc: loc label-slicing is end-inclusive.
            self.hdf_data.iloc[self.current_length : self.current_length + file_nb_events] = file_df
            self.current_length += file_nb_events

    def data_check_new_file(self, hdf5_file_path: Path, png_dir_path: Path, plots_pdf_path: Path):
        self.read_hdf5_data(hdf5_file_path)
        if datetime.datetime.now() - self.last_plot_datetime > self.plot_refresh_interval_timedelta:
            self.last_plot_datetime = datetime.datetime.now()
            with self.data_lock:
                self.plot(png_dir_path, plots_pdf_path)

    def plot(self, png_dir_path: Path, plots_pdf_path: Path):
        sns.set_theme()
        png_dir_path.mkdir(exist_ok=True)
        # legend order
        legend_order = sorted(self.hdf_data.loc[:, "worker_node"].dtype.categories)
        # masks
        mask_event_quality = self.hdf_data["event_quality"] == 0
        mask_is_good_event = self.hdf_data["is_good_event"] == 1
        mask_future_events = self.hdf_data["trigger_time"] <= pd.to_datetime("21000101")
        # TODO:
        # - repair histogram plots with numpy_x or numpy_y fcts
        # - apply masks on plots that have e31 values ...
        # - find out why seaborn is so slow (do only histograms?)
        # - implement tests
        with PdfPages(plots_pdf_path) as pdf_pages:
            for plt_png_path, plt_conf in self.conf.plots_config.items():
                fig, ax = plt.subplots(1, 1, figsize=(12, 8))
                # select data based on masks and current length
                fig_mask = np.ones((self.hdf_data.shape[0],), dtype=bool)
                fig_mask[self.current_length :] = False  # do not select anything beyond current length
                if plt_conf.mask_event_quality:
                    fig_mask &= mask_event_quality
                if plt_conf.mask_future_events:
                    fig_mask &= mask_future_events
                if plt_conf.mask_is_good_event:
                    fig_mask &= mask_is_good_event
                data = self.hdf_data.loc[fig_mask, :]
                # Apply groupby operation if requested
                if plt_conf.groupby is not None:
                    data = getattr(
                        data.groupby(
                            [
                                item if isinstance(item, str) else pd.Grouper(**item)
                                for item in plt_conf.groupby.groupby
                            ],
                            # getattr() accepts no keyword arguments, so the optional groupby
                            # kwargs are forwarded to the groupby() call itself.
                            **(plt_conf.groupby.kwargs if plt_conf.groupby.kwargs is not None else {}),
                        ),
                        plt_conf.groupby.computation[0],
                    )(**(plt_conf.groupby.computation[1] if plt_conf.groupby.computation[1] is not None else {}))
                # Apply a numpy function on x or y data if requested
                for field, fct_conf in zip(["x", "y"], [plt_conf.np_function_x, plt_conf.np_function_y]):
                    if fct_conf is not None:
                        try:
                            plt_conf.kwargs[field] = getattr(np, fct_conf[0])(
                                data.loc[:, plt_conf.kwargs[field]],
                                **(fct_conf[1] if fct_conf[1] is not None else {}),
                            )
                        except AttributeError as e:
                            raise ValueError(
                                "Could not apply function {} with kwargs {} on {}".format(
                                    fct_conf[0], fct_conf[1], plt_conf.kwargs[field]
                                )
                            ) from e
                if plt_conf.kind == "plot":
                    plot_fct = sns.lineplot
                    # do not sort unless the user requested it (a sorted lineplot is quite confusing)
                    plt_conf.kwargs["sort"] = plt_conf.kwargs.get("sort", False)
                    plt_conf.kwargs["style"] = plt_conf.kwargs.get("style", "worker_node")
                    plt_conf.kwargs["style_order"] = plt_conf.kwargs.get("style_order", legend_order)
                elif plt_conf.kind == "hist":
                    plot_fct = sns.histplot
                    plt_conf.kwargs["multiple"] = plt_conf.kwargs.get("multiple", "dodge")
                    # plt_conf.kwargs["align"] = plt_conf.kwargs.get("align", "center")
                    # not supported by seaborn, which already supplies the 'align' arg
                    plt_conf.kwargs["shrink"] = plt_conf.kwargs.get("shrink", 0.8)
                else:
                    raise ValueError("Plot kind {} not supported".format(plt_conf.kind))
                # TODO: fix histogram intensity, show seconds on time xticks,
                # fix histogram column placement
                plot_fct(
                    **plt_conf.kwargs,
                    data=data,
                    hue="worker_node",
                    hue_order=legend_order,
                    ax=ax,
                )
                if plt_conf.title is not None:
                    ax.set_title(plt_conf.title)
                if plt_conf.xlabel is not None:
                    ax.set_xlabel(plt_conf.xlabel)
                if plt_conf.ylabel is not None:
                    ax.set_ylabel(plt_conf.ylabel)
                # rotate long tick labels so they remain readable
                if len(ax.get_xticklabels()[0].get_text()) >= 6:
                    ax.set_xticks(ax.get_xticks(), ax.get_xticklabels(), rotation=45, ha="right")
                fig.tight_layout()
                fig.savefig(png_dir_path / plt_png_path, bbox_inches="tight")
                pdf_pages.savefig(fig)
                plt.close(fig)

    def write_df_and_reset_data(self, data_dir: Path, df_write_path: Path):
        # iloc is positional and end-exclusive: loc label-slicing would also include
        # the row at index current_length, keeping one extra (uninitialized) row.
        self.hdf_data.iloc[: self.current_length].to_hdf(df_write_path, key=self.conf.write_df_store_key, mode="w")
        self.current_length = 0
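
# Illustrative plots_config entry (assumed shape, inferred only from the fields
# read in plot() above -- kind, kwargs, masks, groupby, np_function_x/y, title
# and axis labels; the actual schema is defined by HDFDataCheckConfiguration):
#
# "event_id_diff.png": {
#     "kind": "hist",
#     "kwargs": {"x": "event_id_diff"},
#     "mask_event_quality": true,
#     "mask_future_events": true,
#     "mask_is_good_event": false,
#     "groupby": null,
#     "np_function_x": null,
#     "np_function_y": null,
#     "title": "Event id difference between consecutive events",
#     "xlabel": "event_id diff",
#     "ylabel": "counts"
# }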
def main():
    parser = ArgumentParser(
        description="Data check plotting util of LST AUTO RTA: "
        "run this entrypoint to reproduce offline the plots generated during the night",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "-c",
        "--data_check_config",
        type=str,
        required=True,
        dest="config",
        help="Path to the data check configuration file",
    )
    parser.add_argument(
        "-d",
        "--data_dir",
        type=str,
        required=True,
        dest="data_dir",
        help="Path to the directory containing the data files to run the data check on",
    )
    parser.add_argument(
        "-n",
        "--night_nodes",
        type=str,
        nargs="+",
        dest="nodes",
        help="List of worker nodes used during the night. E.g.: cp12 cp15 cp42 cp58",
    )
    parser.add_argument(
        "-p",
        "--png_dir_path",
        type=str,
        required=True,
        dest="png_dir_path",
        help="Path to the directory where to write the plots as png images.",
    )
    parser.add_argument(
        "-f", "--pdf_path", type=str, required=True, dest="pdf_path", help="Path to the output pdf with the plots."
    )
    args = parser.parse_args()

    start_time = time.process_time()
    with open(Path(args.config), "r") as config_f:
        config = HDFDataCheckConfiguration.model_validate_json(config_f.read())
    config_loaded_time = time.process_time()
    print("Configuration loading time: {:.2f}s".format(config_loaded_time - start_time))

    data_checker = HDFDataCheck(config, {i: n for i, n in enumerate(args.nodes)})
    instantiation_time = time.process_time()
    print("Instantiation time: {:.2f}s".format(instantiation_time - config_loaded_time))

    file_list = [
        p for p in Path(args.data_dir).glob("*") if p.is_file() and p.suffix == ".h5" and p.stem.startswith("dl1_v06")
    ]
    data_checker.read_hdf5_data(file_list[0])
    init_and_load_first_file_time = time.process_time()
    print("Big df init and load 1st file time: {:.2f}s".format(init_and_load_first_file_time - instantiation_time))

    for data_file in tqdm(file_list[1:]):
        data_checker.read_hdf5_data(data_file)
    load_all_files_time = time.process_time()
    print("Load remaining files: {:.2f}s".format(load_all_files_time - init_and_load_first_file_time))

    data_checker.plot(Path(args.png_dir_path), Path(args.pdf_path))
    plot_time = time.process_time()
    print("Plot time: {:.2f}s".format(plot_time - load_all_files_time))
if __name__ == "__main__":
    main()
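
# Example offline invocation (paths and node names are placeholders; this assumes
# the module is run directly rather than through an installed console entrypoint):
#   python -m lst_auto_rta.hdf5_data_check \
#       -c data_check_config.json \
#       -d /path/to/night/DL1 \
#       -n cp12 cp15 cp42 cp58 \
#       -p ./data_check_pngs \
#       -f ./data_check_plots.pdf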