Coverage for lst_auto_rta / crab_cleanup.py: 0%
167 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-16 15:10 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-16 15:10 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4"""
5Scan DL1 HDF5 files, estimate telescope pointing vs Crab in AltAz, and decide KEEP/DELETE per obs_id.
6Also collects associated logs and exports deletion/keep lists + CSV summary.
8Log file is timestamped to avoid overwriting between runs.
10Example : "python3 crab_cleanup.py --base-day-dir /fefs/onsite/pipeline/rta/data/2025/12/ --out-dir ./crab_cleanup_out --max-sep-deg 5.0 --tel-group tel_001 --step 200"
11"""
13import argparse
14import logging
15import re
16from pathlib import Path
17from datetime import datetime, timezone
19import numpy as np
20import pandas as pd
21import h5py
23from astropy.time import Time
24from astropy import units as u
25from astropy.coordinates import SkyCoord, EarthLocation, AltAz
28# -------------------------
29# Patterns / helpers
30# -------------------------
32DL1_RE = re.compile(r"^(dl1_|dl1_v).+\.h5$")
33OBS_RE = re.compile(r"obs_id_(\d+)")
35def is_dl1_file(p: Path) -> bool:
36 return p.is_file() and DL1_RE.match(p.name) is not None
38def extract_obs_id_from_name(filename: str):
39 m = OBS_RE.search(filename)
40 return int(m.group(1)) if m else None
42def scan_dl1_files(day_dir: Path):
43 """
44 Recursively scan all DL1 *.h5 under day_dir.
45 Returns dict: obs_id -> list[Path]
46 """
47 by_obs = {}
48 for p in day_dir.rglob("*.h5"):
49 if not is_dl1_file(p):
50 continue
51 obs = extract_obs_id_from_name(p.name)
52 if obs is None:
53 continue
54 by_obs.setdefault(obs, []).append(p)
56 for obs in by_obs:
57 by_obs[obs] = sorted(by_obs[obs])
58 return by_obs
61# -------------------------
62# DL1 reading + separation
63# -------------------------
65def open_dl1_get_pointing_and_time(dl1_path: Path, tel_group: str, step: int):
66 """
67 Extract a subsample of (time, alt_tel, az_tel) from DL1.
69 Assumptions:
70 - dl1/event/subarray/trigger/time = unix seconds (UTC)
71 - dl1/event/telescope/parameters/<tel_group>/alt_tel, az_tel = radians
72 """
73 with h5py.File(dl1_path, "r") as f:
74 trig = f["dl1/event/subarray/trigger"]
75 pars = f[f"dl1/event/telescope/parameters/{tel_group}"]
77 n = len(trig)
78 if n == 0:
79 return np.array([]), np.array([]), np.array([])
81 idx = np.arange(0, n, step, dtype=int)
83 times_unix = trig["time"][idx].astype(float)
84 alt_rad = pars["alt_tel"][idx].astype(float)
85 az_rad = pars["az_tel"][idx].astype(float)
87 return times_unix, alt_rad, az_rad
89def crab_separation_stats(dl1_path: Path,
90 site: EarthLocation,
91 crab: SkyCoord,
92 tel_group: str,
93 step: int,
94 max_sep_deg: float):
95 """
96 Compute separation stats (deg) between telescope pointing and Crab in AltAz.
97 Default decision: median separation < max_sep_deg -> KEEP else DELETE.
98 """
99 times_unix, alt_tel, az_tel = open_dl1_get_pointing_and_time(dl1_path, tel_group=tel_group, step=step)
101 if len(times_unix) == 0:
102 return {
103 "sep_min_deg": np.nan, "sep_med_deg": np.nan, "sep_mean_deg": np.nan, "sep_max_deg": np.nan,
104 "decision": "ERROR", "error": "no events"
105 }
107 t = Time(times_unix, format="unix", scale="utc")
109 crab_altaz = crab.transform_to(AltAz(obstime=t, location=site))
110 tel_altaz = SkyCoord(az=az_tel * u.rad, alt=alt_tel * u.rad,
111 frame=AltAz(obstime=t, location=site))
113 sep_deg = tel_altaz.separation(crab_altaz).to_value(u.deg)
115 stats = {
116 "sep_min_deg": float(np.min(sep_deg)),
117 "sep_med_deg": float(np.median(sep_deg)),
118 "sep_mean_deg": float(np.mean(sep_deg)),
119 "sep_max_deg": float(np.max(sep_deg)),
120 "error": ""
121 }
123 keep = stats["sep_med_deg"] < max_sep_deg
124 stats["decision"] = "KEEP" if keep else "DELETE"
125 return stats
128# -------------------------
129# Logs discovery
130# -------------------------
132def find_obs_root_from_dl1(dl1_path: Path, obs_id: int) -> Path | None:
133 """
134 Try to locate the directory path that corresponds to ".../<obs_id>/" within dl1_path.
135 """
136 parts = dl1_path.parts
137 obs_str = str(obs_id)
138 if obs_str not in parts:
139 return None
140 i = parts.index(obs_str)
141 return Path(*parts[:i + 1])
143def find_logs_for_dl1(dl1_path: Path, log_subdir: Path):
144 """
145 Look for logs in <obs_root>/<log_subdir>/ matching:
146 <dl1_prefix>.o-* and <dl1_prefix>.e-*
147 """
148 obs = extract_obs_id_from_name(dl1_path.name)
149 if obs is None:
150 return []
152 obs_root = find_obs_root_from_dl1(dl1_path, obs)
153 if obs_root is None:
154 return []
156 log_dir = obs_root / log_subdir
157 if not log_dir.exists():
158 return []
160 prefix = dl1_path.stem
161 logs = []
162 logs.extend(sorted(log_dir.glob(f"{prefix}.o-*")))
163 logs.extend(sorted(log_dir.glob(f"{prefix}.e-*")))
164 return [p for p in logs]
167# -------------------------
168# Main
169# -------------------------
171def setup_logger(out_dir: Path, prefix: str = "crab_pointing_cleanup"):
172 out_dir.mkdir(parents=True, exist_ok=True)
174 # Timestamp (UTC) in filename to avoid overwriting
175 ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_UTC")
176 log_path = out_dir / f"{prefix}_{ts}.log"
178 logger = logging.getLogger("crab_cleanup")
179 logger.setLevel(logging.INFO)
180 logger.handlers.clear()
182 fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
184 fh = logging.FileHandler(log_path)
185 fh.setLevel(logging.INFO)
186 fh.setFormatter(fmt)
188 sh = logging.StreamHandler()
189 sh.setLevel(logging.INFO)
190 sh.setFormatter(fmt)
192 logger.addHandler(fh)
193 logger.addHandler(sh)
195 logger.info("Log file: %s", log_path)
196 logger.info("Script start UTC: %s", datetime.now(timezone.utc).isoformat())
197 return logger, log_path
199def main():
200 parser = argparse.ArgumentParser(description="KEEP/DELETE DL1 based on telescope pointing separation to Crab.")
201 parser.add_argument("--base-day-dir", type=Path, required=True,
202 help="Day directory to scan (e.g. /.../2026/01/20 or /.../2026/01/)")
203 parser.add_argument("--max-sep-deg", type=float, default=5.0,
204 help="Max median separation (deg) to KEEP (default: 5.0)")
205 parser.add_argument("--tel-group", type=str, default="tel_001",
206 help="HDF5 telescope group name (default: tel_001)")
207 parser.add_argument("--step", type=int, default=200,
208 help="Subsampling step (default: 200)")
209 parser.add_argument("--log-subdir", type=Path, default=Path("logs/dl1_alt_az"),
210 help="Logs path relative to obs_id directory (default: logs/dl1_alt_az)")
211 parser.add_argument("--out-dir", type=Path, default=Path("."),
212 help="Output directory for txt/csv/log files (default: current directory)")
213 parser.add_argument("--orm-height-m", type=float, default=2200.0,
214 help="Site height in meters (default: 2200)")
215 args = parser.parse_args()
217 logger, _ = setup_logger(args.out_dir)
219 base_day_dir = args.base_day_dir
220 if not base_day_dir.exists():
221 raise FileNotFoundError(f"base-day-dir does not exist: {base_day_dir}")
223 # ORM / La Palma (LST-1)
224 site = EarthLocation(lat=28.7617 * u.deg, lon=-17.89 * u.deg, height=args.orm_height_m * u.m)
226 # Crab (ICRS J2000)
227 crab = SkyCoord(ra=83.6331 * u.deg, dec=22.0145 * u.deg, frame="icrs")
229 logger.info("Scanning DL1 under: %s", base_day_dir)
230 dl1_by_obs = scan_dl1_files(base_day_dir)
231 logger.info("Unique obs_id: %d", len(dl1_by_obs))
232 logger.info("Example obs_id: %s", list(dl1_by_obs.keys())[:5])
234 rows = []
235 for obs_id, files in sorted(dl1_by_obs.items()):
236 rep = files[0]
237 try:
238 stats = crab_separation_stats(rep, site=site, crab=crab,
239 tel_group=args.tel_group,
240 step=args.step,
241 max_sep_deg=args.max_sep_deg)
242 row = {"obs_id": obs_id, "rep_file": str(rep), **stats}
243 rows.append(row)
244 logger.info("obs_id=%s rep=%s decision=%s sep_med=%.3f deg",
245 obs_id, rep.name, row["decision"],
246 row["sep_med_deg"] if np.isfinite(row["sep_med_deg"]) else float("nan"))
247 except Exception as e:
248 rows.append({
249 "obs_id": obs_id,
250 "rep_file": str(rep),
251 "sep_min_deg": np.nan, "sep_med_deg": np.nan, "sep_mean_deg": np.nan, "sep_max_deg": np.nan,
252 "decision": "ERROR",
253 "error": repr(e)
254 })
255 logger.exception("obs_id=%s rep=%s ERROR: %r", obs_id, rep.name, e)
257 df = pd.DataFrame(rows).sort_values(["decision", "obs_id"])
258 logger.info("Decision counts:\n%s", df["decision"].value_counts(dropna=False).to_string())
260 decision_by_obs = {int(r.obs_id): r.decision for _, r in df.iterrows()}
262 to_delete_dl1 = []
263 to_delete_logs = []
264 to_keep_dl1 = []
266 for obs_id, files in dl1_by_obs.items():
267 dec = decision_by_obs.get(obs_id, "ERROR")
268 if dec == "KEEP":
269 to_keep_dl1.extend(files)
270 elif dec == "DELETE":
271 to_delete_dl1.extend(files)
272 for f in files:
273 to_delete_logs.extend(find_logs_for_dl1(f, log_subdir=args.log_subdir))
275 to_delete_dl1 = sorted(set(to_delete_dl1))
276 to_delete_logs = sorted(set(to_delete_logs))
277 to_keep_dl1 = sorted(set(to_keep_dl1))
279 logger.info("DL1 to delete: %d", len(to_delete_dl1))
280 logger.info("Logs to delete: %d", len(to_delete_logs))
281 logger.info("DL1 to keep: %d", len(to_keep_dl1))
283 # Outputs
284 out_dir = args.out_dir
285 out_del_dl1 = out_dir / "to_delete_dl1.txt"
286 out_del_logs = out_dir / "to_delete_logs.txt"
287 out_keep = out_dir / "to_keep_dl1.txt"
288 out_csv = out_dir / "obsid_crab_pointing_summary.csv"
290 out_del_dl1.write_text("\n".join(str(p) for p in to_delete_dl1) + ("\n" if to_delete_dl1 else ""))
291 out_del_logs.write_text("\n".join(str(p) for p in to_delete_logs) + ("\n" if to_delete_logs else ""))
292 out_keep.write_text("\n".join(str(p) for p in to_keep_dl1) + ("\n" if to_keep_dl1 else ""))
293 df.to_csv(out_csv, index=False)
295 logger.info("Wrote: %s", out_del_dl1)
296 logger.info("Wrote: %s", out_del_logs)
297 logger.info("Wrote: %s", out_keep)
298 logger.info("Wrote: %s", out_csv)
300 keep_obs = df.loc[df["decision"] == "KEEP", "obs_id"].to_list()
301 logger.info("KEEP obs_id list: %s", keep_obs)
303 # Shell helpers
304 logger.info("### Dry-run command:")
305 logger.info('cat "%s" "%s" | while read f; do echo rm -f "$f"; done', out_del_dl1, out_del_logs)
307 logger.info("### Delete command:")
308 logger.info('cat "%s" "%s" | while read f; do rm -f "$f"; done', out_del_dl1, out_del_logs)
311if __name__ == "__main__":
312 main()