0

大模型API调用统一封装实战:一个SDK接入所有主流模型

2026.05.21 | youres | 13次围观

为什么要做API统一封装

去年我们团队同时接入DeepSeek、豆包、通义千问三个大模型做能力对比,刚开始直接调各自的SDK,结果代码里到处是if-else判断。需求一变就要改三处代码,某次上线前忘记更新豆包的模型ID,导致线上故障。从那以后我就坚定一个想法:大模型调用必须统一封装

统一封装解决的核心问题有三个:

  • 接口碎片化:每家厂商的参数命名、返回格式都不同,代码里写死调用逻辑就是给自己挖坑
  • 模型切换成本:从GPT切到国产模型,理论上只改模型ID就行,实际要重构大量代码
  • 成本与性能优化:不同模型适合不同任务,但没有统一入口就无法做智能路由

这篇文章分享我实际项目中沉淀的一套统一封装方案,核心思路是抽象层+适配器模式,一套代码对接所有主流模型。

核心架构设计

整个方案分三层:抽象层定义统一接口,适配层对接各家SDK,业务层只需要调抽象接口。下面这张表展示关键设计:

层级 职责 关键类/方法
抽象层 定义统一的调用接口和数据结构 BaseModelAdapter / ChatRequest / ChatResponse
适配层 对接各家模型SDK,实现接口转换 DeepSeekAdapter / DoubaoAdapter / QwenAdapter
业务层 调用统一接口,不关心底层实现 ModelClient.chat() / ModelClient.stream()

这个架构的好处是扩展成本低:新增一个模型,只需要写一个适配器类(约100行代码),业务代码完全不用改。

统一接口定义

抽象层是最关键的设计,我参考了OpenAI的接口风格(毕竟是最成熟的),但做了几个针对性优化:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Iterator
from enum import Enum

class ModelType(Enum):
    DEEPSEEK = "deepseek"
    DOUBAO = "doubao" 
    QWEN = "qwen"
    GLM = "glm"

@dataclass
class ChatMessage:
    role: str  # system/user/assistant
    content: str
    images: Optional[List[str]] = None  # 多模态支持

@dataclass  
class ChatRequest:
    messages: List[ChatMessage]
    model: str
    temperature: float = 0.7
    max_tokens: int = 4096
    stream: bool = False
    
@dataclass
class ChatResponse:
    content: str
    model: str
    usage: dict  # ${"prompt_tokens": 100, "completion_tokens": 200}$
    finish_reason: str

class BaseModelAdapter(ABC):
    """所有模型适配器的基类"""
    
    @abstractmethod
    def chat(self, request: ChatRequest) -> ChatResponse:
        """同步调用"""
        pass
    
    @abstractmethod  
    def stream(self, request: ChatRequest) -> Iterator[str]:
        """流式调用"""
        pass
    
    @abstractmethod
    def count_tokens(self, messages: List[ChatMessage]) -> int:
        """计算Token数量"""
        pass

注意几个设计细节:ChatMessage支持多模态(images字段),Token计算抽象到接口层(不同模型的计算方式不同),流式和同步分开定义(方便后续扩展)。

适配器实现示例

以豆包大模型为例,展示适配器如何把厂商特有的API转换成统一接口:

import httpx
from volcengine.maas import MaasService

