# Source code for lst_auto_rta.Auto_Check

#!/usr/bin/env python

import datetime
import logging
import os
import time

import pymongo

def wait_for_directory(directory, interval=5):
    """
    Block until ``directory`` is present on the filesystem.

    The path is polled with ``os.path.exists``; a message is printed before
    each wait and once more when the path is found.

    :param directory: path to poll
    :param interval: number of seconds to sleep between two checks
    """
    while True:
        if os.path.exists(directory):
            break
        print(f"{directory} does not exist yet. let's wait {interval} seconds...")
        time.sleep(interval)

    print(f"The directory {directory} finally exist !!!")
def now():
    """
    Give the current time as a POSIX timestamp.

    :return: float, seconds elapsed since the Unix epoch
    """
    current_moment = datetime.datetime.now()
    return current_moment.timestamp()
#def get_current_run(): # """ # Attempts to retrieve the current run number from the lst1_obs_summary database. # # :return: int # """ # try: # client = pymongo.MongoClient("lst101") # database = client["lst1_obs_summary"] # camera_collection = database["camera"] # summaries = camera_collection.find({}) # run_table = [] # summaries = camera_collection.find({}) # for summary in summaries: # if summary["tstart"] > datetime.datetime.timestamp(datetime.datetime.now()) - 3600 * 24 * 100: # run_table.append(summary["run_number"]) # if run_table[-2] != run_table[-1]: # return run_table[-1] # else: # return run_table[-1] + 1 # except Exception as e: # logging.error("Error DB !!!!!") # return -1
def get_current_run(
    db_hostname: str = "lst101",
    obs_target_query_timeout_s: int = 10,
) -> int:
    """
    Standalone retrieval of the current run number from the lst1_obs_summary DB.

    Documents of the ``telescope`` collection are read one at a time, sorted by
    ``data.camera.tstart`` in descending order. The run number of the first
    camera entry of the first document is the reference; every following
    camera entry carrying the same run number is counted as a duplicate. The
    scan stops at the first entry with a different run number, or when the
    collection has no more documents.

    :param db_hostname: host passed to ``pymongo.MongoClient``
    :param obs_target_query_timeout_s: time limit of each query in seconds
        (forwarded to the query as ``max_time_ms``)
    :return: reference run number plus the number of duplicated entries, or
        ``-1`` when the query fails or returns unusable data (the cause is
        logged with ``logging.error``)
    """
    try:
        with pymongo.MongoClient(db_hostname) as client:
            collection = client.get_database("lst1_obs_summary").get_collection("telescope")
            camera_data_index = [("data.camera.tstart", pymongo.DESCENDING)]

            keep_querying = True
            query_idx = 0
            visited_ids = set()
            obs_id = None  # becomes an int once the first document has been read
            nb_duplicated_run_numbers = 0

            while keep_querying:
                cursor = collection.find(
                    {
                        "data.camera.run_number": {"$exists": True},
                        "data.camera.tstart": {"$exists": True},
                        "data.structure.tstart": {"$exists": True},
                    },
                    {
                        "_id": True,
                        "data.camera.run_number": True,
                        "data.camera.tstart": True,
                        "data.structure.tstart": True,
                    },
                    sort=camera_data_index,
                    max_time_ms=obs_target_query_timeout_s * 1000,
                    skip=query_idx,
                    limit=1,
                )
                try:
                    query_data = next(cursor)
                except StopIteration:
                    if obs_id is None:
                        # Nothing was ever read: the collection is empty, which
                        # is reported as an error by the outer handler.
                        raise
                    # Every document has been visited and all entries share the
                    # reference run number: keep the result computed so far
                    # instead of discarding it.
                    break
                finally:
                    # Close cursor explicitly for older pymongo versions
                    cursor.close()

                # If the DB changes while iterating, we may get the same _id again: ignore it
                if query_data["_id"] in visited_ids:
                    query_idx += 1
                    continue
                visited_ids.add(query_data["_id"])

                camera_list = sorted(
                    query_data["data"]["camera"], key=lambda x: x["tstart"], reverse=True
                )
                structure_list = sorted(
                    query_data["data"]["structure"], key=lambda x: x["tstart"], reverse=True
                )

                # If the DB is incoherent for the latest run: raise error.
                if (len(structure_list) != len(camera_list)) and (query_idx == 0):
                    raise pymongo.errors.PyMongoError(
                        "TCU DB query returned inconsistent data.structure (length {}) "
                        "and data.camera (length {})".format(len(structure_list), len(camera_list))
                    )

                if query_idx == 0:
                    obs_id = camera_list[0]["run_number"]

                # In the first document the first entry is the reference itself,
                # so the comparison starts at the second entry.
                comparison_start_idx = 1 if query_idx == 0 else 0
                for field in camera_list[comparison_start_idx:]:
                    if field["run_number"] != obs_id:
                        keep_querying = False
                        break
                    nb_duplicated_run_numbers += 1

                query_idx += 1

            if obs_id is None:
                logging.error("DB query succeeded but no obs_id was extracted")
                return -1

            return int(obs_id + nb_duplicated_run_numbers)

    except pymongo.errors.ExecutionTimeout:
        logging.error("DB query timeout (max_time_ms exceeded) while retrieving current run number")
        return -1
    except (StopIteration, KeyError, TypeError) as e:
        logging.error(f"DB returned unexpected/empty data while retrieving current run number: {e}")
        return -1
    except pymongo.errors.PyMongoError as e:
        logging.error(f"MongoDB error while retrieving current run number: {e}")
        return -1
    except Exception as e:
        logging.error(f"Unexpected error while retrieving current run number: {e}")
        return -1
