Coverage for lst_auto_rta / crab_cleanup.py: 0%

167 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-16 15:10 +0000

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Scan DL1 HDF5 files, estimate telescope pointing vs. Crab in AltAz, and decide KEEP/DELETE per obs_id.

Also collects associated logs and exports deletion/keep lists plus a CSV summary.

The log file is timestamped to avoid overwriting between runs.

Example:
    python3 crab_cleanup.py --base-day-dir /fefs/onsite/pipeline/rta/data/2025/12/ --out-dir ./crab_cleanup_out --max-sep-deg 5.0 --tel-group tel_001 --step 200
"""

12 

13import argparse 

14import logging 

15import re 

16from pathlib import Path 

17from datetime import datetime, timezone 

18 

19import numpy as np 

20import pandas as pd 

21import h5py 

22 

23from astropy.time import Time 

24from astropy import units as u 

25from astropy.coordinates import SkyCoord, EarthLocation, AltAz 

26 

27 

28# ------------------------- 

29# Patterns / helpers 

30# ------------------------- 

31 

# DL1 filename pattern, e.g. "dl1_obs_id_12345.h5" or "dl1_v0.9_obs_id_12345.h5".
# Fixed: the original alternation r"^(dl1_|dl1_v).+\.h5$" had a dead second
# branch — "dl1_" already matches every string "dl1_v" matches — so the
# simplified pattern below accepts exactly the same set of names.
DL1_RE = re.compile(r"^dl1_.+\.h5$")
# Captures the numeric observation id embedded in a filename.
OBS_RE = re.compile(r"obs_id_(\d+)")

def is_dl1_file(p: Path) -> bool:
    """Return True if p is an existing regular file whose name looks like a DL1 HDF5 file."""
    return p.is_file() and DL1_RE.match(p.name) is not None

def extract_obs_id_from_name(filename: str):
    """Return the obs_id encoded in filename as an int, or None if absent."""
    m = OBS_RE.search(filename)
    return int(m.group(1)) if m else None

41 

def scan_dl1_files(day_dir: Path):
    """
    Recursively scan all DL1 *.h5 under day_dir.
    Returns dict: obs_id -> sorted list[Path]
    """
    grouped = {}
    for candidate in day_dir.rglob("*.h5"):
        if not is_dl1_file(candidate):
            continue
        obs_id = extract_obs_id_from_name(candidate.name)
        if obs_id is not None:
            grouped.setdefault(obs_id, []).append(candidate)

    # Deterministic per-obs ordering regardless of filesystem walk order.
    return {obs_id: sorted(paths) for obs_id, paths in grouped.items()}

59 

60 

61# ------------------------- 

62# DL1 reading + separation 

63# ------------------------- 

64 

def open_dl1_get_pointing_and_time(dl1_path: Path, tel_group: str, step: int):
    """
    Extract a subsample of (time, alt_tel, az_tel) from a DL1 HDF5 file.

    Parameters
    ----------
    dl1_path : Path
        DL1 HDF5 file to read.
    tel_group : str
        Telescope group name under dl1/event/telescope/parameters (e.g. "tel_001").
    step : int
        Subsampling stride; assumed >= 1 (np.arange would raise for step == 0).

    Returns
    -------
    tuple of np.ndarray
        (times_unix, alt_rad, az_rad); three empty arrays when the trigger
        node has no entries.

    Assumptions:
    - dl1/event/subarray/trigger/time = unix seconds (UTC)
    - dl1/event/telescope/parameters/<tel_group>/alt_tel, az_tel = radians
    """
    with h5py.File(dl1_path, "r") as f:
        trig = f["dl1/event/subarray/trigger"]
        pars = f[f"dl1/event/telescope/parameters/{tel_group}"]

        # NOTE(review): len(trig) is taken as the event count — assumes the
        # trigger node is a table-like dataset, not a group. TODO confirm
        # against the actual DL1 layout.
        n = len(trig)
        if n == 0:
            return np.array([]), np.array([]), np.array([])

        # Every `step`-th event, starting from the first.
        idx = np.arange(0, n, step, dtype=int)

        # Field access (e.g. trig["time"]) reads the whole column before the
        # idx subsampling is applied — presumably acceptable for per-run DL1
        # sizes; verify if memory becomes a concern.
        times_unix = trig["time"][idx].astype(float)
        alt_rad = pars["alt_tel"][idx].astype(float)
        az_rad = pars["az_tel"][idx].astype(float)

        return times_unix, alt_rad, az_rad

88 

def crab_separation_stats(dl1_path: Path,
                          site: EarthLocation,
                          crab: SkyCoord,
                          tel_group: str,
                          step: int,
                          max_sep_deg: float):
    """
    Compute separation statistics (deg) between the telescope pointing and the
    Crab, both expressed in the AltAz frame at the sampled event times.

    Decision rule: median separation < max_sep_deg -> "KEEP", else "DELETE".
    A file with no events yields NaN stats and decision "ERROR".
    """
    times_unix, alt_tel, az_tel = open_dl1_get_pointing_and_time(
        dl1_path, tel_group=tel_group, step=step)

    if len(times_unix) == 0:
        # Nothing to judge: report NaNs and flag the run as an error.
        return {
            "sep_min_deg": np.nan, "sep_med_deg": np.nan, "sep_mean_deg": np.nan, "sep_max_deg": np.nan,
            "decision": "ERROR", "error": "no events"
        }

    obstimes = Time(times_unix, format="unix", scale="utc")
    horizon = AltAz(obstime=obstimes, location=site)

    # Crab transformed to horizontal coordinates at each sampled time, and the
    # recorded telescope pointing expressed in the same frame.
    crab_altaz = crab.transform_to(horizon)
    pointing = SkyCoord(az=az_tel * u.rad, alt=alt_tel * u.rad, frame=horizon)

    sep_deg = pointing.separation(crab_altaz).to_value(u.deg)

    stats = {
        "sep_min_deg": float(np.min(sep_deg)),
        "sep_med_deg": float(np.median(sep_deg)),
        "sep_mean_deg": float(np.mean(sep_deg)),
        "sep_max_deg": float(np.max(sep_deg)),
        "error": ""
    }
    stats["decision"] = "KEEP" if stats["sep_med_deg"] < max_sep_deg else "DELETE"
    return stats

126 

127 

128# ------------------------- 

129# Logs discovery 

130# ------------------------- 

131 

132def find_obs_root_from_dl1(dl1_path: Path, obs_id: int) -> Path | None: 

133 """ 

