0

大模型API错误处理与重试机制实战:让AI应用稳定运行不中断

2026.05.22 | youres | 37次围观

为什么大模型API的错误处理总被忽视

大多数人接入大模型API时,脑子里只有一个画面:发请求、拿回复、完事。一旦跑到生产环境,就会发现各种"惊喜"——超时、429限流、502服务端错误、JSON解析失败……这些问题不处理,轻则用户体验崩塌,重则整个业务链条瘫痪。

我见过一个真实案例:某团队用豆包大模型做客服自动回复,上线第一天就因为并发请求触发限流,连续返回429错误,1000多个用户同时看到"系统繁忙"的提示。问题根源?他们连最基本的重试逻辑都没写。

这篇文章不是讲理论,而是我从实际项目中踩坑后总结的一套完整方案,覆盖超时、限流、熔断、降级、日志五个层面,直接能拿来用。

大模型API常见的五种错误类型

经过几十个项目的实战,我把大模型API的错误归纳为五类,每类需要不同的处理策略:

错误类型HTTP状态码原因建议处理
速率限制429请求频率超过配额指数退避重试
服务端错误500/502/503模型服务异常重试+切换备用模型
超时无(连接超时)网络波动或模型推理慢增加超时时间+重试
参数错误400请求格式不对记录日志+告警
认证失败401/403API Key过期或权限不足立即告警,不重试

关键是:不是所有错误都应该重试。参数错误你重试一万次结果也一样,认证失败说明配置有问题,这时候重试只会浪费资源。

指数退避重试:不要傻等

最核心的重试策略是指数退避(Exponential Backoff)。原理很简单:每次失败后等待时间翻倍,再加上一个随机抖动(jitter),避免多个请求同时重试造成"惊群效应"。

import time
import random
import httpx

