为什么大模型API的错误处理总被忽视
大多数人接入大模型API时,脑子里只有一个画面:发请求、拿回复、完事。一旦跑到生产环境,就会发现各种"惊喜"——超时、429限流、502服务端错误、JSON解析失败……这些问题不处理,轻则用户体验崩塌,重则整个业务链条瘫痪。
我见过一个真实案例:某团队用豆包大模型做客服自动回复,上线第一天就因为并发请求触发限流,连续返回429错误,1000多个用户同时看到"系统繁忙"的提示。问题根源?他们连最基本的重试逻辑都没写。
这篇文章不是讲理论,而是我从实际项目中踩坑后总结的一套完整方案,覆盖超时、限流、熔断、降级、日志五个层面,直接能拿来用。
大模型API常见的五种错误类型
经过几十个项目的实战,我把大模型API的错误归纳为五类,每类需要不同的处理策略:
| 错误类型 | HTTP状态码 | 原因 | 建议处理 |
|---|---|---|---|
| 速率限制 | 429 | 请求频率超过配额 | 指数退避重试 |
| 服务端错误 | 500/502/503 | 模型服务异常 | 重试+切换备用模型 |
| 超时 | 无(连接超时) | 网络波动或模型推理慢 | 增加超时时间+重试 |
| 参数错误 | 400 | 请求格式不对 | 记录日志+告警 |
| 认证失败 | 401/403 | API Key过期或权限不足 | 立即告警,不重试 |
关键是:不是所有错误都应该重试。参数错误你重试一万次结果也一样,认证失败说明配置有问题,这时候重试只会浪费资源。
指数退避重试:不要傻等
最核心的重试策略是指数退避(Exponential Backoff)。原理很简单:每次失败后等待时间翻倍,再加上一个随机抖动(jitter),避免多个请求同时重试造成"惊群效应"。
import time
import random
import httpx
def call_llm_with_retry(prompt, max_retries=3, base_delay=1.0):
for attempt in range(max_retries):
try:
response = httpx.post(
"https://ark.cn-beijing.volces.com/api/v3/chat/completions",
json={"model": "doubao-pro-32k", "messages": [{"role": "user", "content": prompt}]},
headers={"Authorization": "Bearer YOUR_API_KEY"},
timeout=30.0
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 0))
delay = retry_after if retry_after > 0 else base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, retrying in {delay:.1f}s (attempt {attempt + 1})")
time.sleep(delay)
continue
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
print(f"Timeout, retrying in {delay:.1f}s (attempt {attempt + 1})")
time.sleep(delay)
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500:
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Server error {e.response.status_code}, retrying in {delay:.1f}s")
time.sleep(delay)
else:
raise
raise Exception(f"Failed after {max_retries} retries")
这里有几个实战经验:第一,一定要优先读取 Retry-After 响应头,这是服务端给你的明确等待时间;第二,随机抖动不能省,它能把并发重试分散到不同时间点;第三,最大重试次数建议3次就够了,超过3次说明问题不是暂时性的。
熔断机制:防止雪崩
当某个模型API持续出错时,你的系统不应该傻傻地一直重试。熔断器(Circuit Breaker)模式就是用来解决这个问题的——连续失败达到阈值后直接"断开",不再发请求,转而走降级逻辑。
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None
self.state = "closed" # closed=正常, open=熔断, half_open=试探
def can_execute(self):
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half_open"
return True
return False
return True # half_open允许试探一次
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
print(f"Circuit OPENED after {self.failure_count} failures")
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
def safe_call_llm(prompt):
if not breaker.can_execute():
return fallback_response(prompt) # 降级处理
try:
result = call_llm_with_retry(prompt)
breaker.record_success()
return result
except Exception:
breaker.record_failure()
return fallback_response(prompt)
熔断器的三个状态就像电闸:关闭(正常运行)→ 连续失败达到阈值后打开(拒绝请求)→ 等待恢复超时后进入半开(放一个请求试试)→ 成功则关闭,失败则继续打开。这种机制在微服务架构中非常成熟,同样适用于大模型调用。
多模型降级:永远有Plan B
单模型依赖是最大的单点风险。我的做法是配置一个模型链:主力模型失败时自动切换到备用模型。
MODEL_CHAIN = [
{"name": "doubao-pro-32k", "endpoint": "volces", "key": "KEY_A"},
{"name": "deepseek-chat", "endpoint": "deepseek", "key": "KEY_B"},
{"name": "qwen-turbo", "endpoint": "dashscope", "key": "KEY_C"},
]
def call_with_fallback(prompt):
errors = []
for model in MODEL_CHAIN:
try:
breaker_key = model["name"]
if not breakers[breaker_key].can_execute():
continue
result = call_model(model, prompt)
breakers[breaker_key].record_success()
return result
except Exception as e:
errors.append(f"{model['name']}: {str(e)}")
breakers[breaker_key].record_failure()
# 所有模型都失败,返回缓存或固定回复
return {"content": "抱歉,当前服务繁忙,请稍后重试。"}
这个策略的价值在于:你不需要追求100%的高可用API,而是用成本更低的备用模型兜底。比如主力用豆包Pro,备用切到DeepSeek,最后兜底用通义千问Turbo(便宜且稳定)。对用户来说,回答慢一点可以接受,完全不能服务就不行了。
请求队列与并发控制
大模型API通常有QPS限制。与其被动触发429再重试,不如主动控制并发。Python的 asyncio.Semaphore 是最简单的方案:
import asyncio
semaphore = asyncio.Semaphore(5) # 最多5个并发请求
async def async_call_llm(prompt):
async with semaphore:
return await call_llm_async(prompt)
# 批量处理时不会超过并发限制
tasks = [async_call_llm(p) for p in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
如果你用的是Celery做任务队列,那就更简单——直接限制worker的并发数(--concurrency=5),天然就是流量控制。关键是不要让所有请求一股脑涌向API,要有"闸门"。
完整的错误处理架构
把上面的组件组装起来,就是一个生产级的错误处理架构:
- 第一层:请求拦截 — 信号量控制并发,防止超限
- 第二层:超时设置 — 连接超时10s,读取超时60s,不卡死
- 第三层:指数退避重试 — 最多3次,处理429/5xx
- 第四层:熔断器 — 连续失败5次则熔断60s
- 第五层:多模型降级 — 主力失败切备用,全部失败走兜底
- 第六层:日志与告警 — 记录每次失败详情,错误率超阈值发告警
这个架构不是一蹴而就的。我的建议是从第一层开始,逐步往上加。先写好重试逻辑(投入产出比最高),再加班列控制,最后加熔断和降级。
日志与监控:出了问题要能定位
没有日志的错误处理等于没做。建议记录以下关键信息:
import logging
import json
logger = logging.getLogger("llm_api")
handler = logging.FileHandler("llm_api_errors.log", encoding="utf-8")
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
logger.addHandler(handler)
def log_api_call(model, prompt_len, status_code, latency_ms, error=None):
logger.info(json.dumps({
"model": model,
"prompt_len": prompt_len,
"status": status_code,
"latency_ms": latency_ms,
"error": str(error) if error else None,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}, ensure_ascii=False))
如果错误率在1分钟内超过30%,就触发钉钉/飞书告警。这条经验来自一个教训——我们的模型服务在某天凌晨3点悄悄挂了,但因为没人看日志,直到早上9点客服才接到投诉。加了自动告警之后,现在平均发现问题时间缩短到2分钟以内。
总结
大模型API的错误处理不是一个"锦上添花"的功能,而是上生产的必选项。核心要点:
- 区分错误类型,不是所有错误都值得重试
- 指数退避+随机抖动是最实用的重试策略
- 熔断器防止连续失败拖垮整个系统
- 多模型降级保证服务不中断
- 主动控制并发比被动触发限流更优雅
- 日志+告警是最后一道防线
写代码时花10分钟加上这些处理逻辑,能省掉以后无数个凌晨被叫起来修Bug的痛苦。记住:用户不关心你用的是哪个模型,他们只关心你的服务是不是一直能用。
更多AI开发实战内容,可以参考 豆包AI函数调用实战教程 和 豆包大模型SSE流式对话开发实战。
版权声明
本文仅代表个人观点。
本文系AI辅助作者原创,未经许可,转载请保留原文链接。

发表评论