Coverage for lst_auto_rta/files_callback.py: 81%

76 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 14:47 +0000

1import logging 

2from multiprocessing import Queue as MPQueue 

3from multiprocessing.synchronize import Event as SyncEvent # for type hints 

4from pathlib import Path 

5from queue import Empty, Queue 

6from time import sleep 

7from typing import Any, Callable, Dict, List, NamedTuple 

8 

9from watchdog.events import FileSystemEvent, PatternMatchingEventHandler 

10from watchdog.observers import Observer 

11from watchdog.utils.patterns import match_any_paths 

12 

13from lst_auto_rta.utils.logging import init_logging 

14from lst_auto_rta.utils.queue import process_all_items 

15 

16 

17class QueueingFilePatternMatchingEventHandler(PatternMatchingEventHandler): 

18 """Handler that queues (blocking) the file paths of created files that matched the patterns 

19 

20 Parameters 

21 ---------- 

22 file_queue : Queue 

23 The queue to use to queue the files created. 

24 """ 

25 

26 def __init__(self, file_queue: Queue, **kwargs): 

27 super().__init__(**kwargs) 

28 self._file_queue = file_queue 

29 

30 def on_created(self, event: FileSystemEvent) -> None: 

31 super().on_created(event) 

32 # add file in queue, blocking until put can happen (should never block if queue has infinite size) 

33 self._file_queue.put(Path(event.src_path)) 

34 

35 

36class DirMonitoringInfo(NamedTuple): 

37 """Information required by `dir_monitoring` to start monitoring a directory. Typically received from a multiprocessing Queue.""" 

38 

39 data_dir: Path 

40 process_file_fct_extra_kwargs: Dict[str, Any] 

41 on_dir_change_fct_extra_kwargs: Dict[str, Any] 

42 

43 

44def dir_monitoring( 

45 log_level: str, 

46 log_filename: str, 

47 log_format_title: str, 

48 info_queue: MPQueue, 

49 stop_event: SyncEvent, 

50 process_file_fct: Callable, 

51 on_dir_change_fct: Callable, 

52 file_patterns: List[str] = None, 

53 file_ignore_patterns: List[str] = None, 

54 monitoring_ignore_directories=True, 

55 pattern_case_sensitive: bool = True, 

56 query_interval_s: float = 1.0, 

57 monitoring_info_received_event: SyncEvent | None = None, 

58): 

59 """Monitors the last directory pushed on `info_queue` and applies `process_file_fct` on created files that match 

60 the patterns, until stop event is set. Also executes `on_dir_change_fct` when the monitoring directoy is changed. 

61 

62 Notes 

63 ----- 

64 Files already present in the monitored directory when monitoring starts will be processed as well. 

65 

66 Parameters 

67 ---------- 

68 log_level : str 

69 Log level to use for the logger of this function 

70 log_filename : str 

71 Path to the log files to write logs to. 

72 log_format_title : str 

73 "Title" to use in the log files lines. 

74 info_queue : MPQueue[DirMonitoringInfo] 

75 Multiprocessing queue to use to retrieve the directory to monitor and relevant informations to do it. 

76 stop_event : SyncEvent 

77 When this event is set, the monitoring will stop and the function will return. 

78 is_monitoring_event: SyncEvent 

79 process_file_fct : Callable 

80 Function to apply to the created files (and already present files) in the monitored directory, passing the file path 

81 as argument and extra kwargs based on information in `info_queue` 

82 on_dir_change_fct : Callable 

83 Function to execute when the directory to monitor is changed (a new one is available in `info_queue`) The monitored directory 

84 path (before change) is passed to the function as argument, with additional kwargs based on information in `info_queue`. 

85 file_patterns : List[str], optional 

86 List of patterns to filter the created files. See watchdogs PatternMatchingEventHandler documentation. By default None 

87 file_ignore_patterns : List[str], optional 

88 List of patterns to ignore the created files. See watchdogs PatternMatchingEventHandler documentation. By default None 

89 monitoring_ignore_directories : bool, optional 

90 Whether the monitoring should ignore the created subdirectories, by default True 

91 pattern_case_sensitive : bool, optional 

92 If True, the patterns applied to filter or ignore files will be case insensitive, by default True 

93 query_interval_s : float, optional 

94 Amount of time in seconds to wait between 2 processing rounds of created files, by default 1.0 

95 is_monitoring_event : SyncEvent | None 

96 A event that is this function will set when starting to query the `info_queue`. It is mainly used in unit tests to make 

97 sure the process monitoring is doing something before stopping the test. Optional, Defaults is None. 

98 """ 

99 

100 # init logging in a separate file for this monitoring process 

101 # Note: this requires that the process is started with the "spawn" method, so it does not inherit the parent 

102 # process file handles of logging ! 

103 init_logging(log_level=log_level, log_filename=log_filename, format_title=log_format_title) 

104 

105 if not isinstance(file_patterns, (list, None)): 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 raise ValueError("File patterns must be a list of string or None, got {}".format(file_patterns)) 

107 if not isinstance(file_ignore_patterns, (list, None)): 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true

108 raise ValueError("Ignore pattern must be a list of string or None, got {}".format(file_ignore_patterns)) 

109 

110 # queue to store paths of created files in the folder (that match pattern) 

111 # with infinite size so we can use put/get non-blocking 

112 new_file_queue: Queue[Path] = Queue(maxsize=-1) 

113 

114 # Once a monitoring is started, it continues until another directory to monitor is put in the info_queue, or the 

115 # stop event is set. However, when the process is started, there may not yet be a DirMonitoringInfo in the queue yet. 

116 monitoring_started = False 

117 

118 # This loop code order doesn't reflects the chronology of the code execution (in particular variable instantiation) 

