Commit 7cd5951

Added retry logic with custom decorators and test cases
1 parent 7df9044 commit 7cd5951

7 files changed: +963 -1 lines changed

COMMUNITY_PROVIDERS.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -13,6 +13,7 @@ Community-developed provider plugins that extend LangExtract with additional mod
 | AWS Bedrock | `langextract-bedrock` | [@andyxhadji](https://github.com/andyxhadji) | [andyxhadji/langextract-bedrock](https://github.com/andyxhadji/langextract-bedrock) | AWS Bedrock provider for LangExtract, supports all models & inference profiles | [#148](https://github.com/google/langextract/issues/148) |
 | LiteLLM | `langextract-litellm` | [@JustStas](https://github.com/JustStas) | [JustStas/langextract-litellm](https://github.com/JustStas/langextract-litellm) | LiteLLM provider for LangExtract, supports all models covered in LiteLLM, including OpenAI, Azure, Anthropic, etc. See [LiteLLM's supported models](https://docs.litellm.ai/docs/providers) | [#187](https://github.com/google/langextract/issues/187) |
 | Llama.cpp | `langextract-llamacpp` | [@fgarnadi](https://github.com/fgarnadi) | [fgarnadi/langextract-llamacpp](https://github.com/fgarnadi/langextract-llamacpp) | Llama.cpp provider for LangExtract, supports GGUF models from HuggingFace and local files | [#199](https://github.com/google/langextract/issues/199) |
+| Outlines | `langextract-outlines` | [@RobinPicard](https://github.com/RobinPicard) | [dottxt-ai/langextract-outlines](https://github.com/dottxt-ai/langextract-outlines) | Outlines provider for LangExtract, supports structured generation for various local and API-based models | [#101](https://github.com/google/langextract/issues/101) |
 | vLLM | `langextract-vllm` | [@wuli666](https://github.com/wuli666) | [wuli666/langextract-vllm](https://github.com/wuli666/langextract-vllm) | vLLM provider for LangExtract, supports local and distributed model serving | [#236](https://github.com/google/langextract/issues/236) |
 <!-- ADD NEW PLUGINS ABOVE THIS LINE -->
```

langextract/annotation.py

Lines changed: 217 additions & 1 deletion

```diff
@@ -35,10 +35,12 @@
 from langextract import progress
 from langextract import prompting
 from langextract import resolver as resolver_lib
+from langextract import retry_utils
 from langextract.core import base_model
 from langextract.core import data
 from langextract.core import exceptions
 from langextract.core import format_handler as fh
+from langextract.core import types as core_types
 
 
 class DocumentRepeatError(exceptions.LangExtractError):
```
```diff
@@ -202,6 +204,173 @@ def __init__(
         "Annotator initialized with format_handler: %s", format_handler
     )
 
+  def _process_batch_with_retry(
+      self,
+      batch_prompts: list[str],
+      batch: list[chunking.TextChunk],
+      retry_transient_errors: bool = True,
+      max_retries: int = 3,
+      retry_initial_delay: float = 1.0,
+      retry_backoff_factor: float = 2.0,
+      retry_max_delay: float = 60.0,
+      **kwargs,
+  ) -> Iterator[list[core_types.ScoredOutput]]:
+    """Process a batch of prompts with individual chunk retry capability.
+
+    This method first attempts normal batch inference. If the batch fails
+    with a transient error (like a 503 "model overloaded"), it falls back
+    to processing each chunk individually with retries, so successful
+    chunks from the same batch are preserved.
+
+    Args:
+      batch_prompts: List of prompts for the batch.
+      batch: List of TextChunk objects corresponding to the prompts.
+      retry_transient_errors: Whether to retry on transient errors.
+      max_retries: Maximum number of retry attempts per chunk.
+      retry_initial_delay: Initial delay in seconds before the first retry.
+      retry_backoff_factor: Backoff multiplier applied between retries.
+      retry_max_delay: Maximum delay in seconds between retries.
+      **kwargs: Additional arguments passed to the language model.
+
+    Yields:
+      Lists of ScoredOutputs, with retries for failed chunks.
+    """
+    try:
+      batch_results = list(
+          self._language_model.infer(
+              batch_prompts=batch_prompts,
+              **kwargs,
+          )
+      )
+
+      for result in batch_results:
+        yield result
+      return
+
+    except Exception as e:
+      if not retry_utils.is_transient_error(e):
+        raise
+
+      logging.warning(
+          "Batch processing failed with transient error: %s. "
+          "Falling back to individual chunk processing with retry.",
+          str(e),
+      )
+
+    individual_results = []
+
+    for i, (prompt, chunk) in enumerate(zip(batch_prompts, batch)):
+      try:
+        chunk_result = self._process_single_chunk_with_retry(
+            prompt=prompt,
+            chunk=chunk,
+            retry_transient_errors=retry_transient_errors,
+            max_retries=max_retries,
+            retry_initial_delay=retry_initial_delay,
+            retry_backoff_factor=retry_backoff_factor,
+            retry_max_delay=retry_max_delay,
+            **kwargs,
+        )
+        individual_results.append(chunk_result)
+
+      except Exception as e:
+        logging.error(
+            "Failed to process chunk %d after retries: %s. "
+            "Chunk info: document_id=%s, text_length=%d. "
+            "Stopping document processing.",
+            i,
+            str(e),
+            chunk.document_id,
+            len(chunk.chunk_text),
+        )
+        raise
+
+    for result in individual_results:
+      yield result
+
+  def _process_single_chunk_with_retry(
+      self,
+      prompt: str,
+      chunk: chunking.TextChunk,
+      retry_transient_errors: bool = True,
+      max_retries: int = 3,
+      retry_initial_delay: float = 1.0,
+      retry_backoff_factor: float = 2.0,
+      retry_max_delay: float = 60.0,
+      **kwargs,
+  ) -> list[core_types.ScoredOutput]:
+    """Process a single chunk with retry logic.
+
+    Args:
+      prompt: The prompt for this chunk.
+      chunk: The TextChunk object.
+      retry_transient_errors: Whether to retry on transient errors.
+      max_retries: Maximum number of retry attempts.
+      retry_initial_delay: Initial delay in seconds before the first retry.
+      retry_backoff_factor: Backoff multiplier applied between retries.
+      retry_max_delay: Maximum delay in seconds between retries.
+      **kwargs: Additional arguments for the language model.
+
+    Returns:
+      A list containing the ScoredOutput(s) for this chunk.
+    """
+    last_exception = None
+    delay = retry_initial_delay
+
+    for attempt in range(max_retries + 1):
+      try:
+        batch_results = list(
+            self._language_model.infer(
+                batch_prompts=[prompt],
+                **kwargs,
+            )
+        )
+
+        if not batch_results:
+          raise exceptions.InferenceOutputError(
+              f"No results returned for chunk in document {chunk.document_id}"
+          )
+
+        return batch_results[0]
+
+      except Exception as e:
+        last_exception = e
+
+        if not retry_transient_errors or not retry_utils.is_transient_error(e):
+          logging.debug(
+              "Not retrying chunk processing: retry_disabled=%s,"
+              " is_transient=%s, error=%s",
+              not retry_transient_errors,
+              retry_utils.is_transient_error(e),
+              str(e),
+          )
+          raise
+
+        if attempt >= max_retries:
+          logging.error(
+              "Chunk processing failed after %d retries: %s",
+              max_retries,
+              str(e),
+          )
+          raise
+
+        # Cap the delay, then add up to 10% random jitter so retries
+        # from concurrent chunks do not synchronize.
+        current_delay = min(delay, retry_max_delay)
+
+        import random  # local import; used only for retry jitter
+
+        jitter_amount = current_delay * 0.1 * random.random()
+        current_delay += jitter_amount
+
+        logging.warning(
+            "Chunk processing failed on attempt %d/%d due to transient error:"
+            " %s. Retrying in %.2f seconds...",
+            attempt + 1,
+            max_retries + 1,
+            str(e),
+            current_delay,
+        )
+
+        time.sleep(current_delay)
+        delay = min(delay * retry_backoff_factor, retry_max_delay)
+
+    if last_exception:
+      raise last_exception
+    raise RuntimeError("Chunk retry logic failed unexpectedly")
+
   def annotate_documents(
       self,
       documents: Iterable[data.Document],
```
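For reference, with the default parameters (initial delay 1.0 s, backoff factor 2.0, cap 60.0 s) the loop above sleeps roughly 1 s, 2 s, then 4 s before the three retries, each inflated by up to 10% jitter. The helper below is a standalone sketch mirroring that committed loop, not part of the commit:

```python
import random

def retry_delays(max_retries=3, initial=1.0, factor=2.0, cap=60.0):
    """Yield the sleep taken before each retry, mirroring the loop above."""
    delay = initial
    for _ in range(max_retries):
        current = min(delay, cap)
        current += current * 0.1 * random.random()  # up to 10% jitter
        yield current
        delay = min(delay * factor, cap)

# With the defaults: roughly [1.0, 2.0, 4.0], each plus its jitter.
print([round(d, 2) for d in retry_delays()])
```

The remaining hunks thread the new retry parameters through `annotate_documents`, the single-pass and sequential-pass helpers, and `annotate_text`: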
```diff
@@ -211,6 +380,11 @@ def annotate_documents(
       debug: bool = True,
       extraction_passes: int = 1,
       show_progress: bool = True,
+      retry_transient_errors: bool = True,
+      max_retries: int = 3,
+      retry_initial_delay: float = 1.0,
+      retry_backoff_factor: float = 2.0,
+      retry_max_delay: float = 60.0,
       **kwargs,
   ) -> Iterator[data.AnnotatedDocument]:
     """Annotates a sequence of documents with NLP extractions.
@@ -253,6 +427,11 @@
           batch_length,
           debug,
           show_progress,
+          retry_transient_errors,
+          max_retries,
+          retry_initial_delay,
+          retry_backoff_factor,
+          retry_max_delay,
           **kwargs,
       )
     else:
@@ -264,6 +443,11 @@
           debug,
           extraction_passes,
           show_progress,
+          retry_transient_errors,
+          max_retries,
+          retry_initial_delay,
+          retry_backoff_factor,
+          retry_max_delay,
           **kwargs,
       )
 
@@ -275,6 +459,11 @@ def _annotate_documents_single_pass(
       batch_length: int,
       debug: bool,
       show_progress: bool = True,
+      retry_transient_errors: bool = True,
+      max_retries: int = 3,
+      retry_initial_delay: float = 1.0,
+      retry_backoff_factor: float = 2.0,
+      retry_max_delay: float = 60.0,
       **kwargs,
   ) -> Iterator[data.AnnotatedDocument]:
     """Single-pass annotation logic (original implementation)."""
@@ -321,8 +510,15 @@ def _annotate_documents_single_pass(
           )
           progress_bar.set_description(desc)
 
-        batch_scored_outputs = self._language_model.infer(
+        # Process batch with individual chunk retry capability
+        batch_scored_outputs = self._process_batch_with_retry(
             batch_prompts=batch_prompts,
+            batch=batch,
+            retry_transient_errors=retry_transient_errors,
+            max_retries=max_retries,
+            retry_initial_delay=retry_initial_delay,
+            retry_backoff_factor=retry_backoff_factor,
+            retry_max_delay=retry_max_delay,
             **kwargs,
         )
 
@@ -419,6 +615,11 @@ def _annotate_documents_sequential_passes(
       debug: bool,
       extraction_passes: int,
       show_progress: bool = True,
+      retry_transient_errors: bool = True,
+      max_retries: int = 3,
+      retry_initial_delay: float = 1.0,
+      retry_backoff_factor: float = 2.0,
+      retry_max_delay: float = 60.0,
       **kwargs,
   ) -> Iterator[data.AnnotatedDocument]:
     """Sequential extraction passes logic for improved recall."""
@@ -446,6 +647,11 @@
           batch_length,
           debug=(debug and pass_num == 0),
           show_progress=show_progress if pass_num == 0 else False,
+          retry_transient_errors=retry_transient_errors,
+          max_retries=max_retries,
+          retry_initial_delay=retry_initial_delay,
+          retry_backoff_factor=retry_backoff_factor,
+          retry_max_delay=retry_max_delay,
           **kwargs,
       ):
         doc_id = annotated_doc.document_id
@@ -494,6 +700,11 @@ def annotate_text(
       debug: bool = True,
       extraction_passes: int = 1,
       show_progress: bool = True,
+      retry_transient_errors: bool = True,
+      max_retries: int = 3,
+      retry_initial_delay: float = 1.0,
+      retry_backoff_factor: float = 2.0,
+      retry_max_delay: float = 60.0,
       **kwargs,
   ) -> data.AnnotatedDocument:
     """Annotates text with NLP extractions for text input.
@@ -540,6 +751,11 @@
             debug,
             extraction_passes,
             show_progress,
+            retry_transient_errors,
+            max_retries,
+            retry_initial_delay,
+            retry_backoff_factor,
+            retry_max_delay,
             **kwargs,
         )
     )
```
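The `retry_utils` module imported above is among the seven changed files but is not shown in this excerpt. As a rough sketch of what its `is_transient_error` classifier presumably does — the marker list and string-matching approach are assumptions, not the committed code:

```python
# Hypothetical sketch of retry_utils.is_transient_error; the committed
# implementation lives in retry_utils.py, which this excerpt does not show.
_TRANSIENT_MARKERS = (
    "503", "429", "overloaded", "unavailable", "timed out", "rate limit",
)

def is_transient_error(error: Exception) -> bool:
    """Heuristically classify an error as transient, i.e. safe to retry."""
    message = str(error).lower()
    return any(marker in message for marker in _TRANSIENT_MARKERS)
```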

langextract/extraction.py

Lines changed: 21 additions & 0 deletions

```diff
@@ -59,6 +59,11 @@ def extract(
     prompt_validation_level: pv.PromptValidationLevel = pv.PromptValidationLevel.WARNING,
     prompt_validation_strict: bool = False,
     show_progress: bool = True,
+    retry_transient_errors: bool = True,
+    max_retries: int = 3,
+    retry_initial_delay: float = 1.0,
+    retry_backoff_factor: float = 2.0,
+    retry_max_delay: float = 60.0,
 ) -> typing.Any:
   """Extracts structured information from text.
 
@@ -150,6 +155,12 @@ def extract(
     prompt_validation_strict: When True and prompt_validation_level is ERROR,
       raises on non-exact matches (MATCH_FUZZY, MATCH_LESSER). Defaults to False.
     show_progress: Whether to show progress bar during extraction. Defaults to True.
+    retry_transient_errors: Whether to automatically retry on transient errors
+      like 503 "model overloaded". Defaults to True.
+    max_retries: Maximum number of retry attempts for transient errors. Defaults to 3.
+    retry_initial_delay: Initial delay in seconds before first retry. Defaults to 1.0.
+    retry_backoff_factor: Multiplier for exponential backoff between retries. Defaults to 2.0.
+    retry_max_delay: Maximum delay between retries in seconds. Defaults to 60.0.
 
   Returns:
     An AnnotatedDocument with the extracted information when input is a
@@ -320,6 +331,16 @@ def extract(
         format_handler=format_handler,
     )
 
+    # Add retry parameters to alignment kwargs
+    retry_kwargs = {
+        "retry_transient_errors": retry_transient_errors,
+        "max_retries": max_retries,
+        "retry_initial_delay": retry_initial_delay,
+        "retry_backoff_factor": retry_backoff_factor,
+        "retry_max_delay": retry_max_delay,
+    }
+    alignment_kwargs.update(retry_kwargs)
+
     if isinstance(text_or_documents, str):
       return annotator.annotate_text(
           text=text_or_documents,
```
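Callers tune the retry behavior directly through `extract`. A minimal usage sketch, assuming the package is imported as `lx` and following the usual LangExtract few-shot example shape; the input text, prompt, and model id are illustrative placeholders:

```python
import langextract as lx

# One few-shot example in the standard LangExtract format (illustrative).
examples = [
    lx.data.ExampleData(
        text="Patient was given 250 mg amoxicillin.",
        extractions=[
            lx.data.Extraction(
                extraction_class="medication",
                extraction_text="amoxicillin",
            ),
        ],
    ),
]

result = lx.extract(
    text_or_documents="Patient was given 400 mg ibuprofen.",
    prompt_description="Extract medication names.",
    examples=examples,
    model_id="gemini-2.5-flash",     # placeholder model id
    retry_transient_errors=True,     # retry 503 "model overloaded" and similar
    max_retries=5,                   # up from the default of 3
    retry_initial_delay=2.0,         # first retry after ~2 s
    retry_backoff_factor=2.0,        # then ~4 s, ~8 s, ...
    retry_max_delay=30.0,            # never wait longer than 30 s
)
```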

langextract/providers/gemini.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -23,6 +23,7 @@
 
 from absl import logging
 
+from langextract import retry_utils
 from langextract.core import base_model
 from langextract.core import data
 from langextract.core import exceptions
@@ -179,6 +180,7 @@ def __init__(
         k: v for k, v in (kwargs or {}).items() if k in _API_CONFIG_KEYS
     }
 
+  @retry_utils.retry_chunk_processing()
   def _process_single_prompt(
       self, prompt: str, config: dict
   ) -> core_types.ScoredOutput:
```
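The `retry_chunk_processing()` decorator also comes from the new `retry_utils` module, whose body is not part of this excerpt. A plausible shape for such a decorator factory — exponential backoff with jitter around transient errors — is sketched below; the parameter names and defaults are assumptions, and the transient-error check is a stand-in for the `is_transient_error` sketch above:

```python
# Hypothetical sketch of retry_utils.retry_chunk_processing;
# the committed decorator may differ.
import functools
import random
import time

def _is_transient(error: Exception) -> bool:
    # Placeholder classifier; see the is_transient_error sketch above.
    return any(m in str(error).lower() for m in ("503", "429", "overloaded"))

def retry_chunk_processing(max_retries: int = 3, initial_delay: float = 1.0,
                           backoff_factor: float = 2.0, max_delay: float = 60.0):
    """Decorator factory: retry the wrapped call on transient errors."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Give up on non-transient errors or after the last attempt.
                    if attempt >= max_retries or not _is_transient(e):
                        raise
                    sleep_for = min(delay, max_delay)
                    sleep_for += sleep_for * 0.1 * random.random()  # jitter
                    time.sleep(sleep_for)
                    delay = min(delay * backoff_factor, max_delay)
        return wrapper
    return decorator
```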
