Coverage for lst_auto_rta/utils/queue.py: 100%

17 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-03 14:47 +0000

1from queue import Empty, Queue 

2from typing import Any, Callable, Dict 

3 

4 

def get_all_non_blocking(q: Queue[Any]) -> list[Any]:
    """Get all items present in the queue, non-blocking.

    Items are fetched one at a time with a non-blocking ``get`` until the
    queue raises ``Empty``.

    Parameters
    ----------
    q : Queue[Any]
        The queue to query. Every retrieved item is removed from it.

    Returns
    -------
    list[Any]
        Items retrieved from the queue, in the order the queue returned them.
        The list is empty if nothing was available.
    """
    items = []
    while True:
        try:
            items.append(q.get(block=False))
        except Empty:
            # Nothing left to fetch right now: stop instead of waiting for more.
            break
    return items

25 

26 

def process_all_items(
    q: Queue[Any],
    processed_items: set[Any],
    process_item_fct: Callable[..., None],
    process_item_fct_extra_kwargs: Dict[str, Any],
) -> int:
    """Empty `q` and apply `process_item_fct` to all items not in `processed_items`.

    Each item that is not yet in `processed_items` is added to that set and
    then passed to `process_item_fct` as the first positional argument,
    together with the keyword arguments from `process_item_fct_extra_kwargs`.

    Warnings
    --------
    If the queue is filled faster than this function empties it, draining
    never finishes and the list of retrieved items grows without bound!

    Parameters
    ----------
    q : Queue[Any]
        Queue whose items will be processed.
    processed_items : set[Any]
        Set of items that should not be processed. It is updated in place
        with every item handed to `process_item_fct`.
    process_item_fct : Callable[..., None]
        Function to apply to the queue items.
    process_item_fct_extra_kwargs : Dict[str, Any]
        Extra kwargs to pass to `process_item_fct` when processing items.

    Returns
    -------
    int
        The number of items retrieved from the queue. Items skipped because
        they were already in `processed_items` are included in this count.
    """
    queue_items = get_all_non_blocking(q)
    for item in queue_items:
        if item not in processed_items:
            # Recorded before the call, so a duplicate later in the same batch
            # is skipped, and an item whose processing raises stays recorded.
            processed_items.add(item)
            process_item_fct(item, **process_item_fct_extra_kwargs)

    return len(queue_items)