134 Try to locate the directory path that corresponds to ".../<obs_id>/" within dl1_path. 

135 """ 

136 parts = dl1_path.parts 

137 obs_str = str(obs_id) 

138 if obs_str not in parts: 

139 return None 

140 i = parts.index(obs_str) 

141 return Path(*parts[:i + 1]) 

142 

def find_logs_for_dl1(dl1_path: Path, log_subdir: Path):
    """
    Look for logs in <obs_root>/<log_subdir>/ matching:
        <dl1_prefix>.o-* and <dl1_prefix>.e-*

    Returns a (possibly empty) list of Paths: stdout (.o-*) logs first, then
    stderr (.e-*) logs, each group sorted lexicographically.
    """
    obs = extract_obs_id_from_name(dl1_path.name)
    if obs is None:
        return []

    obs_root = find_obs_root_from_dl1(dl1_path, obs)
    if obs_root is None:
        return []

    log_dir = obs_root / log_subdir
    if not log_dir.exists():
        return []

    prefix = dl1_path.stem
    logs = []
    # glob order is arbitrary, so sort each group before concatenating.
    logs.extend(sorted(log_dir.glob(f"{prefix}.o-*")))
    logs.extend(sorted(log_dir.glob(f"{prefix}.e-*")))
    # Fixed: return the list directly instead of the pointless copy
    # `[p for p in logs]` the original built.
    return logs

165 

166 

167# ------------------------- 

168# Main 

169# ------------------------- 

170 

def setup_logger(out_dir: Path, prefix: str = "crab_pointing_cleanup"):
    """
    Configure the "crab_cleanup" logger to write to both a timestamped file
    under out_dir and stderr.

    Returns (logger, log_path). out_dir is created if missing; the log
    filename embeds a UTC timestamp so repeated runs never overwrite.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # UTC timestamp in the filename avoids clobbering earlier runs.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_UTC")
    log_path = out_dir / f"{prefix}_{stamp}.log"

    logger = logging.getLogger("crab_cleanup")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()

    formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")

    # Same configuration for the file and console handlers.
    for handler in (logging.FileHandler(log_path), logging.StreamHandler()):
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    logger.info("Log file: %s", log_path)
    logger.info("Script start UTC: %s", datetime.now(timezone.utc).isoformat())
    return logger, log_path