def call_llm_with_retry(prompt, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            response = httpx.post(
                "https://ark.cn-beijing.volces.com/api/v3/chat/completions",
                json={"model": "doubao-pro-32k", "messages": [{"role": "user", "content": prompt}]},
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                timeout=30.0
            )
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 0))
                delay = retry_after if retry_after > 0 else base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited, retrying in {delay:.1f}s (attempt {attempt + 1})")
                time.sleep(delay)
                continue
            response.raise_for_status()
            return response.json()
        except httpx.TimeoutException:
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            print(f"Timeout, retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
        except httpx.HTTPStatusError as e:
            if e.response.status_code >= 500:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Server error {e.response.status_code}, retrying in {delay:.1f}s")
                time.sleep(delay)
            else:
                raise
    raise Exception(f"Failed after {max_retries} retries")

这里有几个实战经验:第一,一定要优先读取 Retry-After 响应头,这是服务端给你的明确等待时间;第二,随机抖动不能省,它能把并发重试分散到不同时间点;第三,最大重试次数建议3次就够了,超过3次说明问题不是暂时性的。

熔断机制:防止雪崩

当某个模型API持续出错时,你的系统不应该傻傻地一直重试。熔断器(Circuit Breaker)模式就是用来解决这个问题的——连续失败达到阈值后直接"断开",不再发请求,转而走降级逻辑。

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = "closed"  # closed=正常, open=熔断, half_open=试探

    def can_execute(self):
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half_open"
                return True
            return False
        return True  # half_open允许试探一次

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            print(f"Circuit OPENED after {self.failure_count} failures")

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

def safe_call_llm(prompt):
    if not breaker.can_execute():
        return fallback_response(prompt)  # 降级处理
    try:
        result = call_llm_with_retry(prompt)
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        return fallback_response(prompt)

熔断器的三个状态就像电闸:关闭(正常运行)→ 连续失败达到阈值后打开(拒绝请求)→ 等待恢复超时后进入半开(放一个请求试试)→ 成功则关闭,失败则继续打开。这种机制在微服务架构中非常成熟,同样适用于大模型调用。

多模型降级:永远有Plan B

单模型依赖是最大的单点风险。我的做法是配置一个模型链:主力模型失败时自动切换到备用模型。

MODEL_CHAIN = [
    {"name": "doubao-pro-32k", "endpoint": "volces", "key": "KEY_A"},
    {"name": "deepseek-chat", "endpoint": "deepseek", "key": "KEY_B"},
    {"name": "qwen-turbo", "endpoint": "dashscope", "key": "KEY_C"},
]

def call_with_fallback(prompt):
    errors = []
    for model in MODEL_CHAIN:
        try:
            breaker_key = model["name"]
            if not breakers[breaker_key].can_execute():
                continue
            result = call_model(model, prompt)
            breakers[breaker_key].record_success()
            return result
        except Exception as e:
            errors.append(f"{model['name']}: {str(e)}")
            breakers[breaker_key].record_failure()
    # 所有模型都失败,返回缓存或固定回复
    return {"content": "抱歉,当前服务繁忙,请稍后重试。"}

这个策略的价值在于:你不需要追求100%的高可用API,而是用成本更低的备用模型兜底。比如主力用豆包Pro,备用切到DeepSeek,最后兜底用通义千问Turbo(便宜且稳定)。对用户来说,回答慢一点可以接受,完全不能服务就不行了。

请求队列与并发控制

大模型API通常有QPS限制。与其被动触发429再重试,不如主动控制并发。Python的 asyncio.Semaphore 是最简单的方案:

import asyncio

semaphore = asyncio.Semaphore(5)  # 最多5个并发请求

async def async_call_llm(prompt):
    async with semaphore:
        return await call_llm_async(prompt)

# 批量处理时不会超过并发限制
tasks = [async_call_llm(p) for p in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)

如果你用的是Celery做任务队列,那就更简单——直接限制worker的并发数(--concurrency=5),天然就是流量控制。关键是不要让所有请求一股脑涌向API,要有"闸门"。

完整的错误处理架构

把上面的组件组装起来,就是一个生产级的错误处理架构:

  • 第一层:请求拦截 — 信号量控制并发,防止超限
  • 第二层:超时设置 — 连接超时10s,读取超时60s,不卡死
  • 第三层:指数退避重试 — 最多3次,处理429/5xx
  • 第四层:熔断器 — 连续失败5次则熔断60s
  • 第五层:多模型降级 — 主力失败切备用,全部失败走兜底
  • 第六层:日志与告警 — 记录每次失败详情,错误率超阈值发告警

这个架构不是一蹴而就的。我的建议是从第一层开始,逐步往上加。先写好重试逻辑(投入产出比最高),再加班列控制,最后加熔断和降级。

日志与监控:出了问题要能定位

没有日志的错误处理等于没做。建议记录以下关键信息:

import logging
import json

logger = logging.getLogger("llm_api")
handler = logging.FileHandler("llm_api_errors.log", encoding="utf-8")
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
logger.addHandler(handler)

def log_api_call(model, prompt_len, status_code, latency_ms, error=None):
    logger.info(json.dumps({
        "model": model,
        "prompt_len": prompt_len,
        "status": status_code,
        "latency_ms": latency_ms,
        "error": str(error) if error else None,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }, ensure_ascii=False))

如果错误率在1分钟内超过30%,就触发钉钉/飞书告警。这条经验来自一个教训——我们的模型服务在某天凌晨3点悄悄挂了,但因为没人看日志,直到早上9点客服才接到投诉。加了自动告警之后,现在平均发现问题时间缩短到2分钟以内。

总结

大模型API的错误处理不是一个"锦上添花"的功能,而是上生产的必选项。核心要点:

  • 区分错误类型,不是所有错误都值得重试
  • 指数退避+随机抖动是最实用的重试策略
  • 熔断器防止连续失败拖垮整个系统
  • 多模型降级保证服务不中断
  • 主动控制并发比被动触发限流更优雅
  • 日志+告警是最后一道防线

写代码时花10分钟加上这些处理逻辑,能省掉以后无数个凌晨被叫起来修Bug的痛苦。记住:用户不关心你用的是哪个模型,他们只关心你的服务是不是一直能用。

更多AI开发实战内容,可以参考 豆包AI函数调用实战教程豆包大模型SSE流式对话开发实战

版权声明

本文仅代表个人观点。
本文系AI辅助作者原创,未经许可,转载请保留原文链接。

发表评论
881文章数 0评论数
作者其它文章