@@ -216,43 +216,41 @@ def _process_batch_with_retry(
216216 ** kwargs ,
217217 ) -> Iterator [list [core_types .ScoredOutput ]]:
218218 """Process a batch of prompts with individual chunk retry capability.
219-
219+
220220 This method processes each chunk individually and retries failed chunks
221221 due to transient errors (like 503 "model overloaded") while preserving
222222 successful chunks from the same batch.
223-
223+
224224 Args:
225225 batch_prompts: List of prompts for the batch
226226 batch: List of TextChunk objects corresponding to the prompts
227227 **kwargs: Additional arguments passed to the language model
228-
228+
229229 Yields:
230230 Lists of ScoredOutputs, with retries for failed chunks
231231 """
232232 try :
233- batch_results = list (
234- self ._language_model .infer (
235- batch_prompts = batch_prompts ,
236- ** kwargs ,
237- )
238- )
239-
233+ batch_results = list (self ._language_model .infer (
234+ batch_prompts = batch_prompts ,
235+ ** kwargs ,
236+ ))
237+
240238 for result in batch_results :
241239 yield result
242240 return
243-
241+
244242 except Exception as e :
245243 if not retry_utils .is_transient_error (e ):
246244 raise
247-
245+
248246 logging .warning (
249247 "Batch processing failed with transient error: %s. "
250248 "Falling back to individual chunk processing with retry." ,
251- str (e ),
249+ str (e )
252250 )
253-
251+
254252 individual_results = []
255-
253+
256254 for i , (prompt , chunk ) in enumerate (zip (batch_prompts , batch )):
257255 try :
258256 chunk_result = self ._process_single_chunk_with_retry (
@@ -266,19 +264,16 @@ def _process_batch_with_retry(
266264 ** kwargs ,
267265 )
268266 individual_results .append (chunk_result )
269-
267+
270268 except Exception as e :
271269 logging .error (
272270 "Failed to process chunk %d after retries: %s. "
273271 "Chunk info: document_id=%s, text_length=%d. "
274272 "Stopping document processing." ,
275- i ,
276- str (e ),
277- chunk .document_id ,
278- len (chunk .chunk_text ),
273+ i , str (e ), chunk .document_id , len (chunk .chunk_text )
279274 )
280275 raise
281-
276+
282277 for result in individual_results :
283278 yield result
284279
@@ -294,7 +289,7 @@ def _process_single_chunk_with_retry(
294289 ** kwargs ,
295290 ) -> list [core_types .ScoredOutput ]:
296291 """Process a single chunk with retry logic.
297-
292+
298293 Args:
299294 prompt: The prompt for this chunk
300295 chunk: The TextChunk object
@@ -304,69 +299,59 @@ def _process_single_chunk_with_retry(
304299 retry_backoff_factor: Backoff multiplier for retries
305300 retry_max_delay: Maximum delay between retries
306301 **kwargs: Additional arguments for the language model
307-
302+
308303 Returns:
309304 List containing a single ScoredOutput for this chunk
310305 """
311306 last_exception = None
312307 delay = retry_initial_delay
313-
308+
314309 for attempt in range (max_retries + 1 ):
315310 try :
316- batch_results = list (
317- self ._language_model .infer (
318- batch_prompts = [prompt ],
319- ** kwargs ,
320- )
321- )
322-
311+ batch_results = list (self ._language_model .infer (
312+ batch_prompts = [prompt ],
313+ ** kwargs ,
314+ ))
315+
323316 if not batch_results :
324317 raise exceptions .InferenceOutputError (
325318 f"No results returned for chunk in document { chunk .document_id } "
326319 )
327-
320+
328321 return batch_results [0 ]
329-
322+
330323 except Exception as e :
331324 last_exception = e
332-
325+
333326 if not retry_transient_errors or not retry_utils .is_transient_error (e ):
334327 logging .debug (
335- "Not retrying chunk processing: retry_disabled=%s,"
336- " is_transient=%s, error=%s" ,
337- not retry_transient_errors ,
338- retry_utils .is_transient_error (e ),
339- str (e ),
328+ "Not retrying chunk processing: retry_disabled=%s, is_transient=%s, error=%s" ,
329+ not retry_transient_errors , retry_utils .is_transient_error (e ), str (e )
340330 )
341331 raise
342-
332+
343333 if attempt >= max_retries :
344334 logging .error (
345335 "Chunk processing failed after %d retries: %s" ,
346- max_retries ,
347- str (e ),
336+ max_retries , str (e )
348337 )
349338 raise
350-
339+
351340 current_delay = min (delay , retry_max_delay )
352-
341+
353342 import random
354-
355343 jitter_amount = current_delay * 0.1 * random .random ()
356344 current_delay += jitter_amount
357-
345+
358346 logging .warning (
359- "Chunk processing failed on attempt %d/%d due to transient error:"
360- " %s. Retrying in %.2f seconds..." ,
361- attempt + 1 ,
362- max_retries + 1 ,
363- str (e ),
364- current_delay ,
347+ "Chunk processing failed on attempt %d/%d due to transient error: %s. "
348+ "Retrying in %.2f seconds..." ,
349+ attempt + 1 , max_retries + 1 , str (e ), current_delay
365350 )
366-
351+
367352 time .sleep (current_delay )
368353 delay = min (delay * retry_backoff_factor , retry_max_delay )
369-
354+
370355 if last_exception :
371356 raise last_exception
372357 raise RuntimeError ("Chunk retry logic failed unexpectedly" )
0 commit comments