1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Generic type variables for task input and task result
from typing import Any, TypeVar
TI = TypeVar("TI", bound=Any)
TR = TypeVar("TR", bound=Any)
@dataclass
class Task(Generic[TI]):
"""Generic task container."""
task_id: str
func: Callable[..., Any]
args: tuple[Any, ...] = ()
kwargs: dict[str, Any] | None = None
def __post_init__(self) -> None:
self.kwargs = self.kwargs or {}
class ThreadingService(Generic[TI, TR]):
    """Generic service for processing tasks with multiple workers.

    Tasks are placed on a shared queue and executed by daemon worker threads.
    The return value of each task is stored under its ``task_id`` until it is
    fetched with :meth:`get_result` or :meth:`get_results`; fetching removes
    it. A task whose callable raises is reported through ``logger.error`` and
    leaves no entry in the results.
    """

    def __init__(self, logger: Logger, max_workers: int = 5):
        """Create an idle service.

        Args:
            logger: Logger used to report tasks that raise.
            max_workers: Number of worker threads created by :meth:`start`.
        """
        # ``None`` on the queue is the shutdown sentinel for a worker.
        self.task_queue: Queue[Task | None] = Queue()
        self.logger = logger
        self.max_workers = max_workers
        self.workers: list[threading.Thread] = []
        self.results: dict[str, Any] = {}
        # Guards every access to ``self.results``.
        self._lock = threading.Lock()
        self.is_running = False

    def start(self) -> None:
        """Spawn ``max_workers`` daemon worker threads unless already running."""
        if self.is_running:
            return
        self.is_running = True
        for _ in range(self.max_workers):
            worker = threading.Thread(target=self._process_tasks, daemon=True)
            self.workers.append(worker)
            worker.start()

    def add_task(self, task: Task) -> None:
        """Queue a single task, starting the workers if they are not running."""
        self.task_queue.put(task)
        if not self.is_running:
            self.start()

    def add_tasks(self, tasks: list[Task]) -> None:
        """Queue several tasks, starting the workers if they are not running."""
        for task in tasks:
            self.task_queue.put(task)
        if not self.is_running:
            self.start()

    def get_result(self, task_id: str) -> Any | None:
        """Remove and return the stored result for ``task_id``.

        Returns ``None`` when no result is stored under that id; a task that
        itself returned ``None`` yields the same value.
        """
        with self._lock:
            return self.results.pop(task_id, None)

    def get_results(self, max_results: int = 0) -> dict[str, Any]:
        """Remove and return stored results.

        Args:
            max_results: Maximum number of results to take. Zero or a negative
                value takes everything currently stored.

        Returns:
            Mapping of task id to result for the entries that were removed.
        """
        with self._lock:
            if max_results <= 0:
                drained = self.results.copy()
                self.results.clear()
                return drained
            selected = list(self.results)[:max_results]
            return {key: self.results.pop(key) for key in selected}

    def stop(self, timeout: int | None = None) -> None:
        """Wait for queued tasks to finish, then shut the workers down.

        Args:
            timeout: Passed to each worker's ``join``. It does not bound the
                wait for the queue to drain.
        """
        if self.workers:
            # Only wait when someone is there to drain the queue; with no
            # workers a non-empty queue would make join() block forever.
            self.task_queue.join()
        # One sentinel per live worker. Posting sentinels when no worker
        # exists would leave them on the queue, where they would terminate
        # the workers of a later start() before any real task is handled.
        for _ in self.workers:
            self.task_queue.put(None)
        for worker in self.workers:
            worker.join(timeout=timeout)
        self.workers.clear()
        self.is_running = False

    def _process_tasks(self) -> None:
        """Worker loop: run queued tasks until a ``None`` sentinel arrives."""
        while True:
            task = self.task_queue.get()
            if task is None:
                # The sentinel was put() like any other item, so it must be
                # marked done too; otherwise the queue's unfinished count
                # never returns to zero and a later join() hangs.
                self.task_queue.task_done()
                break
            try:
                result = task.func(*task.args, **(task.kwargs or {}))
                with self._lock:
                    self.results[task.task_id] = result
            except Exception as e:
                # Deliberate best-effort: a failing task must not kill the
                # worker thread. It is logged and produces no result.
                self.logger.error("Error processing task %s: %s", task.task_id, e)
            finally:
                self.task_queue.task_done()
|