Coverage for lst_auto_rta/utils/queue.py: 100%
17 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-03 14:47 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-03 14:47 +0000
1from queue import Empty, Queue
2from typing import Any, Callable, Dict
def get_all_non_blocking(q: Queue[Any]) -> list[Any]:
    """Drain every item currently available in the queue without blocking.

    Items are pulled one by one until the queue reports that nothing is
    immediately available; the call never waits for new items.

    Warnings
    --------
    There is no upper bound on the number of items retrieved: if producers
    fill the queue faster than this loop empties it, the loop keeps running
    and the returned list keeps growing.

    Parameters
    ----------
    q : Queue[Any]
        The queue to drain.

    Returns
    -------
    list[Any]
        Items retrieved from the queue, in the order the queue returned them.
        Empty if no item was available.
    """
    items: list[Any] = []
    while True:
        try:
            # get_nowait() is equivalent to get(block=False): it raises Empty
            # instead of waiting when no item is immediately available.
            items.append(q.get_nowait())
        except Empty:
            # Nothing left to fetch right now: the drain is complete.
            break
    return items
def process_all_items(
    q: Queue[Any],
    processed_items: set[Any],
    process_item_fct: Callable[..., None],
    process_item_fct_extra_kwargs: Dict[str, Any],
) -> int:
    """Drain `q` and run `process_item_fct` on every item not yet in `processed_items`.

    Each item that is not already in `processed_items` is first added to that
    set (the set is mutated in place), then handed to `process_item_fct`
    together with the keyword arguments in `process_item_fct_extra_kwargs`.
    Items already present in the set, including duplicates met earlier in the
    same drain, are skipped.

    Warnings
    --------
    The queue is drained with `get_all_non_blocking`, which has no upper bound:
    if the queue is filled faster than it is emptied, the drained list grows
    without limit.

    Parameters
    ----------
    q : Queue[Any]
        Queue whose items will be processed.
    processed_items : set[Any]
        Items that must not be processed again. Updated in place with every
        newly processed item.
    process_item_fct : Callable[..., None]
        Function applied to each new item; called as
        ``process_item_fct(item, **process_item_fct_extra_kwargs)``.
    process_item_fct_extra_kwargs : Dict[str, Any]
        Extra keyword arguments passed to `process_item_fct` for every item.

    Returns
    -------
    int
        The number of items retrieved from the queue. Items that were skipped
        because they were already in `processed_items` are included in this
        count.
    """
    drained = get_all_non_blocking(q)
    for candidate in drained:
        if candidate in processed_items:
            # Already handled (earlier call or earlier in this batch): skip.
            continue
        # Record the item before calling the handler, as the original does.
        processed_items.add(candidate)
        process_item_fct(candidate, **process_item_fct_extra_kwargs)

    return len(drained)