From 87b6b05d67a1d4784f486df94f58683b51cc75b2 Mon Sep 17 00:00:00 2001 From: Keysat Date: Tue, 16 Jun 2026 08:45:12 -0500 Subject: [PATCH] Add request timeout and retry to Gemini extraction backend A timeout-less generate_content call hung the single-threaded extract worker for ~50 min mid-batch. Set an HTTP timeout (120s) plus 4 retries with backoff, mirroring SparkControl._post; transient 504/read-timeouts now self-heal instead of freezing the run. --- signal_engine/extract/backends.py | 47 ++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/signal_engine/extract/backends.py b/signal_engine/extract/backends.py index 83ad2b2..98d14a9 100644 --- a/signal_engine/extract/backends.py +++ b/signal_engine/extract/backends.py @@ -10,6 +10,7 @@ A backend exposes: complete_json(messages, max_tokens) -> str (a JSON object st from __future__ import annotations import logging +import time log = logging.getLogger(__name__) @@ -32,31 +33,43 @@ class GeminiBackend: API is the eventual scale path; this synchronous form is the drop-in fallback.""" name = "gemini" - def __init__(self, api_key: str, model: str = "gemini-2.5-flash") -> None: + def __init__(self, api_key: str, model: str = "gemini-2.5-flash", *, + timeout_s: float = 120.0, retries: int = 4) -> None: from google import genai # guarded import; pip install google-genai + from google.genai import types self._genai = genai - self.client = genai.Client(api_key=api_key) + self._types = types + # http_options.timeout is in MILLISECONDS — without it a stalled request hangs the (single- + # threaded) worker forever; one such hang froze a whole batch for ~50 min before this fix. + self.client = genai.Client(api_key=api_key, + http_options=types.HttpOptions(timeout=int(timeout_s * 1000))) self.model = model + self.retries = retries def complete_json(self, messages: list[dict], *, max_tokens: int = 4000) -> str: - from google.genai import types + types = self._types system = "\n\n".join(m["content"] for m in messages if m["role"] == "system") user = "\n\n".join(m["content"] for m in messages if m["role"] != "system") - resp = self.client.models.generate_content( - model=self.model, - contents=user, - config=types.GenerateContentConfig( - system_instruction=system or None, - temperature=0, - max_output_tokens=max_tokens, - response_mime_type="application/json", - # Gemini 2.5 thinks by default and spends the output budget on reasoning tokens — - # it hit MAX_TOKENS with ~3.8k thoughts and a truncated JSON body (0 claims parsed). - # Extraction is deterministic, no-CoT (mirrors the local path's enable_thinking=False). - thinking_config=types.ThinkingConfig(thinking_budget=0), - ), + cfg = types.GenerateContentConfig( + system_instruction=system or None, + temperature=0, + max_output_tokens=max_tokens, + response_mime_type="application/json", + # Gemini 2.5 thinks by default and spends the output budget on reasoning tokens — + # it hit MAX_TOKENS with ~3.8k thoughts and a truncated JSON body (0 claims parsed). + # Extraction is deterministic, no-CoT (mirrors the local path's enable_thinking=False). + thinking_config=types.ThinkingConfig(thinking_budget=0), ) - return resp.text or "{}" + for attempt in range(self.retries + 1): + try: + resp = self.client.models.generate_content(model=self.model, contents=user, config=cfg) + return resp.text or "{}" + except Exception as e: # noqa: BLE001 — timeout/5xx/429/network: back off and retry + if attempt >= self.retries: + raise + sleep = 2.0 * (2 ** attempt) + log.warning("Gemini call failed (%s); retry %d/%d in %.0fs", e, attempt + 1, self.retries, sleep) + time.sleep(sleep) def from_config(cfg, sc) -> "LocalQwenBackend | GeminiBackend":