119 # before the 2nd DirMonitoringInfo: 

120 # - Before the first DirMonitoringInfo is retrieved from the queue, we will simply do the "except" clause, which will 

121 # do nothing because monitoring_started is False. 

122 # - When the first DirMonitoringInfo is retrieved from the queue, we execute the "else" clause, but since 

123 # monitoring_started is still False, we don't do the observer stop + final files processing. We directly go to the 

124 # processing info execution and create the processing_info and all variables there. 

125 # - When retrieving the 2nd and more DirMonitoringInfo from the queue, we execute the entire try + else clauses, and all 

126 # required variables are defined. 

127 while not stop_event.is_set(): 

128 try: 

129 # Check if a new processing info was put in the queue 

130 # Otherwise Empty is raised 

131 new_processing_info = info_queue.get(block=False) 

132 except Empty: 

133 # no new processing info: simply process any queued files if we are monitoring something, then sleep 

134 if monitoring_started: 

135 nb_processed = process_all_items( 

136 new_file_queue, 

137 processed_files_set, 

138 process_file_fct, 

139 processing_info.process_file_fct_extra_kwargs, 

140 ) 

141 if nb_processed > 0: 

142 logging.debug("Processed {} files.".format(nb_processed)) 

143 sleep(query_interval_s) 

144 else: 

145 if monitoring_info_received_event is not None: 145 ↛ 156line 145 didn't jump to line 156 because the condition on line 145 was always true

146 monitoring_info_received_event.set() 

147 # There is a new processing info: 

148 # If monitoring: 

149 # - stop monitoring 

150 # - processed queued files 

151 # - run on_dir_change function 

152 # In any case: 

153 # - start new monitoring 

154 # - re-set all monitoring variables (processed files set) 

155 # - queue all files already present in folder "before" monitoring started (can be some overlap, handled by the processed files set) 

156 logging.info("New monitoring dir {}".format(new_processing_info.data_dir)) 

157 logging.info( 

158 "New process_file_fct_extra_kwargs {}".format(new_processing_info.process_file_fct_extra_kwargs) 

159 ) 

160 logging.info( 

161 "New on_dir_change_fct_extra_kwargs {}".format(new_processing_info.on_dir_change_fct_extra_kwargs) 

162 ) 

163 if monitoring_started: 

164 logging.info("Stopping monitoring of {}".format(processing_info.data_dir)) 

165 observer.stop() 

166 observer.join() 

167 logging.info("Processing remaining items of {}".format(processing_info.data_dir)) 

168 nb_processed = process_all_items( 

169 new_file_queue, 

170 processed_files_set, 

171 process_file_fct, 

172 processing_info.process_file_fct_extra_kwargs, 

173 ) 

174 logging.debug("Processed {} files.".format(nb_processed)) 

175 logging.info( 

176 "Executing on_dir_change_fct on data_dir {}, with kwargs {}".format( 

177 processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs 

178 ) 

179 ) 

180 on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs) 

181 

182 logging.info("Starting monitoring new dir {}".format(new_processing_info.data_dir)) 

183 # Start new monitoring 

184 monitoring_started = True 

185 processing_info = new_processing_info 

186 file_event_handler = QueueingFilePatternMatchingEventHandler( 

187 file_queue=new_file_queue, # queue should be empty at this point: either 1st monitoring so empty, or emptied by process_all_items above 

188 patterns=file_patterns, 

189 ignore_patterns=file_ignore_patterns, 

190 ignore_directories=monitoring_ignore_directories, 

191 case_sensitive=pattern_case_sensitive, 

192 ) 

193 observer = Observer() 

194 observer.schedule(file_event_handler, processing_info.data_dir) 

195 observer.start() 

196 logging.info("Monitoring started!") 

197 

198 # Add already present files in the queue 

199 # It is possible to have duplicated files because when we start monitoring there could be files already present, 

200 # which are then added to the queue. If a file is created between the monitoring start and the 

201 # glob for already present files, they will be added twice to the queue, but processed_file_set will prevent them from 

202 # been processed twice. 

203 processed_files_set: set[Path] = set() 

204 initial_files = [ 

205 f 

206 for f in processing_info.data_dir.glob("*") 

207 if f.is_file() 

208 and match_any_paths( # use watchdogs pattern utils to apply same rules as during monitoring 

209 [f], 

210 included_patterns=file_patterns, 

211 excluded_patterns=file_ignore_patterns, 

212 case_sensitive=pattern_case_sensitive, 

213 ) 

214 ] 

215 logging.info("Adding {} already present files to queue".format(len(initial_files))) 

216 for f in initial_files: 

217 new_file_queue.put_nowait(f) # maxsize is infinite so will never block 

218 

219 # move on to next loop iteration, where we will process the files in the "else" clause 

220 

221 logging.info("Stop event is set, stopping monitoring and processing remaining files") 

222 # finish processing files and execute on_dir_change before stopping 

223 if monitoring_started: 223 ↛ 239line 223 didn't jump to line 239 because the condition on line 223 was always true

224 observer.stop() 

225 observer.join() 

226 nb_processed = process_all_items( 

227 new_file_queue, 

228 processed_files_set, 

229 process_file_fct, 

230 processing_info.process_file_fct_extra_kwargs, 

231 ) 

232 logging.debug("Processed {} files.".format(nb_processed)) 

233 logging.info( 

234 "Executing on_dir_change_fct on data_dir {}, with kwargs {}".format( 

235 processing_info.data_dir, processing_info.on_dir_change_fct_extra_kwargs 

236 ) 

237 ) 

238 on_dir_change_fct(processing_info.data_dir, **processing_info.on_dir_change_fct_extra_kwargs) 

239 logging.info("Stopping monitoring")