#!/usr/bin/env python
import datetime
import logging
import os
import time
import pymongo
[docs]
def wait_for_directory(directory, interval=5):
    """
    Block until the given path exists, polling at a fixed period.

    A message is printed before every sleep, and once more when the path
    is finally found.

    :param directory: path whose existence is awaited
    :param interval: number of seconds slept between two existence checks
    """
    while True:
        if os.path.exists(directory):
            break
        print(f"{directory} does not exist yet. let's wait {interval} seconds...")
        time.sleep(interval)
    print(f"The directory {directory} finally exist !!!")
[docs]
def now():
    """
    Give the current time as a POSIX timestamp (seconds since the Unix epoch).

    The value is derived from the naive local ``datetime.datetime.now()``.

    :return: float
    """
    current_moment = datetime.datetime.now()
    return current_moment.timestamp()
#def get_current_run():
# """
# Attempts to retrieve the current run number from the lst1_obs_summary database.
#
# :return: int
# """
# try:
# client = pymongo.MongoClient("lst101")
# database = client["lst1_obs_summary"]
# camera_collection = database["camera"]
# summaries = camera_collection.find({})
# run_table = []
# summaries = camera_collection.find({})
# for summary in summaries:
# if summary["tstart"] > datetime.datetime.timestamp(datetime.datetime.now()) - 3600 * 24 * 100:
# run_table.append(summary["run_number"])
# if run_table[-2] != run_table[-1]:
# return run_table[-1]
# else:
# return run_table[-1] + 1
# except Exception as e:
# logging.error("Error DB !!!!!")
# return -1
[docs]
def get_current_run(
    db_hostname: str = "lst101",
    obs_target_query_timeout_s: int = 10,
) -> int:
    """
    Standalone retrieval of the current run number from the lst1_obs_summary DB.

    Documents of the ``telescope`` collection are fetched one at a time
    (``skip``/``limit=1``), ordered by descending ``data.camera.tstart``.
    The ``run_number`` of the most recent camera entry of the first document
    is taken as reference. Every further camera entry (in that document and
    in the following ones) carrying the same ``run_number`` is counted as a
    duplicate, and the scan stops at the first entry with a different
    ``run_number`` or when no document is left. The returned value is the
    reference run number plus the number of duplicates.

    :param db_hostname: host name passed to ``pymongo.MongoClient``
    :param obs_target_query_timeout_s: time limit of each query in seconds
        (forwarded to the query as ``max_time_ms``)
    :return: int
        the run number, or -1 when it could not be determined (empty or
        malformed data, query timeout, any MongoDB or unexpected error);
        the cause is logged at error level
    """
    try:
        with pymongo.MongoClient(db_hostname) as client:
            collection = client.get_database("lst1_obs_summary").get_collection("telescope")
            camera_data_index = [("data.camera.tstart", pymongo.DESCENDING)]
            keep_querying = True
            query_idx = 0
            visited_ids = set()
            obs_id = None
            nb_duplicated_run_numbers = 0
            while keep_querying:
                cursor = collection.find(
                    {
                        "data.camera.run_number": {"$exists": True},
                        "data.camera.tstart": {"$exists": True},
                        "data.structure.tstart": {"$exists": True},
                    },
                    {
                        "_id": True,
                        "data.camera.run_number": True,
                        "data.camera.tstart": True,
                        "data.structure.tstart": True,
                    },
                    sort=camera_data_index,
                    max_time_ms=obs_target_query_timeout_s * 1000,
                    skip=query_idx,
                    limit=1,
                )
                try:
                    query_data = next(cursor)
                except StopIteration:
                    if obs_id is None:
                        # Not a single usable document: reported as empty data below
                        raise
                    # Every remaining document has been inspected: the duplicate
                    # count is complete, so the result is known.
                    break
                finally:
                    # Close cursor explicitly for older pymongo versions
                    cursor.close()

                # If the DB changes while iterating, we may get the same _id again: ignore it
                if query_data["_id"] in visited_ids:
                    query_idx += 1
                    continue
                visited_ids.add(query_data["_id"])

                camera_list = sorted(query_data["data"]["camera"], key=lambda x: x["tstart"], reverse=True)
                structure_list = sorted(query_data["data"]["structure"], key=lambda x: x["tstart"], reverse=True)

                # If the DB is incoherent for the latest run: raise error.
                if (len(structure_list) != len(camera_list)) and (query_idx == 0):
                    raise pymongo.errors.PyMongoError(
                        "TCU DB query returned inconsistent data.structure (length {}) "
                        "and data.camera (length {})".format(len(structure_list), len(camera_list))
                    )

                if query_idx == 0:
                    obs_id = camera_list[0]["run_number"]

                # The first entry of the first document is the reference itself: skip it
                comparison_start_idx = 1 if query_idx == 0 else 0
                for field in camera_list[comparison_start_idx:]:
                    if field["run_number"] != obs_id:
                        keep_querying = False
                        break
                    nb_duplicated_run_numbers += 1
                query_idx += 1

            if obs_id is None:
                logging.error("DB query succeeded but no obs_id was extracted")
                return -1
            return int(obs_id + nb_duplicated_run_numbers)
    except pymongo.errors.ExecutionTimeout:
        logging.error("DB query timeout (max_time_ms exceeded) while retrieving current run number")
        return -1
    except (StopIteration, KeyError, TypeError) as e:
        logging.error("DB returned unexpected/empty data while retrieving current run number: %s", e)
        return -1
    except pymongo.errors.PyMongoError as e:
        logging.error("MongoDB error while retrieving current run number: %s", e)
        return -1
    except Exception as e:
        # Last-resort boundary: callers rely on -1 instead of an exception
        logging.error("Unexpected error while retrieving current run number: %s", e)
        return -1
