Coverage for lst_auto_rta/files_callback.py: 81%
76 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-17 14:47 +0000
1import logging
2from multiprocessing import Queue as MPQueue
3from multiprocessing.synchronize import Event as SyncEvent # for type hints
4from pathlib import Path
5from queue import Empty, Queue
6from time import sleep
7from typing import Any, Callable, Dict, List, NamedTuple
9from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
10from watchdog.observers import Observer
11from watchdog.utils.patterns import match_any_paths
13from lst_auto_rta.utils.logging import init_logging
14from lst_auto_rta.utils.queue import process_all_items
class QueueingFilePatternMatchingEventHandler(PatternMatchingEventHandler):
    """Watchdog handler that pushes the paths of pattern-matching created files onto a queue.

    Creation events are filtered by the parent class's pattern matching; every
    event that passes the filter has its file path enqueued with a blocking put.

    Parameters
    ----------
    file_queue : Queue
        Destination queue receiving the paths of created files.
    """

    def __init__(self, file_queue: Queue, **kwargs):
        super().__init__(**kwargs)
        self._queue = file_queue

    def on_created(self, event: FileSystemEvent) -> None:
        super().on_created(event)
        # Blocking put; with an unbounded queue this should never actually block.
        self._queue.put(Path(event.src_path))
class DirMonitoringInfo(NamedTuple):
    """Parameters for one directory-monitoring session of `dir_monitoring`.

    Instances are typically delivered to the monitoring process through a
    multiprocessing Queue.
    """

    # Directory whose created files will be watched and processed.
    data_dir: Path
    # Extra keyword arguments forwarded to the per-file processing function.
    process_file_fct_extra_kwargs: Dict[str, Any]
    # Extra keyword arguments forwarded to the directory-change callback.
    on_dir_change_fct_extra_kwargs: Dict[str, Any]
