import logging
from multiprocessing import Queue as MPQueue
from multiprocessing.synchronize import Event as SyncEvent # for type hints
from pathlib import Path
from queue import Empty, Queue
from time import sleep
from typing import Any, Callable, Dict, List, NamedTuple
from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers import Observer
from watchdog.utils.patterns import match_any_paths
from lst_auto_rta.utils.logging import init_logging
from lst_auto_rta.utils.queue import process_all_items
[docs]
class QueueingFilePatternMatchingEventHandler(PatternMatchingEventHandler):
    """Pattern-matching event handler that pushes the path of each created file onto a queue.

    The pattern filtering itself is inherited from ``PatternMatchingEventHandler``; this
    subclass only adds the queueing done in `on_created`.

    Parameters
    ----------
    file_queue : Queue
        Queue receiving a ``Path`` for every "created" event handled by `on_created`.
    **kwargs
        Forwarded unchanged to ``PatternMatchingEventHandler.__init__``.
    """

    def __init__(self, file_queue: Queue, **kwargs):
        super().__init__(**kwargs)
        # Destination of the created file paths, filled by `on_created`.
        self._file_queue = file_queue

    def on_created(self, event: FileSystemEvent) -> None:
        """Run the parent hook, then queue ``event.src_path`` as a ``Path``."""
        super().on_created(event)
        created_path = Path(event.src_path)
        # Blocking put: it waits for a free slot, which should never happen with an unbounded queue.
        self._file_queue.put(created_path, block=True)
[docs]
class DirMonitoringInfo(NamedTuple):
    """Everything `dir_monitoring` needs to start watching one directory.

    Instances are typically received by `dir_monitoring` through a multiprocessing Queue.
    """

    # Directory to monitor.
    data_dir: Path
    # Extra keyword arguments for the file processing function, used while `data_dir` is monitored.
    process_file_fct_extra_kwargs: Dict[str, Any]
    # Extra keyword arguments for the "directory changed" function, called with `data_dir`.
    on_dir_change_fct_extra_kwargs: Dict[str, Any]
