Source code for lst_auto_rta.files_callback

import logging
from multiprocessing import Queue as MPQueue
from multiprocessing.synchronize import Event as SyncEvent  # for type hints
from pathlib import Path
from queue import Empty, Queue
from time import sleep
from typing import Any, Callable, Dict, List, NamedTuple

from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers import Observer
from watchdog.utils.patterns import match_any_paths

from lst_auto_rta.utils.logging import init_logging
from lst_auto_rta.utils.queue import process_all_items


class QueueingFilePatternMatchingEventHandler(PatternMatchingEventHandler):
    """Watchdog pattern-matching handler that pushes the path of each matching created file onto a queue.

    Parameters
    ----------
    file_queue : Queue
        Destination queue: every created file accepted by the pattern filters is put on it as a `pathlib.Path`.
    **kwargs
        Forwarded unchanged to `PatternMatchingEventHandler` (e.g. `patterns`, `ignore_patterns`,
        `ignore_directories`, `case_sensitive`).
    """

    def __init__(self, file_queue: Queue, **kwargs):
        self._file_queue = file_queue
        super().__init__(**kwargs)

    def on_created(self, event: FileSystemEvent) -> None:
        """Run the base-class hook, then enqueue the path of the created file.

        `put` is called without a timeout, so it waits for free space if the queue is bounded and full;
        with an unbounded queue it returns immediately.
        """
        super().on_created(event)
        created_path = Path(event.src_path)
        self._file_queue.put(created_path)
class DirMonitoringInfo(NamedTuple):
    """Everything `dir_monitoring` needs to start watching a new directory.

    Instances are usually sent to the monitoring process through a multiprocessing queue.
    """

    # Directory to watch for newly created files.
    data_dir: Path
    # Extra keyword arguments forwarded (with the file path) when processing each file of `data_dir`.
    process_file_fct_extra_kwargs: dict[str, Any]
    # Extra keyword arguments forwarded (with `data_dir`) to the callback run when monitoring of `data_dir` ends.
    on_dir_change_fct_extra_kwargs: dict[str, Any]
def _start_observer(
    data_dir: Path,
    file_queue: Queue,
    file_patterns: List[str] | None,
    file_ignore_patterns: List[str] | None,
    ignore_directories: bool,
    case_sensitive: bool,
):
    """Create and start a watchdog observer that puts the matching files created in `data_dir` on `file_queue`.

    Returns the started observer.
    """
    file_event_handler = QueueingFilePatternMatchingEventHandler(
        file_queue=file_queue,
        patterns=file_patterns,
        ignore_patterns=file_ignore_patterns,
        ignore_directories=ignore_directories,
        case_sensitive=case_sensitive,
    )
    observer = Observer()
    # pass a plain str path, which every watchdog version accepts
    observer.schedule(file_event_handler, str(data_dir))
    observer.start()
    return observer


def _queue_present_files(
    data_dir: Path,
    file_queue: Queue,
    file_patterns: List[str] | None,
    file_ignore_patterns: List[str] | None,
    case_sensitive: bool,
) -> None:
    """Put on `file_queue` the files already present in `data_dir` that match the patterns.

    Must be called *after* the observer is started so that no file created in between is missed. A file created
    during that window can be queued twice; the processed files set is expected to prevent processing it twice.
    """
    initial_files = sorted(  # sorted for a deterministic processing order
        f
        for f in Path(data_dir).glob("*")
        if f.is_file()
        # use watchdog's pattern utils to apply the same rules as during monitoring
        and match_any_paths(
            [f],
            included_patterns=file_patterns,
            excluded_patterns=file_ignore_patterns,
            case_sensitive=case_sensitive,
        )
    )
    logging.info("Adding {} already present files to queue".format(len(initial_files)))
    for f in initial_files:
        file_queue.put_nowait(f)  # the queue is unbounded, so this never raises Full


def _stop_monitoring(
    observer,
    processing_info: "DirMonitoringInfo",
    file_queue: Queue,
    processed_files_set: set[Path],
    process_file_fct: Callable,
    on_dir_change_fct: Callable,
) -> None:
    """Stop `observer`, process the files still queued for the monitored dir, then call `on_dir_change_fct` on it."""
    logging.info("Stopping monitoring of {}".format(processing_info.data_dir))
    observer.stop()
    observer.join()
    logging.info("Processing remaining items of {}".format(processing_info.data_dir))
    nb_processed = process_all_items(
        file_queue,
        processed_files_set,
        process_file_fct,
        processing_info.process_file_fct_extra_kwargs,
    )
    logging.debug("Processed {} files.".format(nb_processed))
    logging.info(
        "Executing on_dir_change_fct on data_dir {}, with kwargs {}".format(
            processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs
        )
    )
    on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs)