class DoubaoAdapter(BaseModelAdapter):
    def __init__(self, api_key: str, endpoint: str):
        self.client = MaasService(endpoint, "cn-beijing")
        self.client.set_ak_sk(api_key.split(":")[0], api_key.split(":")[1])
    
    def chat(self, request: ChatRequest) -> ChatResponse:
        # 转换消息格式
        volc_messages = []
        for msg in request.messages:
            volc_msg = {"role": msg.role, "content": msg.content}
            if msg.images:  # 处理多模态
                volc_msg["content"] = [
                    {"type": "text", "text": msg.content},
                    *[{"type": "image_url", "image_url": {"url": img}} 
                      for img in msg.images]
                ]
            volc_messages.append(volc_msg)
        
        # 调用豆包API
        resp = self.client.chat(
            model=request.model,
            messages=volc_messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        
        # 转换返回格式
        return ChatResponse(
            content=resp.choices[0].message.content,
            model=request.model,
            usage={
                "prompt_tokens": resp.usage.prompt_tokens,
                "completion_tokens": resp.usage.completion_tokens
            },
            finish_reason=resp.choices[0].finish_reason
        )
    
    def stream(self, request: ChatRequest) -> Iterator[str]:
        volc_messages = self._convert_messages(request.messages)
        for chunk in self.client.stream_chat(
            model=request.model,
            messages=volc_messages,
            temperature=request.temperature
        ):
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    
    def count_tokens(self, messages: List[ChatMessage]) -> int:
        # 豆包使用官方Tokenizer
        total = 0
        for msg in messages:
            total += len(self.client.tokenize(msg.content))
        return total

适配器的核心工作是格式转换:把统一的ChatRequest转换成厂商API需要的格式,再把厂商的返回转换成统一的ChatResponse。这样业务层只需要面向接口编程,完全不用关心底层差异。

统一入口封装

有了适配器后,需要一个统一的入口类来管理所有模型:

class ModelClient:
    """统一模型调用入口"""
    
    _adapters = {}  # 单例缓存
    
    @classmethod
    def get_adapter(cls, model_type: ModelType, config: dict) -> BaseModelAdapter:
        if model_type.value not in cls._adapters:
            if model_type == ModelType.DEEPSEEK:
                cls._adapters[model_type.value] = DeepSeekAdapter(config["api_key"])
            elif model_type == ModelType.DOUBAO:
                cls._adapters[model_type.value] = DoubaoAdapter(
                    config["api_key"], 
                    config.get("endpoint", "https://ark.cn-beijing.volces.com")
                )
            elif model_type == ModelType.QWEN:
                cls._adapters[model_type.value] = QwenAdapter(config["api_key"])
        return cls._adapters[model_type.value]
    
    @classmethod
    def chat(cls, model_type: ModelType, model: str, 
             messages: List[ChatMessage], **kwargs) -> ChatResponse:
        """统一的调用入口"""
        adapter = cls.get_adapter(model_type, kwargs.get("config", {}))
        request = ChatRequest(
            messages=messages,
            model=model,
            temperature=kwargs.get("temperature", 0.7),
            max_tokens=kwargs.get("max_tokens", 4096),
            stream=False
        )
        return adapter.chat(request)
    
    @classmethod
    def stream(cls, model_type: ModelType, model: str,
               messages: List[ChatMessage], **kwargs) -> Iterator[str]:
        """流式调用入口"""
        adapter = cls.get_adapter(model_type, kwargs.get("config", {}))
        request = ChatRequest(messages=messages, model=model, stream=True, **kwargs)
        yield from adapter.stream(request)

业务代码调用起来非常简单:

# 调用豆包
response = ModelClient.chat(
    model_type=ModelType.DOUBAO,
    model="doubao-pro-32k",
    messages=[ChatMessage(role="user", content="解释一下RAG技术")]
)

# 切换到DeepSeek,只改model_type
response = ModelClient.chat(
    model_type=ModelType.DEEPSEEK,
    model="deepseek-chat",
    messages=[ChatMessage(role="user", content="解释一下RAG技术")]
)

实际踩坑记录

开发过程中遇到几个典型问题,分享解决方案:

  • 超时处理不一致:DeepSeek的SDK用requests,豆包用httpx,超时参数名都不同。解决:适配器内部统一用httpx,设置默认30秒超时
  • 流式返回的结束判断:有的模型用finish_reason="stop",有的直接返回空chunk。解决:在适配层统一处理,只要content为空就结束
  • 多模态参数差异大:OpenAI用image_url,豆包用image(base64),通义千问用image(URL)。解决:适配器里根据模型类型做格式转换
  • Token计算不统一:各家Tokenizer实现不同,简单用字符串长度估算会偏差很大。解决:调用各家官方的Tokenizer API(都有提供)

成本与性能优化实践

统一封装后,可以做更多优化:

1. 智能路由:根据任务类型自动选择最优模型。我的实践规则:

def smart_route(task_type: str, content_length: int) -> tuple[ModelType, str]:
    if task_type == "code_generation":
        return ModelType.DEEPSEEK, "deepseek-coder"
    elif task_type == "long_context" and content_length > 10000:
        return ModelType.QWEN, "qwen-max-longcontext" 
    elif task_type == "multimodal":
        return ModelType.DOUBAO, "doubao-vision"
    else:
        return ModelType.DOUBAO, "doubao-pro"  # 默认用性价比高的

2. 并发调用:需要多个模型同时处理同一请求时(比如做结果对比),用asyncio并发:

import asyncio

async def compare_models(messages: List[ChatMessage]):
    tasks = [
        ModelClient.async_chat(ModelType.DEEPSEEK, "deepseek-chat", messages),
        ModelClient.async_chat(ModelType.DOUBAO, "doubao-pro", messages),
        ModelClient.async_chat(ModelType.QWEN, "qwen-turbo", messages)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

3. 缓存策略:对于重复的请求,用Redis缓存响应(适合非实时场景):

def cached_chat(request: ChatRequest, ttl: int = 3600):
    cache_key = hashlib.md5(f"{request.model}{request.messages}".encode()).hexdigest()
    cached = redis.get(cache_key)
    if cached:
        return ChatResponse(**json.loads(cached))
    
    response = adapter.chat(request)
    redis.setex(cache_key, ttl, json.dumps(asdict(response)))
    return response

相关资源与内链

写在最后

大模型API统一封装不是过度设计,而是企业级应用的刚需。我这套方案从去年10月上线至今,已经稳定运行半年多,期间新增了GLM和Kimi的适配器,业务代码一行没改。

如果你正在做多模型接入,建议一开始就做统一封装。前期多花两天写适配器,后续维护成本至少降低50%。这套方案的完整代码我已开源,文末链接可直接下载。

版权声明

本文仅代表个人观点。
本文系AI辅助作者原创,未经许可,转载请保留原文链接。

发表评论