[docs]
def dir_monitoring(
log_level: str,
log_filename: str,
log_format_title: str,
info_queue: MPQueue,
stop_event: SyncEvent,
process_file_fct: Callable,
on_dir_change_fct: Callable,
file_patterns: List[str] = None,
file_ignore_patterns: List[str] = None,
monitoring_ignore_directories=True,
pattern_case_sensitive: bool = True,
query_interval_s: float = 1.0,
monitoring_info_received_event: SyncEvent | None = None,
):
"""Monitors the last directory pushed on `info_queue` and applies `process_file_fct` on created files that match
the patterns, until stop event is set. Also executes `on_dir_change_fct` when the monitoring directoy is changed.
Notes
-----
Files already present in the monitored directory when monitoring starts will be processed as well.
Parameters
----------
log_level : str
Log level to use for the logger of this function
log_filename : str
Path to the log files to write logs to.
log_format_title : str
"Title" to use in the log files lines.
info_queue : MPQueue[DirMonitoringInfo]
Multiprocessing queue to use to retrieve the directory to monitor and relevant informations to do it.
stop_event : SyncEvent
When this event is set, the monitoring will stop and the function will return.
is_monitoring_event: SyncEvent
process_file_fct : Callable
Function to apply to the created files (and already present files) in the monitored directory, passing the file path
as argument and extra kwargs based on information in `info_queue`
on_dir_change_fct : Callable
Function to execute when the directory to monitor is changed (a new one is available in `info_queue`) The monitored directory
path (before change) is passed to the function as argument, with additional kwargs based on information in `info_queue`.
file_patterns : List[str], optional
List of patterns to filter the created files. See watchdogs PatternMatchingEventHandler documentation. By default None
file_ignore_patterns : List[str], optional
List of patterns to ignore the created files. See watchdogs PatternMatchingEventHandler documentation. By default None
monitoring_ignore_directories : bool, optional
Whether the monitoring should ignore the created subdirectories, by default True
pattern_case_sensitive : bool, optional
If True, the patterns applied to filter or ignore files will be case insensitive, by default True
query_interval_s : float, optional
Amount of time in seconds to wait between 2 processing rounds of created files, by default 1.0
is_monitoring_event : SyncEvent | None
A event that is this function will set when starting to query the `info_queue`. It is mainly used in unit tests to make
sure the process monitoring is doing something before stopping the test. Optional, Defaults is None.
"""
# init logging in a separate file for this monitoring process
# Note: this requires that the process is started with the "spawn" method, so it does not inherit the parent
# process file handles of logging !
init_logging(log_level=log_level, log_filename=log_filename, format_title=log_format_title)
if not isinstance(file_patterns, (list, None)):
raise ValueError("File patterns must be a list of string or None, got {}".format(file_patterns))
if not isinstance(file_ignore_patterns, (list, None)):
raise ValueError("Ignore pattern must be a list of string or None, got {}".format(file_ignore_patterns))
# queue to store paths of created files in the folder (that match pattern)
# with infinite size so we can use put/get non-blocking
new_file_queue: Queue[Path] = Queue(maxsize=-1)
# Once a monitoring is started, it continues until another directory to monitor is put in the info_queue, or the
# stop event is set. However, when the process is started, there may not yet be a DirMonitoringInfo in the queue yet.
monitoring_started = False
# This loop code order doesn't reflects the chronology of the code execution (in particular variable instantiation)
# before the 2nd DirMonitoringInfo:
# - Before the first DirMonitoringInfo is retrieved from the queue, we will simply do the "except" clause, which will
# do nothing because monitoring_started is False.
# - When the first DirMonitoringInfo is retrieved from the queue, we execute the "else" clause, but since
# monitoring_started is still False, we don't do the observer stop + final files processing. We directly go to the
# processing info execution and create the processing_info and all variables there.
# - When retrieving the 2nd and more DirMonitoringInfo from the queue, we execute the entire try + else clauses, and all
# required variables are defined.
while not stop_event.is_set():
try:
# Check if a new processing info was put in the queue
# Otherwise Empty is raised
new_processing_info = info_queue.get(block=False)
except Empty:
# no new processing info: simply process any queued files if we are monitoring something, then sleep
if monitoring_started:
nb_processed = process_all_items(
new_file_queue,
processed_files_set,
process_file_fct,
processing_info.process_file_fct_extra_kwargs,
)
if nb_processed > 0:
logging.debug("Processed {} files.".format(nb_processed))
sleep(query_interval_s)
else:
if monitoring_info_received_event is not None:
monitoring_info_received_event.set()
# There is a new processing info:
# If monitoring:
# - stop monitoring
# - processed queued files
# - run on_dir_change function
# In any case:
# - start new monitoring
# - re-set all monitoring variables (processed files set)
# - queue all files already present in folder "before" monitoring started (can be some overlap, handled by the processed files set)
logging.info("New monitoring dir {}".format(new_processing_info.data_dir))
logging.info(
"New process_file_fct_extra_kwargs {}".format(new_processing_info.process_file_fct_extra_kwargs)
)
logging.info(
"New on_dir_change_fct_extra_kwargs {}".format(new_processing_info.on_dir_change_fct_extra_kwargs)
)
if monitoring_started:
logging.info("Stopping monitoring of {}".format(processing_info.data_dir))
observer.stop()
observer.join()
logging.info("Processing remaining items of {}".format(processing_info.data_dir))
nb_processed = process_all_items(
new_file_queue,
processed_files_set,
process_file_fct,
processing_info.process_file_fct_extra_kwargs,
)
logging.debug("Processed {} files.".format(nb_processed))
logging.info(
"Executing on_dir_change_fct on data_dir {}, with kwargs {}".format(
processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs
)
)
on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs)
logging.info("Starting monitoring new dir {}".format(new_processing_info.data_dir))
# Start new monitoring
monitoring_started = True
processing_info = new_processing_info
file_event_handler = QueueingFilePatternMatchingEventHandler(
file_queue=new_file_queue, # queue should be empty at this point: either 1st monitoring so empty, or emptied by process_all_items above
patterns=file_patterns,
ignore_patterns=file_ignore_patterns,
ignore_directories=monitoring_ignore_directories,
case_sensitive=pattern_case_sensitive,
)
observer = Observer()
observer.schedule(file_event_handler, processing_info.data_dir)
observer.start()
logging.info("Monitoring started!")
# Add already present files in the queue
# It is possible to have duplicated files because when we start monitoring there could be files already present,
# which are then added to the queue. If a file is created between the monitoring start and the
# glob for already present files, they will be added twice to the queue, but processed_file_set will prevent them from
# been processed twice.
processed_files_set: set[Path] = set()
initial_files = [
f
for f in processing_info.data_dir.glob("*")
if f.is_file()
and match_any_paths( # use watchdogs pattern utils to apply same rules as during monitoring
[f],
included_patterns=file_patterns,
excluded_patterns=file_ignore_patterns,
case_sensitive=pattern_case_sensitive,
)
]
logging.info("Adding {} already present files to queue".format(len(initial_files)))
for f in initial_files:
new_file_queue.put_nowait(f) # maxsize is infinite so will never block
# move on to next loop iteration, where we will process the files in the "else" clause
logging.info("Stop event is set, stopping monitoring and processing remaining files")
# finish processing files and execute on_dir_change before stopping
if monitoring_started:
observer.stop()
observer.join()
nb_processed = process_all_items(
new_file_queue,
processed_files_set,
process_file_fct,
processing_info.process_file_fct_extra_kwargs,
)
logging.debug("Processed {} files.".format(nb_processed))
logging.info(
"Executing on_dir_change_fct on data_dir {}, with kwargs {}".format(
processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs
)
)
on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs)
logging.info("Stopping monitoring")