← 返回首页

ChatGPT API生产环境实战:从日花$50到$5的优化之路

2023-09-18 | ChatGPT API 成本优化
我们的AI客服助手上线第一周,每天API成本$50+。经过3个月的持续优化,降到了$5/天,响应速度反而更快了。这篇文章记录我踩过的坑和有效的优化手段。

问题背景

2023年3月,我们上线了一个基于ChatGPT的客服助手,处理电商售后咨询。日均请求量约8000次,第一周的API账单让我傻眼了:

老板要求把成本控制在$200/月以内,同时响应时间不能太慢。

成本分析

Token消耗分布

import openai
from collections import defaultdict

# 统计一周的token使用情况
token_stats = defaultdict(lambda: {"input": 0, "output": 0, "count": 0})

# 分析日志发现的问题
"""
问题1: system prompt太长,每次都发2000 token
问题2: 上下文带了完整对话历史,动辄5000+ token  
问题3: 没有限制max_tokens,有时输出3000+ token
问题4: 大量重复请求,同一问题反复调用
"""
消耗项 占比 原因
System Prompt 35% 每次请求重复发送2000 token
对话历史 40% 带完整历史,平均5000 token
输出 25% 未限制,经常废话连篇

优化1:压缩System Prompt

优化前(2000 token)

# 原来的system prompt,又臭又长
SYSTEM_PROMPT_OLD = """
你是XX电商平台的智能客服助手,负责处理用户的售后问题。

你需要遵守以下规则:
1. 始终保持友好、专业的态度
2. 如果用户询问退款问题,需要先询问订单号
3. 如果用户询问物流问题,需要先询问快递单号
4. 对于超出你能力范围的问题,引导用户联系人工客服
5. 不要泄露任何内部信息
6. 使用简洁明了的语言回复用户
... (此处省略1500字)
"""

优化后(400 token)

# 精简版system prompt
SYSTEM_PROMPT = """角色:电商售后客服
规则:
- 先问订单号再处理
- 简洁回复,<100字
- 超出能力范围→转人工
- 禁止:泄露内部信息、承诺无法兑现的事项

常见场景处理:
- 退款:确认订单状态→告知处理时间
- 物流:查询快递单号→告知预计时间
- 投诉:致歉→记录问题→转人工"""
✅ 效果: System prompt从2000 token压缩到400 token,单次请求节省1600 token,日成本降低约$15。

优化2:对话历史管理

from typing import List, Dict

def manage_conversation_history(
    messages: List[Dict],
    max_tokens: int = 2000,
    keep_recent: int = 4
) -> List[Dict]:
    """
    智能管理对话历史
    
    策略:
    1. 保留system prompt
    2. 保留最近N轮对话
    3. 中间对话做摘要或丢弃
    """
    if not messages:
        return messages
    
    # 分离system和对话
    system_msg = [m for m in messages if m['role'] == 'system']
    conv_msgs = [m for m in messages if m['role'] != 'system']
    
    # 只保留最近的对话轮次
    if len(conv_msgs) > keep_recent * 2:  # 每轮包含user和assistant
        # 保留第一轮(建立上下文)和最近几轮
        first_exchange = conv_msgs[:2]
        recent_exchanges = conv_msgs[-(keep_recent * 2):]
        
        # 中间用摘要替代
        summary = {
            "role": "system",
            "content": f"[历史摘要:用户之前询问了{len(conv_msgs)//2}个问题,已处理]"
        }
        conv_msgs = first_exchange + [summary] + recent_exchanges
    
    return system_msg + conv_msgs

# 使用
messages = manage_conversation_history(full_history, keep_recent=3)
# 从平均5000 token降到1500 token
⚠️ 踩坑:摘要不能太简略
最初我直接丢弃中间对话,结果GPT经常忘记之前说过的话,用户体验很差。后来加了简单摘要,告诉模型"之前讨论过什么",效果好很多。

优化3:输出长度控制

import openai

def call_gpt_optimized(
    messages: List[Dict],
    task_type: str = "chat"
) -> str:
    """
    根据任务类型动态设置max_tokens
    """
    # 不同场景的输出限制
    max_tokens_map = {
        "chat": 150,      # 日常对话,简短回复
        "explain": 300,   # 解释说明,中等长度  
        "summary": 100,   # 摘要,精简
        "analyze": 500,   # 分析,允许详细
    }
    
    max_tokens = max_tokens_map.get(task_type, 200)
    
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.7,
        # 提前停止,节省token
        stop=["\n\n", "如果您还有其他问题"]
    )
    
    return response.choices[0].message.content

# 客服场景大多是chat类型,150 token足够

优化4:语义缓存

import hashlib
import numpy as np
from typing import Optional
import redis