def get_night_timestamp(today):
    """
    Build the YYYYMMDD label of the observation night ``today`` belongs to.

    A night runs from 08h00 until 07h59 of the following day, so any time
    before 08h00 is attributed to the previous calendar day.
    Ex: an observation launched at 04h00 (local time) refers to the previous day.

    :param today: datetime object
        datetime object with current local time
    :return: timestamp: str
    """
    # Before 08h00 we are still in the night that started the day before.
    night_day = today if today.hour >= 8 else today - datetime.timedelta(days=1)
    return f"{night_day.year:04d}{night_day.month:02d}{night_day.day:02d}"
def today_to_directory(today):
    """
    Turn a YYYYMMDD string into the relative path ``YYYY/MM/DD``.

    :param today: str, characters 0-3 are used as year, 4-5 as month and
        6-7 as day; anything after the eighth character is ignored
    :return: str
    """
    year, month, day = today[:4], today[4:6], today[6:8]
    return "/".join((year, month, day))
def _log_and_run(command):
    """Write a shell command to the log, then execute it with ``os.system``."""
    logging.info(command)
    os.system(command)


def _merge_dl3_command(run_directory, run_number):
    """Build the ``merge_DL3.py`` command that merges the DL3 files of one run in place."""
    dl3_directory = run_directory + "/DL3/"
    return (
        "./merge_DL3.py --input-filter 'dl3_v06*' -d '"
        + dl3_directory
        + "' -o '"
        + dl3_directory
        + "' -r "
        + str(run_number)
    )


def main():
    """
    Follow the current run during the night and refresh DL3 merges and plots.

    Steps:

    - compute the night timestamp, wait for the night directory
      ``/fefs/onsite/pipeline/rta/data/YYYY/MM/DD`` to exist and configure
      logging to ``log_AutoCheck_<YYYYMMDD>.txt`` inside it;
    - empty ``/fefs/onsite/pipeline/rta/data/plots/``;
    - every loop iteration (at least 10 s apart), ask ``get_current_run`` for
      the run number; when it changes, run ``merge_DL3.py`` for run
      ``current_run_is + 1`` and, 10 s later, for run ``current_run_is``;
    - on every iteration with a valid run number, run ``merge_DL3.py`` for
      ``current_run_is + 1``, copy its ``plots`` directory to the data root
      and refresh the ``plot_shifters`` directory with the standard theta2,
      excess-significance, spectra and sky-map PNG files;
    - stop once more than 14 hours have elapsed since start-up.

    Every executed shell command is logged before it is run.

    NOTE(review): most commands target the directory of run
    ``current_run_is + 1``; this offset is kept exactly as it was - confirm
    it matches the run numbering returned by ``get_current_run``.
    """
    data_root = "/fefs/onsite/pipeline/rta/data/"
    plot_shifters_directory = data_root + "plot_shifters/"
    shifter_plot_patterns = (
        "*_standard_theta2.png",
        "*_standard__excess_significance_over_time.png",
        "*_standard__spectra.png",
        "*_standard__sky_map.png",
    )
    night_duration_s = 14 * 3600

    start_time = now()
    today = get_night_timestamp(datetime.datetime.now())
    night_directory = data_root + today_to_directory(today)

    wait_for_directory(night_directory)
    logging.basicConfig(
        filename=night_directory + "/log_AutoCheck_" + today + ".txt",
        level=logging.INFO,
    )
    logging.info("Start RTA Auto Check for the day " + today)

    os.system("rm -f " + data_root + "plots/*")

    current_run_is = -1
    # The deadline is checked in the loop condition so that it is still
    # honoured when an iteration is cut short because the DB is unavailable.
    while now() - start_time <= night_duration_s:
        time.sleep(10)

        if current_run_is == -1:
            current_run_is = get_current_run()
            if current_run_is == -1:
                logging.info("None")
                continue

        if current_run_is != get_current_run():
            # Let the DB settle before reading the new run number.
            time.sleep(1)
            current_run_is = get_current_run()
            if current_run_is == -1:
                # -1 is the error value of get_current_run: do not run any
                # command against a bogus run directory, retry next iteration.
                continue

            logging.info("Start RTA check for run " + str(current_run_is))
            time.sleep(2)
            logging.info(datetime.datetime.now())
            _log_and_run(
                _merge_dl3_command(
                    night_directory + "/" + str(current_run_is + 1), current_run_is + 1
                )
            )
            time.sleep(10)
            _log_and_run(
                _merge_dl3_command(night_directory + "/" + str(current_run_is), current_run_is)
            )

        # NOTE: the DL1/DL2 check steps (srun of Auto_Check_DL1.py and
        # Auto_Check_DL2.py followed by a PDF-to-PNG "convert") were already
        # commented out in the previous version and are not run here.
        next_run_directory = night_directory + "/" + str(current_run_is + 1)

        logging.info("Start RTA check for run " + str(current_run_is))
        time.sleep(2)
        logging.info(datetime.datetime.now())

        _log_and_run(_merge_dl3_command(next_run_directory, current_run_is + 1))
        _log_and_run("cp -rf " + next_run_directory + "/plots " + data_root + ".")
        _log_and_run("rm -f " + plot_shifters_directory + "*")
        for pattern in shifter_plot_patterns:
            _log_and_run(
                "cp -f " + next_run_directory + "/plots/" + pattern + " " + plot_shifters_directory
            )

    logging.info("End of the night, Stop the RTA check")
# Start the monitoring loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__": main()