from queue import Empty, Queue
from typing import Any, Callable, Dict
[docs]
def get_all_non_blocking(q):
    """Drain the queue without blocking and return what was retrieved.

    Items are pulled one at a time with a non-blocking ``get`` until the
    queue raises ``Empty``; nothing is waited for.

    Parameters
    ----------
    q : Queue
        The queue to drain.

    Returns
    -------
    List[Any]
        The retrieved items, in the order they were taken from the queue.
        Empty if the queue had nothing available.
    """
    drained = []
    try:
        # Keep pulling until the queue reports it has nothing left;
        # ``Empty`` is the normal loop exit, not an error.
        while True:
            drained.append(q.get(block=False))
    except Empty:
        pass
    return drained
[docs]
def process_all_items(
    q: Queue[Any],
    processed_items: set[Any],
    process_item_fct: Callable[..., None],
    process_item_fct_extra_kwargs: Dict[str, Any],
) -> int:
    """Drain `q` and run `process_item_fct` on every item not seen before.

    Each item taken from the queue that is not already in `processed_items`
    is added to that set and then passed to `process_item_fct`, together
    with the keyword arguments in `process_item_fct_extra_kwargs`.

    Warnings
    --------
    The queue is fully drained into a list before any item is processed.
    If producers fill the queue faster than it can be drained, that list
    grows without bound and this call never gets to the processing step.

    Parameters
    ----------
    q : Queue[Any]
        Queue whose items will be consumed.
    processed_items : set[Any]
        Items that must be skipped. Mutated in place: every newly processed
        item is added to it (before `process_item_fct` is called).
    process_item_fct : Callable[..., None]
        Function called as ``process_item_fct(item, **extra_kwargs)`` for
        each item that is not skipped.
    process_item_fct_extra_kwargs : Dict[str, Any]
        Extra keyword arguments forwarded to `process_item_fct` on every
        call.

    Returns
    -------
    int
        The number of items taken from the queue during this call. Items
        skipped because they were already in `processed_items` are included
        in this count.
    """
    drained = get_all_non_blocking(q)
    for entry in drained:
        if entry in processed_items:
            # Already handled (earlier call or earlier in this batch).
            continue
        # Mark as seen first so a duplicate later in this batch is skipped.
        processed_items.add(entry)
        process_item_fct(entry, **process_item_fct_extra_kwargs)
    return len(drained)