class SemanticCache:
    """
    语义缓存:相似问题返回缓存结果
    
    不是简单的字符串匹配,而是基于embedding的相似度
    """
    
    def __init__(self, embedding_model, redis_client, threshold=0.92):
        self.embed_model = embedding_model
        self.redis = redis_client
        self.threshold = threshold
    
    def _get_embedding(self, text: str) -> np.ndarray:
        # 使用本地embedding模型,不用OpenAI的
        return self.embed_model.encode(text)
    
    def get(self, query: str) -> Optional[str]:
        """查找语义相似的缓存"""
        query_emb = self._get_embedding(query)
        
        # 遍历缓存找相似的(生产环境用向量数据库)
        for key in self.redis.scan_iter("cache:*"):
            cached = self.redis.hgetall(key)
            cached_emb = np.frombuffer(cached[b'embedding'])
            
            similarity = np.dot(query_emb, cached_emb)
            if similarity > self.threshold:
                return cached[b'response'].decode()
        
        return None
    
    def set(self, query: str, response: str, ttl: int = 3600):
        """缓存结果"""
        query_emb = self._get_embedding(query)
        key = f"cache:{hashlib.md5(query.encode()).hexdigest()}"
        
        self.redis.hset(key, mapping={
            'query': query,
            'embedding': query_emb.tobytes(),
            'response': response
        })
        self.redis.expire(key, ttl)

# 使用
cache = SemanticCache(local_embedding, redis_client, threshold=0.92)

def answer_with_cache(query: str) -> str:
    # 先查缓存
    cached = cache.get(query)
    if cached:
        return cached
    
    # 缓存未命中,调用API
    response = call_gpt(query)
    cache.set(query, response)
    return response
💡 缓存命中率统计:
上线一周后,缓存命中率稳定在45%左右。很多用户问的问题高度相似:"怎么退款"、"退款多久到账"、"物流怎么这么慢"。

优化5:模型选择策略

def select_model(query: str, context: dict) -> str:
    """
    根据问题复杂度选择模型
    
    简单问题用3.5-turbo,复杂问题才用4
    """
    # 简单问题的特征
    simple_patterns = [
        "退款", "物流", "发票", "修改地址",
        "取消订单", "查询", "多久"
    ]
    
    # 检查是否是简单问题
    is_simple = any(p in query for p in simple_patterns)
    
    # 检查对话轮次
    conv_rounds = context.get("rounds", 0)
    
    if is_simple and conv_rounds < 3:
        return "gpt-3.5-turbo"  # $0.002/1K tokens
    else:
        return "gpt-4-turbo"   # $0.01/1K tokens (仅复杂场景)

# 结果:90%的请求用3.5-turbo,成本大幅降低

优化6:批量处理

import asyncio
from typing import List

async def batch_process(queries: List[str]) -> List[str]:
    """
    批量处理请求,减少API调用次数
    
    适用场景:后台任务,如批量分类、批量摘要
    """
    # 合并多个小请求为一个大请求
    combined_prompt = "请依次回答以下问题,用---分隔:\n"
    for i, q in enumerate(queries, 1):
        combined_prompt += f"{i}. {q}\n"
    
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": combined_prompt}],
        max_tokens=len(queries) * 100
    )
    
    # 解析结果
    answers = response.choices[0].message.content.split("---")
    return [a.strip() for a in answers]

# 10个请求合并为1个,节省90%的固定开销

优化7:流式输出

async def stream_response(messages: List[Dict]):
    """
    流式输出,改善用户体验
    
    好处:
    1. 首字符响应更快(从3s到0.5s)
    2. 可以提前检测异常输出并中断
    """
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=messages,
        stream=True
    )
    
    full_response = ""
    async for chunk in response:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_response += content
            
            # 检测异常输出
            if "内部信息" in full_response or len(full_response) > 500:
                # 中断并返回兜底回复
                return "抱歉,请联系人工客服处理。"
            
            yield content
    
    return full_response

优化8:完整的重试机制

import time
import random
from functools import wraps

class RateLimitError(Exception):
    pass

def retry_with_exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    """指数退避重试装饰器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return await func(*args, **kwargs)
                except openai.error.RateLimitError:
                    retries += 1
                    if retries == max_retries:
                        raise
                    
                    delay = min(base_delay * (2 ** retries) + random.uniform(0, 1), max_delay)
                    print(f"Rate limited, retrying in {delay:.1f}s...")
                    await asyncio.sleep(delay)
                    
                except openai.error.APIError as e:
                    if "overloaded" in str(e).lower():
                        retries += 1
                        await asyncio.sleep(5)
                    else:
                        raise
        return wrapper
    return decorator

@retry_with_exponential_backoff(max_retries=3)
async def call_api_safe(messages):
    return await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=messages,
        timeout=30
    )

最终效果

指标 优化前 优化后 改善
日均成本 $52 $4.8 -91%
月成本 $1500+ ~$150 达标
平均响应时间 3.2s 1.1s -66%
超时率 8% 0.5% -94%
缓存命中率 0 45% 新增
✅ 核心优化排序(按效果):
  1. 语义缓存 - 减少45%调用
  2. 对话历史管理 - 减少60% token
  3. 压缩System Prompt - 减少35% token
  4. 模型选择策略 - 90%用便宜模型
  5. 输出长度控制 - 减少50%输出token

监控与告警

# 成本监控脚本
def daily_cost_alert():
    """每日成本告警"""
    today_cost = get_openai_usage_today()
    
    if today_cost > 10:  # 超过$10告警
        send_alert(f"OpenAI日成本告警: ${today_cost:.2f}")
    
    # 记录到监控系统
    metrics.gauge("openai.daily_cost", today_cost)
    metrics.counter("openai.requests", get_request_count())

# Grafana面板监控
# - 实时token消耗
# - 每小时成本趋势
# - 缓存命中率
# - 各模型调用占比

经验总结

更新记录:
2023-09-18: 初版发布
2023-11-20: 更新GPT-4-turbo相关内容
2024-03-15: 补充语义缓存实现细节