def dir_monitoring(
    log_level: str,
    log_filename: str,
    log_format_title: str,
    info_queue: MPQueue,
    stop_event: SyncEvent,
    process_file_fct: Callable,
    on_dir_change_fct: Callable,
    file_patterns: List[str] | None = None,
    file_ignore_patterns: List[str] | None = None,
    monitoring_ignore_directories: bool = True,
    pattern_case_sensitive: bool = True,
    query_interval_s: float = 1.0,
    monitoring_info_received_event: SyncEvent | None = None,
):
    """Monitors the last directory pushed on `info_queue` and applies `process_file_fct` on created files that match
    the patterns, until stop event is set. Also executes `on_dir_change_fct` when the monitored directory is changed.

    Notes
    -----
    Files already present in the monitored directory when monitoring starts will be processed as well.

    Parameters
    ----------
    log_level : str
        Log level to use for the logger of this function
    log_filename : str
        Path to the log files to write logs to.
    log_format_title : str
        "Title" to use in the log files lines.
    info_queue : MPQueue[DirMonitoringInfo]
        Multiprocessing queue to use to retrieve the directory to monitor and relevant information to do it.
    stop_event : SyncEvent
        When this event is set, the monitoring will stop and the function will return.
    process_file_fct : Callable
        Function to apply to the created files (and already present files) in the monitored directory, passing the file path
        as argument and extra kwargs based on information in `info_queue`
    on_dir_change_fct : Callable
        Function to execute when the directory to monitor is changed (a new one is available in `info_queue`). The monitored
        directory path (before change) is passed to the function as argument, with additional kwargs based on information
        in `info_queue`.
    file_patterns : List[str] | None, optional
        List of patterns to filter the created files. See watchdogs PatternMatchingEventHandler documentation. By default None
    file_ignore_patterns : List[str] | None, optional
        List of patterns to ignore the created files. See watchdogs PatternMatchingEventHandler documentation. By default None
    monitoring_ignore_directories : bool, optional
        Whether the monitoring should ignore the created subdirectories, by default True
    pattern_case_sensitive : bool, optional
        If True, the patterns applied to filter or ignore files are matched case-sensitively, by default True
    query_interval_s : float, optional
        Amount of time in seconds to wait between 2 processing rounds of created files, by default 1.0
    monitoring_info_received_event : SyncEvent | None, optional
        An event that this function will set when a `DirMonitoringInfo` is retrieved from `info_queue`. It is mainly used
        in unit tests to make sure the monitoring process is doing something before stopping the test. Default is None.

    Raises
    ------
    ValueError
        If `file_patterns` or `file_ignore_patterns` is neither a list nor None.
    """
    # init logging in a separate file for this monitoring process
    # Note: this requires that the process is started with the "spawn" method, so it does not inherit the parent
    # process file handles of logging !
    init_logging(log_level=log_level, log_filename=log_filename, format_title=log_format_title)

    # NOTE: the previous check `isinstance(x, (list, None))` raised TypeError because `None`
    # is not a type; the intended "list or None" validation must be spelled out explicitly.
    if not (file_patterns is None or isinstance(file_patterns, list)):
        raise ValueError("File patterns must be a list of string or None, got {}".format(file_patterns))
    if not (file_ignore_patterns is None or isinstance(file_ignore_patterns, list)):
        raise ValueError("Ignore pattern must be a list of string or None, got {}".format(file_ignore_patterns))

    # queue to store paths of created files in the folder (that match pattern)
    # with infinite size so we can use put/get non-blocking
    new_file_queue: Queue[Path] = Queue(maxsize=-1)

    # Once a monitoring is started, it continues until another directory to monitor is put in the info_queue, or the
    # stop event is set. However, when the process is started, there may not yet be a DirMonitoringInfo in the queue yet.
    monitoring_started = False

    # This loop code order doesn't reflect the chronology of the code execution (in particular variable instantiation)
    # before the 2nd DirMonitoringInfo:
    # - Before the first DirMonitoringInfo is retrieved from the queue, we will simply do the "except" clause, which will
    #   do nothing because monitoring_started is False.
    # - When the first DirMonitoringInfo is retrieved from the queue, we execute the "else" clause, but since
    #   monitoring_started is still False, we don't do the observer stop + final files processing. We directly go to the
    #   processing info execution and create the processing_info and all variables there.
    # - When retrieving the 2nd and more DirMonitoringInfo from the queue, we execute the entire try + else clauses, and all
    #   required variables are defined.
    while not stop_event.is_set():
        try:
            # Check if a new processing info was put in the queue
            # Otherwise Empty is raised
            new_processing_info = info_queue.get(block=False)
        except Empty:
            # no new processing info: simply process any queued files if we are monitoring something, then sleep
            if monitoring_started:
                nb_processed = process_all_items(
                    new_file_queue,
                    processed_files_set,
                    process_file_fct,
                    processing_info.process_file_fct_extra_kwargs,
                )
                if nb_processed > 0:
                    logging.debug("Processed {} files.".format(nb_processed))
            sleep(query_interval_s)
        else:
            if monitoring_info_received_event is not None:
                monitoring_info_received_event.set()
            # There is a new processing info:
            # If monitoring:
            # - stop monitoring
            # - processed queued files
            # - run on_dir_change function
            # In any case:
            # - start new monitoring
            # - re-set all monitoring variables (processed files set)
            # - queue all files already present in folder "before" monitoring started (can be some overlap, handled by the processed files set)
            logging.info("New monitoring dir {}".format(new_processing_info.data_dir))
            logging.info(
                "New process_file_fct_extra_kwargs {}".format(new_processing_info.process_file_fct_extra_kwargs)
            )
            logging.info(
                "New on_dir_change_fct_extra_kwargs {}".format(new_processing_info.on_dir_change_fct_extra_kwargs)
            )
            if monitoring_started:
                logging.info("Stopping monitoring of {}".format(processing_info.data_dir))
                observer.stop()
                observer.join()
                logging.info("Processing remaining items of {}".format(processing_info.data_dir))
                nb_processed = process_all_items(
                    new_file_queue,
                    processed_files_set,
                    process_file_fct,
                    processing_info.process_file_fct_extra_kwargs,
                )
                logging.debug("Processed {} files.".format(nb_processed))
                logging.info(
                    "Executing on_dir_change_fct on data_dir {}, with kwargs {}".format(
                        processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs
                    )
                )
                on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs)

            logging.info("Starting monitoring new dir {}".format(new_processing_info.data_dir))
            # Start new monitoring
            monitoring_started = True
            processing_info = new_processing_info
            file_event_handler = QueueingFilePatternMatchingEventHandler(
                file_queue=new_file_queue,  # queue should be empty at this point: either 1st monitoring so empty, or emptied by process_all_items above
                patterns=file_patterns,
                ignore_patterns=file_ignore_patterns,
                ignore_directories=monitoring_ignore_directories,
                case_sensitive=pattern_case_sensitive,
            )
            observer = Observer()
            observer.schedule(file_event_handler, processing_info.data_dir)
            observer.start()
            logging.info("Monitoring started!")

            # Add already present files in the queue
            # It is possible to have duplicated files because when we start monitoring there could be files already present,
            # which are then added to the queue. If a file is created between the monitoring start and the
            # glob for already present files, they will be added twice to the queue, but processed_file_set will prevent them from
            # been processed twice.
            processed_files_set: set[Path] = set()
            initial_files = [
                f
                for f in processing_info.data_dir.glob("*")
                if f.is_file()
                and match_any_paths(  # use watchdogs pattern utils to apply same rules as during monitoring
                    [f],
                    included_patterns=file_patterns,
                    excluded_patterns=file_ignore_patterns,
                    case_sensitive=pattern_case_sensitive,
                )
            ]
            logging.info("Adding {} already present files to queue".format(len(initial_files)))
            for f in initial_files:
                new_file_queue.put_nowait(f)  # maxsize is infinite so will never block

            # move on to next loop iteration, where we will process the files in the "else" clause

    logging.info("Stop event is set, stopping monitoring and processing remaining files")
    # finish processing files and execute on_dir_change before stopping
    if monitoring_started:
        observer.stop()
        observer.join()
        nb_processed = process_all_items(
            new_file_queue,
            processed_files_set,
            process_file_fct,
            processing_info.process_file_fct_extra_kwargs,
        )
        logging.debug("Processed {} files.".format(nb_processed))
        logging.info(
            "Executing on_dir_change_fct on data_dir {}, with kwargs {}".format(
                processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs
            )
        )
        on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs)
    logging.info("Stopping monitoring")