198 

def main():
    """
    Command-line entry point.

    Scans --base-day-dir for DL1 files, computes pointing/Crab separation for
    one representative file per obs_id, then writes KEEP/DELETE file lists, a
    CSV summary, and dry-run/delete shell helper commands to the log.

    Raises
    ------
    FileNotFoundError
        If --base-day-dir does not exist.
    """
    parser = argparse.ArgumentParser(description="KEEP/DELETE DL1 based on telescope pointing separation to Crab.")
    parser.add_argument("--base-day-dir", type=Path, required=True,
                        help="Day directory to scan (e.g. /.../2026/01/20 or /.../2026/01/)")
    parser.add_argument("--max-sep-deg", type=float, default=5.0,
                        help="Max median separation (deg) to KEEP (default: 5.0)")
    parser.add_argument("--tel-group", type=str, default="tel_001",
                        help="HDF5 telescope group name (default: tel_001)")
    parser.add_argument("--step", type=int, default=200,
                        help="Subsampling step (default: 200)")
    parser.add_argument("--log-subdir", type=Path, default=Path("logs/dl1_alt_az"),
                        help="Logs path relative to obs_id directory (default: logs/dl1_alt_az)")
    parser.add_argument("--out-dir", type=Path, default=Path("."),
                        help="Output directory for txt/csv/log files (default: current directory)")
    parser.add_argument("--orm-height-m", type=float, default=2200.0,
                        help="Site height in meters (default: 2200)")
    args = parser.parse_args()

    logger, _ = setup_logger(args.out_dir)

    base_day_dir = args.base_day_dir
    if not base_day_dir.exists():
        raise FileNotFoundError(f"base-day-dir does not exist: {base_day_dir}")

    # ORM / La Palma (LST-1)
    site = EarthLocation(lat=28.7617 * u.deg, lon=-17.89 * u.deg, height=args.orm_height_m * u.m)

    # Crab (ICRS J2000)
    crab = SkyCoord(ra=83.6331 * u.deg, dec=22.0145 * u.deg, frame="icrs")

    logger.info("Scanning DL1 under: %s", base_day_dir)
    dl1_by_obs = scan_dl1_files(base_day_dir)
    logger.info("Unique obs_id: %d", len(dl1_by_obs))
    logger.info("Example obs_id: %s", list(dl1_by_obs.keys())[:5])

    rows = []
    for obs_id, files in sorted(dl1_by_obs.items()):
        # One representative file per obs_id is enough to characterize pointing.
        rep = files[0]
        try:
            stats = crab_separation_stats(rep, site=site, crab=crab,
                                          tel_group=args.tel_group,
                                          step=args.step,
                                          max_sep_deg=args.max_sep_deg)
            row = {"obs_id": obs_id, "rep_file": str(rep), **stats}
            rows.append(row)
            logger.info("obs_id=%s rep=%s decision=%s sep_med=%.3f deg",
                        obs_id, rep.name, row["decision"],
                        row["sep_med_deg"] if np.isfinite(row["sep_med_deg"]) else float("nan"))
        except Exception as e:
            # Record the failure as an ERROR row so the obs_id is never deleted silently.
            rows.append({
                "obs_id": obs_id,
                "rep_file": str(rep),
                "sep_min_deg": np.nan, "sep_med_deg": np.nan, "sep_mean_deg": np.nan, "sep_max_deg": np.nan,
                "decision": "ERROR",
                "error": repr(e)
            })
            logger.exception("obs_id=%s rep=%s ERROR: %r", obs_id, rep.name, e)

    # Fixed: building the DataFrame with an explicit column list keeps an
    # empty scan (rows == []) from raising KeyError in sort_values — the
    # columns now always exist, and the CSV column order is deterministic.
    columns = ["obs_id", "rep_file",
               "sep_min_deg", "sep_med_deg", "sep_mean_deg", "sep_max_deg",
               "decision", "error"]
    df = pd.DataFrame(rows, columns=columns).sort_values(["decision", "obs_id"])
    logger.info("Decision counts:\n%s", df["decision"].value_counts(dropna=False).to_string())

    decision_by_obs = {int(r.obs_id): r.decision for _, r in df.iterrows()}

    to_delete_dl1 = []
    to_delete_logs = []
    to_keep_dl1 = []

    # Expand per-obs decisions to full file lists; DELETE also pulls in logs.
    for obs_id, files in dl1_by_obs.items():
        dec = decision_by_obs.get(obs_id, "ERROR")
        if dec == "KEEP":
            to_keep_dl1.extend(files)
        elif dec == "DELETE":
            to_delete_dl1.extend(files)
            for f in files:
                to_delete_logs.extend(find_logs_for_dl1(f, log_subdir=args.log_subdir))

    # De-duplicate and order the lists for stable output files.
    to_delete_dl1 = sorted(set(to_delete_dl1))
    to_delete_logs = sorted(set(to_delete_logs))
    to_keep_dl1 = sorted(set(to_keep_dl1))

    logger.info("DL1 to delete: %d", len(to_delete_dl1))
    logger.info("Logs to delete: %d", len(to_delete_logs))
    logger.info("DL1 to keep: %d", len(to_keep_dl1))

    # Outputs
    out_dir = args.out_dir
    out_del_dl1 = out_dir / "to_delete_dl1.txt"
    out_del_logs = out_dir / "to_delete_logs.txt"
    out_keep = out_dir / "to_keep_dl1.txt"
    out_csv = out_dir / "obsid_crab_pointing_summary.csv"

    # One path per line; trailing newline only when the list is non-empty.
    out_del_dl1.write_text("\n".join(str(p) for p in to_delete_dl1) + ("\n" if to_delete_dl1 else ""))
    out_del_logs.write_text("\n".join(str(p) for p in to_delete_logs) + ("\n" if to_delete_logs else ""))
    out_keep.write_text("\n".join(str(p) for p in to_keep_dl1) + ("\n" if to_keep_dl1 else ""))
    df.to_csv(out_csv, index=False)

    logger.info("Wrote: %s", out_del_dl1)
    logger.info("Wrote: %s", out_del_logs)
    logger.info("Wrote: %s", out_keep)
    logger.info("Wrote: %s", out_csv)

    keep_obs = df.loc[df["decision"] == "KEEP", "obs_id"].to_list()
    logger.info("KEEP obs_id list: %s", keep_obs)

    # Shell helpers
    logger.info("### Dry-run command:")
    logger.info('cat "%s" "%s" | while read f; do echo rm -f "$f"; done', out_del_dl1, out_del_logs)

    logger.info("### Delete command:")
    logger.info('cat "%s" "%s" | while read f; do rm -f "$f"; done', out_del_dl1, out_del_logs)

309 

310 

# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()