|
6 | 6 | import logging |
7 | 7 | from abc import ABC, abstractmethod |
8 | 8 | from dataclasses import dataclass, field |
9 | | -from typing import Any, Dict, List, Optional, Tuple |
| 9 | +from typing import Any, Dict, List, Optional, Tuple, Union |
10 | 10 |
|
11 | 11 | import openeo |
12 | 12 |
|
@@ -150,42 +150,42 @@ def submit_task(self, task: Task) -> None: |
150 | 150 | future = self._executor.submit(task.execute) |
151 | 151 | self._future_task_pairs.append((future, task)) # Track pairs |
152 | 152 |
|
153 | | - def process_futures(self) -> List[_TaskResult]: |
| 153 | + def process_futures(self, timeout: Union[float, None] = 0) -> Tuple[List[_TaskResult], int]: |
154 | 154 | """ |
155 | | - Process and retrieve results from completed tasks. |
| 155 | + Check the state of futures and collect results from completed ones. |
156 | 156 |
|
157 | | - This method checks which futures have finished without blocking, |
158 | | - collects their results. |
| 157 | + :param timeout: whether to wait for futures to complete or not: |
| 158 | + - 0: don't wait, just return current state. |
| 159 | + - non-zero value: wait for that many seconds to allow futures to complete. |
| 160 | + - None: (not recommended) wait indefinitely. |
159 | 161 |
|
160 | 162 | :returns: |
161 | | - A list of `_TaskResult` objects from completed tasks. |
| 163 | + Tuple of two elements: list of `_TaskResult` objects from completed tasks, |
| 164 | + and number of remaining tasks that are still in progress. |
162 | 165 | """ |
163 | 166 | results = [] |
164 | 167 | to_keep = [] |
165 | 168 |
|
166 | | - # Use timeout=0 to avoid blocking and check for completed futures |
167 | | - done, _ = concurrent.futures.wait( |
168 | | - [f for f, _ in self._future_task_pairs], timeout=0, return_when=concurrent.futures.FIRST_COMPLETED |
169 | | - ) |
| 169 | + done, _ = concurrent.futures.wait([f for f, _ in self._future_task_pairs], timeout=timeout) |
170 | 170 |
|
171 | | - # Process completed futures and their tasks |
172 | 171 | for future, task in self._future_task_pairs: |
173 | 172 | if future in done: |
174 | 173 | try: |
175 | 174 | result = future.result() |
176 | 175 | except Exception as e: |
177 | | - _log.exception(f"Failed to get result from future: {e}") |
| 176 | + _log.exception(f"Threaded task {task!r} failed: {e!r}") |
178 | 177 | result = _TaskResult( |
179 | 178 | job_id=task.job_id, |
180 | | - db_update={"status": "future.result() failed"}, |
181 | | - stats_update={"future.result() error": 1}, |
| 179 | + db_update={"status": "threaded task failed"}, |
| 180 | + stats_update={"threaded task failed": 1}, |
182 | 181 | ) |
183 | 182 | results.append(result) |
184 | 183 | else: |
185 | 184 | to_keep.append((future, task)) |
| 185 | + _log.info("process_futures: %d tasks done, %d tasks remaining", len(results), len(to_keep)) |
186 | 186 |
|
187 | 187 | self._future_task_pairs = to_keep |
188 | | - return results |
| 188 | + return results, len(to_keep) |
189 | 189 |
|
190 | 190 | def shutdown(self) -> None: |
191 | 191 | """Shuts down the thread pool gracefully.""" |
|
0 commit comments