def dir_monitoring(
    log_level: str,
    log_filename: str,
    log_format_title: str,
    info_queue: MPQueue,
    stop_event: SyncEvent,
    process_file_fct: Callable,
    on_dir_change_fct: Callable,
    file_patterns: List[str] | None = None,
    file_ignore_patterns: List[str] | None = None,
    monitoring_ignore_directories: bool = True,
    pattern_case_sensitive: bool = True,
    query_interval_s: float = 1.0,
    monitoring_info_received_event: SyncEvent | None = None,
):
    """Monitor the last directory received on `info_queue` and process its created files until `stop_event` is set.

    `process_file_fct` is applied to every created file that matches the patterns. When a new directory is
    received, the previous one stops being monitored, its remaining queued files are processed and
    `on_dir_change_fct` is called on it. The same happens to the last monitored directory when `stop_event` is set.

    Notes
    -----
    Files already present in the monitored directory when monitoring starts are processed as well.

    Logging is initialized here in a separate file, so the process running this function must be started with
    the "spawn" method, to not inherit the parent process logging file handles.

    Parameters
    ----------
    log_level : str
        Log level to use for the logger of this function.
    log_filename : str
        Path to the log file to write logs to.
    log_format_title : str
        "Title" to use in the log file lines.
    info_queue : MPQueue[DirMonitoringInfo]
        Multiprocessing queue from which the directory to monitor and the callbacks extra kwargs are read.
    stop_event : SyncEvent
        When this event is set, the monitoring stops and the function returns.
    process_file_fct : Callable
        Function applied to the created (and already present) files of the monitored directory, receiving the
        file path and the `process_file_fct_extra_kwargs` of the corresponding `DirMonitoringInfo`.
    on_dir_change_fct : Callable
        Function called with the previously monitored directory path and the `on_dir_change_fct_extra_kwargs` of
        its `DirMonitoringInfo`, when a new directory is received or when the monitoring stops.
    file_patterns : List[str] | None, optional
        Patterns that files must match to be processed, see watchdog's `PatternMatchingEventHandler`
        (None matches every file). By default None.
    file_ignore_patterns : List[str] | None, optional
        Patterns of files to ignore, see watchdog's `PatternMatchingEventHandler`. By default None.
    monitoring_ignore_directories : bool, optional
        Whether created sub-directories are ignored, by default True.
    pattern_case_sensitive : bool, optional
        If True, patterns are matched case-sensitively, by default True.
    query_interval_s : float, optional
        Time in seconds to wait between 2 checks of `info_queue` / processing rounds of created files,
        by default 1.0.
    monitoring_info_received_event : SyncEvent | None, optional
        If given, set each time a `DirMonitoringInfo` is received from `info_queue`. Mainly used in unit tests
        to make sure the monitoring process is doing something before stopping the test. By default None.

    Raises
    ------
    ValueError
        If `file_patterns` or `file_ignore_patterns` is neither a list nor None.
    """
    init_logging(log_level=log_level, log_filename=log_filename, format_title=log_format_title)

    # `None` is not a type: it cannot be used in an isinstance() tuple (that raised TypeError for the default value)
    if file_patterns is not None and not isinstance(file_patterns, list):
        raise ValueError("File patterns must be a list of string or None, got {}".format(file_patterns))
    if file_ignore_patterns is not None and not isinstance(file_ignore_patterns, list):
        raise ValueError("Ignore pattern must be a list of string or None, got {}".format(file_ignore_patterns))

    # queue storing the paths of created files (that match the patterns),
    # with infinite size so put/get never block
    new_file_queue: Queue[Path] = Queue(maxsize=-1)

    # `observer` is None until the first DirMonitoringInfo is received; afterwards it is the running observer of
    # the directory described by `processing_info`.
    observer = None
    processing_info: DirMonitoringInfo | None = None
    processed_files_set: set[Path] = set()

    try:
        while not stop_event.is_set():
            try:
                # Check if a new processing info was put in the queue, otherwise Empty is raised
                new_processing_info = info_queue.get(block=False)
            except Empty:
                # no new processing info: process any queued files if we are monitoring something, then wait
                if observer is not None:
                    nb_processed = process_all_items(
                        new_file_queue,
                        processed_files_set,
                        process_file_fct,
                        processing_info.process_file_fct_extra_kwargs,
                    )
                    if nb_processed > 0:
                        logging.debug("Processed {} files.".format(nb_processed))
                # wait() rather than sleep(): returns as soon as the stop event is set
                stop_event.wait(query_interval_s)
                continue

            if monitoring_info_received_event is not None:
                monitoring_info_received_event.set()
            logging.info("New monitoring dir {}".format(new_processing_info.data_dir))
            logging.info(
                "New process_file_fct_extra_kwargs {}".format(new_processing_info.process_file_fct_extra_kwargs)
            )
            logging.info(
                "New on_dir_change_fct_extra_kwargs {}".format(new_processing_info.on_dir_change_fct_extra_kwargs)
            )

            # finish the current monitoring (if any): the file queue is empty afterwards
            if observer is not None:
                _stop_monitoring(
                    observer,
                    processing_info,
                    new_file_queue,
                    processed_files_set,
                    process_file_fct,
                    on_dir_change_fct,
                )

            # start the new monitoring, then queue the files that were already present
            processing_info = new_processing_info
            logging.info("Starting monitoring new dir {}".format(processing_info.data_dir))
            observer = _start_observer(
                processing_info.data_dir,
                new_file_queue,
                file_patterns,
                file_ignore_patterns,
                monitoring_ignore_directories,
                pattern_case_sensitive,
            )
            logging.info("Monitoring started!")
            processed_files_set = set()
            _queue_present_files(
                processing_info.data_dir,
                new_file_queue,
                file_patterns,
                file_ignore_patterns,
                pattern_case_sensitive,
            )
            # the queued files are processed in the next loop iterations

        logging.info("Stop event is set, stopping monitoring and processing remaining files")
        # finish processing files and execute on_dir_change before stopping
        if observer is not None:
            _stop_monitoring(
                observer,
                processing_info,
                new_file_queue,
                processed_files_set,
                process_file_fct,
                on_dir_change_fct,
            )
    finally:
        # Only acts when leaving because of an exception: make sure the watchdog thread does not outlive the
        # monitoring (observers are threads; a stopped and joined observer is no longer alive).
        if observer is not None and observer.is_alive():
            observer.stop()
            observer.join()
    logging.info("Stopping monitoring")