Source code for lst_auto_rta.utils.queue

from queue import Empty, Queue
from typing import Any, Callable, Dict


def get_all_non_blocking(q):
    """Drain the queue without blocking and return what was in it.

    Items are fetched one by one with ``q.get(block=False)`` until the
    queue raises ``Empty``.

    Parameters
    ----------
    q : Queue
        The queue to drain.

    Returns
    -------
    List[Any]
        The retrieved items, in the order they were fetched. The list is
        empty if the queue had nothing to offer.
    """
    drained = []
    try:
        # The only way out of this loop is ``Empty`` (or any other
        # exception raised by ``q.get``, which propagates to the caller).
        while True:
            drained.append(q.get(block=False))
    except Empty:
        pass
    return drained
def process_all_items(
    q: Queue[Any],
    processed_items: set[Any],
    process_item_fct: Callable[..., None],
    process_item_fct_extra_kwargs: Dict[str, Any],
) -> int:
    """Drain `q` and run `process_item_fct` on every item not seen before.

    Each item retrieved from the queue is checked against
    `processed_items`. Unseen items are added to that set and then passed
    to `process_item_fct` together with `process_item_fct_extra_kwargs`;
    items already in the set are skipped.

    Warnings
    --------
    The queue is drained until it reports being empty. If it is filled
    faster than it is emptied, the list of retrieved items grows without
    bound!

    Parameters
    ----------
    q : Queue[Any]
        Queue whose items will be processed.
    processed_items : set[Any]
        Items that must not be processed again. Updated in place with every
        newly handled item.
    process_item_fct : Callable[..., None]
        Function applied to each new item; called as
        ``process_item_fct(item, **process_item_fct_extra_kwargs)``.
    process_item_fct_extra_kwargs : Dict[str, Any]
        Extra keyword arguments forwarded to `process_item_fct`.

    Returns
    -------
    int
        The number of items retrieved from the queue. Items skipped because
        they were already in `processed_items` are included in this count.
    """
    retrieved = get_all_non_blocking(q)
    for entry in retrieved:
        if entry in processed_items:
            continue
        # The item is recorded as processed *before* the callback runs, so
        # it stays in the set even if the callback raises.
        processed_items.add(entry)
        process_item_fct(entry, **process_item_fct_extra_kwargs)
    return len(retrieved)