[docs]
def get_night_timestamp(today):
    """
    Build the YYYYMMDD label of the observing night a local time belongs to.

    A night spans from 08h00 to 07h59 of the following day, so any time
    before 08h00 is attributed to the previous calendar day.
    Ex: 04h00 (local time) on the 2nd of the month is labelled with the 1st.

    :param today: datetime object
        datetime object with current local time
    :return:
        timestamp: str
    """
    # TODO check that today is a datetime object ?
    # From 08h00 onwards the calendar day is the night's day; earlier, step back one day
    reference_day = today if today.hour >= 8 else today - datetime.timedelta(days=1)
    return "{:04d}{:02d}{:02d}".format(reference_day.year, reference_day.month, reference_day.day)
[docs]
def today_to_directory(today):
    """
    Turn a YYYYMMDD night label into the relative path YYYY/MM/DD.

    :param today: str
        night label as produced by get_night_timestamp
    :return: str
        the characters [0:4], [4:6] and [6:8] of the label joined with "/"
    """
    year, month, day = today[0:4], today[4:6], today[6:8]
    return "/".join((year, month, day))
[docs]
_RTA_DATA_ROOT = "/fefs/onsite/pipeline/rta/data/"
_SHIFTER_PLOT_DIRECTORY = _RTA_DATA_ROOT + "plot_shifters/"
# File name patterns of the plots copied to the shifter directory
_SHIFTER_PLOT_PATTERNS = (
    "*_standard_theta2.png",
    "*_standard__excess_significance_over_time.png",
    "*_standard__spectra.png",
    "*_standard__sky_map.png",
)
# The check stops once it has been running for this long (seconds)
_MAX_CHECK_DURATION_S = 14 * 3600


def _log_and_run(command):
    """Log a shell command line, then execute it with os.system."""
    logging.info(command)
    os.system(command)


def _merge_dl3_command(night_directory, run_number):
    """Build the merge_DL3.py command reading from and writing to the DL3 directory of a run."""
    dl3_directory = f"{_RTA_DATA_ROOT}{night_directory}/{run_number}/DL3/"
    return (
        "./merge_DL3.py --input-filter 'dl3_v06*' "
        f"-d '{dl3_directory}' -o '{dl3_directory}' -r {run_number}"
    )


def main():
    """
    Run the RTA auto check loop for the current night.

    Steps:
    - compute the night label (get_night_timestamp) and wait for the night
      directory /fefs/onsite/pipeline/rta/data/YYYY/MM/DD to exist
    - configure logging to log_AutoCheck_<YYYYMMDD>.txt in that directory
    - remove the files in /fefs/onsite/pipeline/rta/data/plots/
    - then, roughly every 10 seconds and for at most 14 hours:
        - read the current run number (get_current_run); if it is not
          available (-1), log it and retry at the next iteration
        - if the run number changed, run merge_DL3.py for the run after the
          new one and for the new one
        - run merge_DL3.py for the run after the current one, copy its plots
          directory to the data root, and refresh the plot_shifters directory
          with the theta2, excess significance, spectra and sky map plots

    Every shell command is logged before being executed.
    """
    start_time = now()
    night = get_night_timestamp(datetime.datetime.now())
    night_directory = today_to_directory(night)

    wait_for_directory(_RTA_DATA_ROOT + night_directory)
    logging.basicConfig(
        filename=_RTA_DATA_ROOT + night_directory + "/log_AutoCheck_" + night + ".txt",
        level=logging.INFO,
    )
    logging.info("Start RTA Auto Check for the day " + night)
    os.system("rm -f /fefs/onsite/pipeline/rta/data/plots/*")

    current_run = -1
    # The deadline is the loop condition so that it is also honoured when the
    # DB stays unreachable and iterations are cut short with `continue`.
    while now() - start_time <= _MAX_CHECK_DURATION_S:
        time.sleep(10)

        if current_run == -1:
            current_run = get_current_run()
            if current_run == -1:
                logging.info("None")
                continue

        if current_run != get_current_run():
            # Give the DB a moment to settle before reading the new run number
            time.sleep(1)
            current_run = get_current_run()
            if current_run == -1:
                # The refresh failed: do not process a bogus run, retry next iteration
                logging.info("None")
                continue
            logging.info("Start RTA check for run " + str(current_run))
            time.sleep(2)
            # NOTE: the Auto_Check_DL1.py / Auto_Check_DL2.py steps are disabled
            logging.info(datetime.datetime.now())
            _log_and_run(_merge_dl3_command(night_directory, current_run + 1))
            time.sleep(10)
            _log_and_run(_merge_dl3_command(night_directory, current_run))

        # Periodic processing, done at every iteration
        checked_run = current_run + 1
        logging.info("Start RTA check for run " + str(current_run))
        time.sleep(2)
        # NOTE: the Auto_Check_DL1.py / Auto_Check_DL2.py steps are disabled
        logging.info(datetime.datetime.now())
        _log_and_run(_merge_dl3_command(night_directory, checked_run))

        run_plots = f"{_RTA_DATA_ROOT}{night_directory}/{checked_run}/plots"
        _log_and_run("cp -rf " + run_plots + " " + _RTA_DATA_ROOT + ".")
        _log_and_run("rm -f " + _SHIFTER_PLOT_DIRECTORY + "*")
        for pattern in _SHIFTER_PLOT_PATTERNS:
            _log_and_run("cp -f " + run_plots + "/" + pattern + " " + _SHIFTER_PLOT_DIRECTORY)

    logging.info("End of the night, Stop the RTA check")
# Start the auto check only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()