Higress 深度拆解:阿里开源的 AI Native 网关如何用 Wasm 插件重新定义流量治理
一、为什么我们需要一个 AI 时代的网关?
2026 年,当你的系统需要同时对接 OpenAI GPT-5.4、Anthropic Claude 4、通义千问、DeepSeek、Google Gemini 等 10+ 大模型 API 时,传统的 Nginx 或 Kong 已经力不从心了。不是它们不好——而是它们根本不是为 AI 流量设计的。
传统 API 网关处理的是 HTTP 请求转发、限流、鉴权这些「规矩」流量。但 AI 流量有它独特的脾气:
- Token 计费:大模型按 token 收费,传统网关按请求计费,粒度完全不同
- 流式响应:SSE(Server-Sent Events)长连接,一个请求可能持续几十秒甚至几分钟
- 多模型路由:同一个业务请求,可能需要根据成本、延迟、质量在不同模型之间动态切换
- Prompt 注入防护:传统 WAF 规则对自然语言攻击基本无效
- 语义缓存:相同意图的不同表达应该命中缓存,传统网关只做 URL/Body 级别缓存
这就是 Higress 诞生的背景。阿里内部先用了——通义千问 APP、百炼大模型 API 平台、机器学习 PAI 平台的流量全走它——然后才开源。这种「先内战后外战」的路径,让它的设计不是想象出来的,而是真刀真枪打出来的。
二、架构全景:三层分离 + Wasm 扩展
2.1 整体架构
Higress 的架构可以概括为三层:
┌─────────────────────────────────────────────┐
│ 控制面 (Control Plane) │
│ ┌──────────┐ ┌──────────┐ ┌───────────┐ │
│ │ Higress │ │ Istio │ │ Config │ │
│ │ Console │ │ Pilot │ │ Store │ │
│ └──────────┘ └──────────┘ └───────────┘ │
└────────────────────┬────────────────────────┘
│ xDS 协议下发配置
▼
┌─────────────────────────────────────────────┐
│ 数据面 (Data Plane) │
│ ┌──────────────────────────────────────┐ │
│ │ Envoy Proxy │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │Listener │ │ Route │ │ │
│ │ │ Filters │ │ Config │ │ │
│ │ └─────────┘ └─────────┘ │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Wasm Plugin Runtime │ │ │
│ │ │ ┌──────┐ ┌──────┐ ┌─────┐ │ │ │
│ │ │ │AI LB │ │Token │ │MCP │ │ │ │
│ │ │ │Plugin│ │Rate │ │Proxy│ │ │ │
│ │ │ └──────┘ └──────┘ └─────┘ │ │ │
│ │ └─────────────────────────────┘ │ │
│ └──────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 后端服务 (Backends) │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ GPT │ │Claude│ │通义 │ │vLLM │ │
│ │ 5.4 │ │ 4 │ │千问 │ │推理 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
└─────────────────────────────────────────────┘
2.2 为什么选择 Envoy + Istio?
这不是一个随意的技术选型。Higress 团队选择 Envoy 作为数据面,有三层考量:
第一层:成熟度。 Envoy 在 Lyft 诞生,经过 Google、Apple、Microsoft 等大规模生产验证,其连接池管理、HTTP/2 支持、超时重试机制已经非常成熟。AI 网关最怕的不是功能不够,而是底层网络库的 bug。
第二层:可观测性。 Envoy 原生输出 stats、access log、tracing 三类数据。AI 网关需要精确的 token 计量、延迟分布、错误率统计,这些都是 Envoy 已经做好的基础设施。
第三层:Wasm 支持。 这是关键。Envoy 是第一个原生支持 Wasm 插件的代理,而 Wasm 插件机制是 Higress 实现 AI 特有能力的技术基石。
2.3 xDS 协议:控制面和数据面的纽带
控制面通过 xDS 协议(LDS、RDS、CDS、EDS 等)向 Envoy 下发配置。这个设计意味着:
- 配置变更无需重启数据面
- 多个数据面实例共享同一份配置
- 增量推送,配置同步延迟在毫秒级
// xDS 配置推送的核心逻辑(简化版)
// 当路由规则变更时,控制面如何通知数据面
func (s *Server) pushRouteConfig(newRoute *RouteConfiguration) {
// 计算配置版本号
version := fmt.Sprintf("v%d", time.Now().UnixNano())
// 构造增量推送请求
delta := &discovery.DeltaDiscoveryRequest{
TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
ResourceNames: []string{newRoute.Name},
// 只推送变更的部分
}
// 向所有订阅的 Envoy 实例推送
for _, stream := range s.streams {
stream.Send(delta)
}
}
三、AI 网关的核心能力拆解
3.1 多模型统一代理
这是 Higress 最吸引人的能力。一个 API 接口,后端挂多个模型,网关负责路由。
# Higress 路由配置:多模型代理
# 将不同路径的请求路由到不同的大模型
apiVersion: networking.higress.io/v1
kind: Ingress
metadata:
name: ai-model-proxy
annotations:
# 启用 AI 代理插件
higress.io/enable-ai-proxy: "true"
spec:
rules:
- http:
paths:
# 路径匹配 -> 模型路由
- path: /v1/chat/gpt
backend:
service:
name: openai-gpt54
port:
number: 443
# 插件配置
higress.io/ai-proxy: |
provider: openai
model: gpt-5.4-turbo
timeout: 120s
- path: /v1/chat/claude
backend:
service:
name: anthropic-claude4
port:
number: 443
higress.io/ai-proxy: |
provider: anthropic
model: claude-4-opus
timeout: 180s
- path: /v1/chat/qwen
backend:
service:
name: tongyi-qwen
port:
number: 443
higress.io/ai-proxy: |
provider: qwen
model: qwen-max
timeout: 120s
但真正的威力在于统一接口。不管后端是 OpenAI 的 /v1/chat/completions 还是 Anthropic 的 /v1/messages,Higress 把它们都统一成 OpenAI 兼容格式:
// AI 代理插件的核心转换逻辑
// 将不同厂商的请求格式统一
func (p *AIProxyPlugin) onHttpRequest(ctx wrapper.HttpContext, config AIProxyConfig) types.Action {
body := ctx.GetRequestBody()
// 解析统一格式的请求
var req UnifiedChatRequest
json.Unmarshal(body, &req)
// 根据目标厂商转换请求格式
switch config.Provider {
case "openai":
// OpenAI 格式,直接透传
ctx.SetRequestBody(body)
case "anthropic":
// 转换为 Anthropic 格式
anthropicReq := convertToAnthropic(&req)
newBody, _ := json.Marshal(anthropicReq)
ctx.SetRequestBody(newBody)
// 设置 Anthropic 特有的 header
ctx.SetRequestHeader("anthropic-version", "2023-06-01")
case "qwen":
// 转换为通义千问格式
qwenReq := convertToQwen(&req)
newBody, _ := json.Marshal(qwenReq)
ctx.SetRequestBody(newBody)
}
return types.ActionContinue
}
// OpenAI 格式转 Anthropic 格式
func convertToAnthropic(req *UnifiedChatRequest) *AnthropicRequest {
messages := make([]AnthropicMessage, 0, len(req.Messages))
for _, msg := range req.Messages {
messages = append(messages, AnthropicMessage{
Role: msg.Role,
Content: msg.Content,
})
}
return &AnthropicRequest{
Model: req.Model,
MaxTokens: req.MaxTokens,
Messages: messages,
System: extractSystemPrompt(req.Messages),
}
}
3.2 智能负载均衡:不只是轮询
传统的轮询和加权轮询在 AI 场景下远远不够。Higress 的 AI 负载均衡考虑了:
- 模型能力差异:GPT-5.4 适合推理,Claude 4 适合写作,通义千问性价比高
- 实时延迟:不同模型的响应时间差异巨大(500ms vs 5000ms)
- Token 消耗:不同模型的 token 单价不同
- GPU 利用率:自部署模型的 GPU 负载
// AI 负载均衡插件:加权最短队列优先
type AILoadBalancer struct {
backends []*AIBackend
mu sync.RWMutex
}
type AIBackend struct {
Name string
Model string
Weight int
// 实时指标
ActiveConns atomic.Int64
AvgLatency atomic.Int64 // 毫秒
TokenCost atomic.Int64 // 每千 token 价格(厘)
GPULoad atomic.Int64 // 0-100
ErrorRate atomic.Int64 // 0-10000 (×100)
}
func (lb *AILoadBalancer) Select(req *ChatRequest) *AIBackend {
lb.mu.RLock()
defer lb.mu.RUnlock()
var best *AIBackend
bestScore := float64(-1)
for _, b := range lb.backends {
// 检查模型能力是否匹配请求
if !b.supportsCapability(req.RequiredCapability) {
continue
}
// 综合评分:延迟权重 40%,成本权重 30%,负载权重 30%
latencyScore := 1.0 / (1.0 + float64(b.AvgLatency.Load())/1000.0)
costScore := 1.0 / (1.0 + float64(b.TokenCost.Load())/100.0)
loadScore := 1.0 - float64(b.GPULoad.Load()+b.ActiveConns.Load()*5)/100.0
// 错误率惩罚
errorPenalty := 1.0 - float64(b.ErrorRate.Load())/10000.0
score := (0.4*latencyScore + 0.3*costScore + 0.3*loadScore) * errorPenalty
if score > bestScore {
bestScore = score
best = b
}
}
return best
}
// 后端指标上报:从响应中提取并更新实时指标
func (lb *AILoadBalancer) ReportMetrics(backend *AIBackend, resp *ChatResponse, latencyMs int64) {
// 用指数移动平均更新延迟
oldLatency := backend.AvgLatency.Load()
newLatency := int64(float64(oldLatency)*0.7 + float64(latencyMs)*0.3)
backend.AvgLatency.Store(newLatency)
// 从响应中提取 token 消耗
if resp.Usage != nil {
totalTokens := resp.Usage.TotalTokens
_ = totalTokens // 用于计费和限流
}
}
3.3 Token 级限流:精确到 token 的流量控制
传统网关的限流是 QPS 维度的,对 AI 场景不适用。一个 100 token 的请求和一个 10000 token 的请求占用的资源天差地别。Higress 实现了 TPM(Tokens Per Minute)级别的限流:
// Token 限流插件
type TokenRateLimiter struct {
// 令牌桶:key 为用户/API Key,value 为当前可用 token 数
buckets sync.Map
mu sync.Mutex
}
type TokenBucket struct {
Remaining atomic.Int64
LastRefill atomic.Int64 // Unix timestamp in seconds
Rate int64 // tokens per second
Burst int64 // max tokens
}
func (rl *TokenRateLimiter) onHttpRequest(ctx wrapper.HttpContext, config TokenRateLimitConfig) types.Action {
// 从请求中提取 API Key 和预估 token 数
apiKey := ctx.GetRequestHeader("Authorization")
// 解析请求中的 max_tokens 参数
estimatedTokens := parseMaxTokens(ctx.GetRequestBody())
bucket := rl.getOrCreateBucket(apiKey, config)
// 先补充令牌(按时间窗口)
rl.refillBucket(bucket)
// 尝试消耗令牌
if bucket.Remaining.Load() < estimatedTokens {
// 限流!返回 429
ctx.SetResponseCode(429)
ctx.SetResponseBody(fmt.Sprintf(
`{"error":{"message":"Token rate limit exceeded. Remaining: %d, Required: %d"}}`,
bucket.Remaining.Load(), estimatedTokens,
))
return types.ActionContinue
}
// 预扣令牌(实际消耗在响应后确认)
bucket.Remaining.Sub(estimatedTokens)
// 注册响应回调,根据实际消耗调整
ctx.RegisterResponseEvent(func() {
actualTokens := parseActualTokenUsage(ctx.GetResponseBody())
// 退回多扣的部分
refund := estimatedTokens - actualTokens
if refund > 0 {
bucket.Remaining.Add(refund)
}
})
return types.ActionContinue
}
func (rl *TokenRateLimiter) refillBucket(bucket *TokenBucket) {
now := time.Now().Unix()
last := bucket.LastRefill.Load()
elapsed := now - last
if elapsed > 0 {
refill := elapsed * bucket.Rate
newRemaining := min(bucket.Remaining.Load()+refill, bucket.Burst)
bucket.Remaining.Store(newRemaining)
bucket.LastRefill.Store(now)
}
}
3.4 AI 语义缓存:相同意图命中缓存
这是 Higress 最有意思的能力之一。传统缓存以 URL + Body 为 key,但 AI 请求中,以下两个请求应该命中同一缓存:
- "用 Python 写一个快速排序"
- "请帮我实现一个 Python 版本的 quicksort 算法"
Higress 的语义缓存通过 Embedding 模型计算请求的语义向量,然后在向量空间中查找相似请求:
// AI 语义缓存插件
type SemanticCache struct {
vectorStore VectorStore
embedder Embedder
threshold float64 // 相似度阈值
}
type CacheEntry struct {
Query string
QueryVector []float64
Response string
Model string
TokenCount int
CreatedAt time.Time
TTL time.Duration
}
func (sc *SemanticCache) onHttpRequest(ctx wrapper.HttpContext, config CacheConfig) types.Action {
query := extractUserQuery(ctx.GetRequestBody())
model := extractModel(ctx.GetRequestBody())
// 计算请求的语义向量
queryVector, err := sc.embedder.Embed(query)
if err != nil {
// Embedding 失败,降级为普通请求
return types.ActionContinue
}
// 在向量存储中搜索相似请求
results := sc.vectorStore.Search(queryVector, 5) // 取 top 5
for _, result := range results {
// 检查模型是否匹配
if result.Entry.Model != model {
continue
}
// 检查是否过期
if time.Since(result.Entry.CreatedAt) > result.Entry.TTL {
continue
}
// 检查相似度是否超过阈值
if result.Similarity >= sc.threshold {
// 缓存命中!直接返回缓存的响应
cachedResponse := constructCacheResponse(result.Entry, result.Similarity)
ctx.SetResponseCode(200)
ctx.SetResponseBody(cachedResponse)
// 添加缓存命中标记
ctx.SetResponseHeader("X-AI-Cache", "HIT")
ctx.SetResponseHeader("X-AI-Cache-Similarity",
fmt.Sprintf("%.4f", result.Similarity))
return types.ActionContinue
}
}
// 缓存未命中,注册响应回调存储结果
ctx.RegisterResponseEvent(func() {
response := ctx.GetResponseBody()
entry := &CacheEntry{
Query: query,
QueryVector: queryVector,
Response: response,
Model: model,
CreatedAt: time.Now(),
TTL: config.TTL,
}
sc.vectorStore.Insert(entry)
})
ctx.SetResponseHeader("X-AI-Cache", "MISS")
return types.ActionContinue
}
实际效果如何?在阿里内部的测试中,对于企业级 AI 应用(客服、知识库问答等场景),语义缓存的命中率可以达到 30%-45%,这意味着几乎一半的请求不需要真正调用大模型,直接返回缓存结果。
3.5 AI 可观测性:全链路 Token 追踪
传统可观测性关注的是延迟、错误率、吞吐量。AI 可观测性需要额外关注:
- Token 消耗:每个请求消耗了多少 prompt/completion token
- 首 Token 延迟(TTFT):从请求发出到第一个 token 返回的时间
- Token 生成速率:每秒生成多少 token
- 模型路由决策:请求为什么被路由到这个模型
- 缓存命中率:语义缓存是否命中
// AI 可观测性插件:提取并上报 AI 指标
type AIObserver struct {
metricsExporter *MetricsExporter
traceExporter *TraceExporter
}
type AIMetrics struct {
// 基础 HTTP 指标
StatusCode int
LatencyMs int64
// AI 特有指标
Model string
PromptTokens int64
CompTokens int64
TotalTokens int64
TTFT int64 // Time To First Token (ms)
TokenPerSec float64
// 路由信息
RoutedTo string
CacheHit bool
CacheSimilar float64
}
func (obs *AIObserver) onHttpResponse(ctx wrapper.HttpContext, config ObserverConfig) types.Action {
body := ctx.GetResponseBody()
metrics := &AIMetrics{
StatusCode: ctx.GetStatusCode(),
LatencyMs: ctx.GetDuration(),
}
// 从响应中提取 AI 指标
var resp AIResponse
json.Unmarshal(body, &resp)
if resp.Usage != nil {
metrics.PromptTokens = resp.Usage.PromptTokens
metrics.CompTokens = resp.Usage.CompletionTokens
metrics.TotalTokens = resp.Usage.TotalTokens
}
metrics.Model = resp.Model
metrics.CacheHit = ctx.GetResponseHeader("X-AI-Cache") == "HIT"
// 计算 TTFT 和 Token 生成速率
if ttft := ctx.GetResponseHeader("X-TTFT-Ms"); ttft != "" {
metrics.TTFT, _ = strconv.ParseInt(ttft, 10, 64)
}
if metrics.CompTokens > 0 && metrics.LatencyMs > metrics.TTFT {
genTime := float64(metrics.LatencyMs-metrics.TTFT) / 1000.0
metrics.TokenPerSec = float64(metrics.CompTokens) / genTime
}
// 上报指标
obs.metricsExporter.Export(metrics)
// 如果配置了链路追踪,创建 Span
if config.EnableTracing {
obs.traceExporter.ExportAISpan(ctx, metrics)
}
return types.ActionContinue
}
四、Wasm 插件机制:Higress 的灵魂
4.1 为什么是 Wasm?
在 Higress 之前,API 网关的扩展方式主要有三种:
| 扩展方式 | 语言限制 | 性能 | 安全性 | 动态加载 |
|---|---|---|---|---|
| Nginx Lua | Lua | 高 | 低(共享进程空间) | 否 |
| Kong Go | Go | 高 | 低 | 否 |
| Envoy C++ Filter | C++ | 最高 | 低 | 否 |
| Envoy Wasm | Go/Rust/C++/AssemblyScript | 中高 | 高(沙箱隔离) | 是 |
Wasm 的优势在「动态加载」和「安全性」上。你可以在不重启 Envoy 的情况下热加载一个新的 Wasm 插件,而且插件运行在沙箱中,一个插件的 bug 不会拖垮整个网关。
4.2 用 Go 开发 Higress Wasm 插件
Higress 提供了完整的 Go SDK,让开发者可以用最熟悉的语言写插件。下面我们实现一个完整的AI 请求日志插件,记录每个 AI 请求的详细信息:
package main
import (
"encoding/json"
"fmt"
"strings"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm"
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types"
)
func main() {
wrapper.SetCtx(
"ai-request-logger",
wrapper.ParseOverrideConfigBy(parseConfig),
wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders),
wrapper.ProcessRequestBodyBy(onHttpRequestBody),
wrapper.ProcessResponseHeadersBy(onHttpResponseHeaders),
wrapper.ProcessResponseBodyBy(onHttpResponseBody),
)
}
// 插件配置
type AIRequestLoggerConfig struct {
LogEndpoint string `json:"log_endpoint"` // 日志上报地址
LogLevel string `json:"log_level"` // 日志级别:minimal/normal/detailed
LogBody bool `json:"log_body"` // 是否记录请求/响应 body
MaxBodySize int `json:"max_body_size"` // body 最大记录大小
}
func parseConfig(jsonData []byte, config *AIRequestLoggerConfig) error {
return json.Unmarshal(jsonData, config)
}
type AIRequestContext struct {
RequestID string
Method string
Path string
Model string
PromptTokens int64
CompTokens int64
TotalTokens int64
StatusCode int
LatencyMs int64
TTFTMs int64
CacheHit bool
RequestBody string
ResponseBody string
StartTime int64
}
func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIRequestLoggerConfig) types.Action {
// 记录请求开始时间
aiCtx := &AIRequestContext{
StartTime: proxywasm.GetCurrentTime().UnixMilli(),
}
// 提取请求元信息
if method, err := proxywasm.GetHttpRequestHeader(":method"); err == nil {
aiCtx.Method = method
}
if path, err := proxywasm.GetHttpRequestHeader(":path"); err == nil {
aiCtx.Path = path
}
if reqID, err := proxywasm.GetHttpRequestHeader("x-request-id"); err == nil {
aiCtx.RequestID = reqID
}
// 检查缓存命中
if cacheStatus, err := proxywasm.GetHttpRequestHeader("x-ai-cache"); err == nil {
aiCtx.CacheHit = cacheStatus == "HIT"
}
ctx.SetContext("ai_request_logger", aiCtx)
// 如果需要记录 body,需要拦截请求体
if config.LogBody {
return types.ActionContinue
}
return types.ActionContinue
}
func onHttpRequestBody(ctx wrapper.HttpContext, config AIRequestLoggerConfig, body []byte) types.Action {
aiCtx, ok := ctx.GetContext("ai_request_logger").(*AIRequestContext)
if !ok {
return types.ActionContinue
}
// 解析请求体,提取模型名
var req struct {
Model string `json:"model"`
}
if err := json.Unmarshal(body, &req); err == nil {
aiCtx.Model = req.Model
}
// 记录请求体(截断)
if config.LogBody && config.MaxBodySize > 0 {
if len(body) > config.MaxBodySize {
aiCtx.RequestBody = string(body[:config.MaxBodySize]) + "...(truncated)"
} else {
aiCtx.RequestBody = string(body)
}
}
return types.ActionContinue
}
func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIRequestLoggerConfig) types.Action {
aiCtx, ok := ctx.GetContext("ai_request_logger").(*AIRequestContext)
if !ok {
return types.ActionContinue
}
// 提取状态码
if status, err := proxywasm.GetHttpResponseHeader(":status"); err == nil {
fmt.Sscanf(status, "%d", &aiCtx.StatusCode)
}
return types.ActionContinue
}
func onHttpResponseBody(ctx wrapper.HttpContext, config AIRequestLoggerConfig, body []byte) types.Action {
aiCtx, ok := ctx.GetContext("ai_request_logger").(*AIRequestContext)
if !ok {
return types.ActionContinue
}
// 计算延迟
aiCtx.LatencyMs = proxywasm.GetCurrentTime().UnixMilli() - aiCtx.StartTime
// 解析响应体,提取 token 使用量
var resp struct {
Model string `json:"model"`
Usage *struct {
PromptTokens int64 `json:"prompt_tokens"`
CompletionTokens int64 `json:"completion_tokens"`
TotalTokens int64 `json:"total_tokens"`
} `json:"usage"`
}
if err := json.Unmarshal(body, &resp); err == nil {
if resp.Usage != nil {
aiCtx.PromptTokens = resp.Usage.PromptTokens
aiCtx.CompTokens = resp.Usage.CompletionTokens
aiCtx.TotalTokens = resp.Usage.TotalTokens
}
if aiCtx.Model == "" {
aiCtx.Model = resp.Model
}
}
// 记录响应体(截断)
if config.LogBody && config.MaxBodySize > 0 {
if len(body) > config.MaxBodySize {
aiCtx.ResponseBody = string(body[:config.MaxBodySize]) + "...(truncated)"
} else {
aiCtx.ResponseBody = string(body)
}
}
// 上报日志
go reportLog(config.LogEndpoint, aiCtx)
return types.ActionContinue
}
func reportLog(endpoint string, aiCtx *AIRequestContext) {
logEntry, _ := json.Marshal(map[string]interface{}{
"request_id": aiCtx.RequestID,
"method": aiCtx.Method,
"path": aiCtx.Path,
"model": aiCtx.Model,
"prompt_tokens": aiCtx.PromptTokens,
"comp_tokens": aiCtx.CompTokens,
"total_tokens": aiCtx.TotalTokens,
"status_code": aiCtx.StatusCode,
"latency_ms": aiCtx.LatencyMs,
"cache_hit": aiCtx.CacheHit,
"timestamp": aiCtx.StartTime,
})
// 通过 HTTP 上报日志
proxywasm.DispatchHttpCall(
"log-service",
[][2]string{
{"Content-Type", "application/json"},
},
logEntry,
nil,
5000,
func(numHeaders int, bodySize int, _ int) {
// 上报完成,无需处理响应
},
)
}
4.3 用 Rust 开发高性能插件
Go 插件开发方便,但在极致性能场景下,Rust 插件更合适。一个 Rust Wasm 插件的编译体积更小(通常 1-3MB vs Go 的 10-20MB),启动更快,内存占用更低:
use proxy_wasm::traits::*;
use proxy_wasm::types::*;
use serde::Deserialize;
#[derive(Deserialize)]
struct Config {
max_prompt_length: usize,
block_keywords: Vec<String>,
}
struct PromptGuardFilter {
config: Config,
}
impl Context for PromptGuardFilter {}
impl HttpContext for PromptGuardFilter {
fn on_http_request_body(&mut self, body_size: usize, _end_of_stream: bool) -> Action {
if body_size == 0 {
return Action::Continue;
}
// 获取请求体
let body = match self.get_http_request_body(0, body_size) {
Some(b) => b,
None => return Action::Continue,
};
// 解析请求
let parsed: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(_) => return Action::Continue,
};
// 检查 prompt 长度
if let Some(messages) = parsed["messages"].as_array() {
for msg in messages {
if let Some(content) = msg["content"].as_str() {
// 长度检查
if content.len() > self.config.max_prompt_length {
self.send_http_response(
400,
vec![("Content-Type", "application/json")],
Some(br#"{"error":"Prompt exceeds maximum length"}"#),
);
return Action::Pause;
}
// 关键词检查
for keyword in &self.config.block_keywords {
if content.to_lowercase().contains(&keyword.to_lowercase()) {
self.send_http_response(
400,
vec![("Content-Type", "application/json")],
Some(format!(r#"{{"error":"Blocked keyword: {}"}}"#, keyword).as_bytes()),
);
return Action::Pause;
}
}
}
}
}
Action::Continue
}
}
impl RootContext for PromptGuardRoot {
fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
Some(Box::new(PromptGuardFilter {
config: self.config.clone(),
}))
}
}
4.4 插件热加载流程
Higress 的插件热加载是它相比传统网关的一大优势。整个流程如下:
1. 开发者推送 Wasm 插件到镜像仓库
docker push registry.cn-hangzhou.aliyuncs.com/my-plugins/ai-guard:1.0.0
2. 控制台配置插件,关联到路由规则
POST /v1/plugins
{
"name": "ai-guard",
"image": "registry.cn-hangzhou.aliyuncs.com/my-plugins/ai-guard:1.0.0",
"config": { "max_prompt_length": 10000 }
}
3. 控制面通过 xDS 下发 WasmPlugin 配置
Envoy 收到配置后:
a. 从镜像仓库拉取 Wasm 文件
b. 在沙箱中初始化 Wasm VM
c. 将新请求路由到新插件
d. 旧插件的在途请求处理完后被回收
4. 全程零中断
五、OpenAPI-to-MCP:让 AI Agent 调用你的 API
这是 Higress 在 2026 年最亮眼的更新。MCP(Model Context Protocol)是 Anthropic 提出的标准,让 AI Agent 能调用外部工具。但大部分企业的 API 都是 OpenAPI(Swagger)格式,手动写 MCP 描述费时费力。
Higress 的 openapi-to-mcp 工具可以自动把 OpenAPI 描述转换为 MCP 格式:
# 一键转换
openapi-to-mcp --input stock-api.yaml --output stock-mcp.yaml
# 生成的 MCP 描述可以直接在 Higress 控制台托管
# AI Agent 即可通过 MCP 协议调用你的 API
让我们看一个完整的示例。假设你有一个股票查询 API:
# stock-api.yaml - OpenAPI 描述
openapi: "3.0.0"
info:
title: Stock Query API
version: "1.0"
paths:
/api/stock/query:
post:
summary: 查询股票实时行情
operationId: stock_query
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
symbol:
type: string
description: "股票代码,如 00700.HK"
market:
type: string
enum: [HK, US, CN]
description: "市场"
responses:
"200":
description: 股票行情数据
content:
application/json:
schema:
type: object
properties:
price:
type: number
change:
type: number
volume:
type: integer
转换后生成的 MCP 描述:
# stock-mcp.yaml - 自动生成的 MCP 描述
tools:
- name: stock_query
description: "查询股票实时行情。输入股票代码和市场,返回当前价格、涨跌幅和成交量。"
inputSchema:
type: object
properties:
symbol:
type: string
description: "股票代码,如 00700.HK"
market:
type: string
enum: [HK, US, CN]
description: "市场"
required: [symbol]
# Higress 自动添加路由信息
backend:
type: http
url: "http://stock-service/api/stock/query"
method: POST
headers:
Content-Type: application/json
然后在 Higress 控制台上传这个 MCP 描述,AI Agent 就可以直接调用:
# AI Agent 代码示例
from anthropic import Anthropic
client = Anthropic()
response = client.messages.create(
model="claude-4-opus",
messages=[{
"role": "user",
"content": "帮我查一下腾讯的股价"
}],
tools=[{
"type": "mcp",
"server_url": "http://higress-gateway:8080/mcp/stock",
}]
)
# Claude 会自动调用 stock_query 工具
# tool_call("stock_query", symbol="00700.HK", market="HK")
# 返回:price=425.6, change=+2.3%, volume=12345678
整个流程的核心价值:你不需要改一行业务代码,只需要一个 YAML 文件,AI Agent 就能调用你的 API。
六、实战:从零搭建 AI 网关
6.1 本地开发环境搭建
Higress 只需要 Docker 就能启动,对个人开发者非常友好:
# 创建工作目录
mkdir higress && cd higress
# 启动 Higress all-in-one
docker run -d --rm --name higress-ai \
-v ${PWD}:/data \
-p 8001:8001 \ # 控制台
-p 8080:8080 \ # HTTP 网关
-p 8443:8443 \ # HTTPS 网关
higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/all-in-one:latest
# 访问控制台
# 浏览器打开 http://localhost:8001
6.2 配置多模型代理
通过控制台 UI 配置多模型路由:
步骤 1:添加服务来源
服务来源 → 创建 → DNS 类型
名称: openai-gpt
域名: api.openai.com
端口: 443
协议: HTTPS
名称: anthropic-claude
域名: api.anthropic.com
端口: 443
协议: HTTPS
名称: tongyi-qwen
域名: dashscope.aliyuncs.com
端口: 443
协议: HTTPS
步骤 2:配置路由规则
路由配置 → 创建
域名: ai-gateway.mycompany.com
路径: /v1/chat/completions
目标服务: openai-gpt
步骤 3:启用 AI 代理插件
# 全局 AI 代理插件配置
provider:
type: multi-model
models:
- id: gpt-5.4-turbo
provider: openai
api_key: sk-xxx
weight: 40
- id: claude-4-opus
provider: anthropic
api_key: sk-ant-xxx
weight: 30
- id: qwen-max
provider: qwen
api_key: sk-qwen-xxx
weight: 30
# 负载均衡策略
lb_policy: weighted-random
# 故障转移
fallback:
enabled: true
max_retries: 2
retry_on: [timeout, 5xx, rate_limit]
6.3 Kubernetes 生产部署
生产环境推荐 Helm 部署:
# 添加 Higress Helm 仓库
helm repo add higress.io https://higress.cn/helm-charts
helm repo update
# 创建命名空间
kubectl create namespace higress-system
# 安装 Higress
helm install higress higress.io/higress \
-n higress-system \
--set global.istioNamespace=higress-system \
--set higress-console.enabled=true \
--set controller.replicaCount=3 \
--set controller.resources.requests.cpu=500m \
--set controller.resources.requests.memory=512Mi \
--set controller.resources.limits.cpu=2000m \
--set controller.resources.limits.memory=2Gi
# 等待 Pod 就绪
kubectl wait --for=condition=ready pod -l app=higress-gateway \
-n higress-system --timeout=120s
6.4 配置 AI Token 限流
# ai-token-ratelimit.yaml
apiVersion: extensions.higress.io/v1alpha1
kind: WasmPlugin
metadata:
name: ai-token-ratelimit
namespace: higress-system
spec:
# 插件镜像
url: oci://higress-registry.cn-hangzhou.cr.aliyuncs.com/plugins/ai-token-ratelimit:1.0.0
# 插件配置
config:
# 按 API Key 限流
rule_name: ai_token_limit
rule_items:
# 免费用户:每分钟 1000 token
- key: api_key
limit_by_header: "X-API-Key"
limit_keys:
- key: "free-tier-*"
token_per_minute: 1000
request_per_minute: 10
# 付费用户:每分钟 100000 token
- key: "pro-tier-*"
token_per_minute: 100000
request_per_minute: 1000
# 企业用户:每分钟 1000000 token
- key: "enterprise-*"
token_per_minute: 1000000
request_per_minute: 10000
# 限流响应
rejected_code: 429
rejected_msg: "AI token rate limit exceeded. Please upgrade your plan."
6.5 配置 AI 语义缓存
# ai-semantic-cache.yaml
apiVersion: extensions.higress.io/v1alpha1
kind: WasmPlugin
metadata:
name: ai-semantic-cache
namespace: higress-system
spec:
url: oci://higress-registry.cn-hangzhou.cr.aliyuncs.com/plugins/ai-semantic-cache:1.0.0
config:
# 缓存后端
cache_backend: redis
redis:
service_name: redis-service.default.svc.cluster.local
service_port: 6379
password: "${REDIS_PASSWORD}"
# Embedding 模型
embedding:
provider: openai
model: text-embedding-3-small
api_key: "${OPENAI_API_KEY}"
# 缓存策略
similarity_threshold: 0.92 # 相似度阈值
cache_ttl: 3600 # 缓存有效期(秒)
max_cache_size: 10000 # 最大缓存条目数
# 缓存 key 策略
key_strategy:
# 按模型分组
group_by: [model, intent]
# 忽略消息中的时间戳等变量
ignore_patterns:
- "\\d{4}-\\d{2}-\\d{2}" # 忽略日期
- "\\d{1,2}:\\d{2}" # 忽略时间
七、性能优化:从千到百万 QPS
7.1 Envoy 连接池调优
AI 场景下,SSE 长连接对连接池配置提出了特殊要求:
# Envoy 连接池配置
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: ai-model-pool
spec:
host: ai-model-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 10000
connectTimeout: 5s
tcpKeepalive:
time: 60s
interval: 15s
probes: 3
http:
h2UpgradePolicy: UPGRADE # 升级到 HTTP/2
maxRequestsPerConnection: 0 # 不限制(长连接场景)
maxRetries: 3
idleTimeout: 300s # SSE 连接空闲超时 5 分钟
streamIdleTimeout: 600s # 流式响应超时 10 分钟
7.2 Wasm 插件性能优化
Wasm 插件的性能瓶颈通常在宿主与 Wasm VM 之间的数据交互。每次 proxywasm.GetHttpRequestHeader() 调用都是一次跨 VM 边界调用,开销不小。
优化策略:减少跨边界调用次数,批量获取数据。
// ❌ 低效写法:逐个获取 header
func onHttpRequestHeaders(ctx wrapper.HttpContext) types.Action {
method, _ := proxywasm.GetHttpRequestHeader(":method")
path, _ := proxywasm.GetHttpRequestHeader(":path")
host, _ := proxywasm.GetHttpRequestHeader(":authority")
auth, _ := proxywasm.GetHttpRequestHeader("authorization")
// 4 次跨 VM 调用
return types.ActionContinue
}
// ✅ 高效写法:一次性获取所有 header
func onHttpRequestHeaders(ctx wrapper.HttpContext) types.Action {
headers, _ := proxywasm.GetHttpRequestHeaders()
headerMap := make(map[string]string, len(headers))
for _, h := range headers {
headerMap[strings.ToLower(h[0])] = h[1]
}
// 只有 1 次跨 VM 调用
method := headerMap[":method"]
path := headerMap[":path"]
host := headerMap[":authority"]
auth := headerMap["authorization"]
return types.ActionContinue
}
另一个关键优化:避免在热路径上做 JSON 序列化。
// ❌ 低效:完整解析 JSON
var req map[string]interface{}
json.Unmarshal(body, &req)
model := req["model"].(string)
// ✅ 高效:正则或手写解析提取字段
func extractModel(body []byte) string {
// 用简单的字符串搜索代替完整 JSON 解析
idx := bytes.Index(body, []byte(`"model"`))
if idx == -1 {
return ""
}
// 找到值部分
start := bytes.Index(body[idx:], []byte(`"`))
// ... 手写提取逻辑
return model
}
7.3 语义缓存性能优化
语义缓存的性能瓶颈在 Embedding 计算。每个请求都需要计算一次向量,如果直接调用 OpenAI 的 Embedding API,延迟至少 50ms。
优化方案:本地 Embedding 模型。
// 使用 ONNX Runtime 运行本地 Embedding 模型
type LocalEmbedder struct {
session *onnxruntime.AdvancedSession
}
func NewLocalEmbedder(modelPath string) (*LocalEmbedder, error) {
// 加载 ONNX 格式的小型 Embedding 模型
// 推荐使用 all-MiniLM-L6-v2,模型仅 80MB,推理延迟 <5ms
session, err := onnxruntime.NewAdvancedSession(
modelPath,
[]string{"input_ids", "attention_mask"},
[]string{"last_hidden_state"},
nil, // 选项
)
return &LocalEmbedder{session: session}, err
}
func (e *LocalEmbedder) Embed(text string) ([]float32, error) {
// Tokenize(使用 Wasm 版本的 tokenizer)
tokens := tokenize(text)
// ONNX 推理
input := []onnxruntime.Value{
onnxruntime.NewTensor(tokens),
onnxruntime.NewTensor(attentionMask(tokens)),
}
output := []onnxruntime.Value{
onnxruntime.NewEmptyTensor([]int64{1, 384}), // MiniLM 输出维度 384
}
e.session.Run(input, output)
// 平均池化得到句子向量
vector := meanPool(output[0].Data().([]float32))
// 归一化
normalize(vector)
return vector, nil
}
本地 Embedding 将延迟从 50ms 降到 5ms,缓存查找整体延迟控制在 10ms 以内。
7.4 全链路压测数据
在阿里内部的压测中,Higress 作为 AI 网关的性能表现:
| 场景 | QPS | P50 延迟 | P99 延迟 | 网关额外延迟 |
|---|---|---|---|---|
| 普通转发(无插件) | 120,000 | 0.8ms | 2.1ms | 0.3ms |
| AI 代理 + 模型路由 | 85,000 | 1.2ms | 3.5ms | 0.7ms |
| + Token 限流 | 72,000 | 1.5ms | 4.2ms | 1.0ms |
| + 语义缓存(冷启动) | 65,000 | 1.8ms | 5.0ms | 1.3ms |
| + 语义缓存(热缓存) | 95,000 | 0.9ms | 2.8ms | 0.4ms |
| 全功能(5 插件叠加) | 55,000 | 2.5ms | 6.8ms | 2.0ms |
关键发现:语义缓存在热缓存场景下反而比无缓存更快——因为缓存命中后直接返回,不走后端大模型。网关的额外延迟(2ms)相对于大模型的响应时间(通常 500ms-5s)几乎可以忽略。
八、Higress vs Kong vs APISIX:AI 网关维度横评
| 维度 | Higress | Kong | APISIX |
|---|---|---|---|
| AI 多模型代理 | ✅ 原生支持,统一接口 | ❌ 需要自写插件 | ❌ 需要自写插件 |
| Token 级限流 | ✅ 内置 | ❌ 仅 QPS 限流 | ❌ 仅 QPS 限流 |
| 语义缓存 | ✅ 内置 | ❌ 无 | ❌ 无 |
| MCP 协议 | ✅ 原生支持 OpenAPI-to-MCP | ❌ 无 | ❌ 无 |
| SSE 流式转发 | ✅ 深度优化 | ⚠️ 基础支持 | ⚠️ 基础支持 |
| Wasm 插件 | ✅ Go/Rust/C++ | ⚠️ 实验性 | ❌ 仅 Lua |
| 插件热加载 | ✅ 动态加载 | ⚠️ 需重启 | ❌ 需重启 |
| K8s 原生 | ✅ Ingress + Gateway API | ⚠️ DB 模式需要数据库 | ⚠️ 依赖 etcd |
| 数据面性能 | Envoy(C++) | Nginx + Lua | Nginx + Lua |
| 社区活跃度 | 高(阿里主导) | 高 | 高 |
结论:如果你的主要场景是 AI 流量治理,Higress 目前是唯一的选择。Kong 和 APISIX 是优秀的通用 API 网关,但它们没有针对 AI 场景做深度优化。
九、安全防护:AI 网关的安全挑战
9.1 Prompt 注入防护
传统 WAF 的规则匹配对自然语言攻击无效。Higress 的 AI 安全插件采用了多层防护策略:
// Prompt 安全检测插件
type PromptGuard struct {
// 规则引擎:基于正则的快速检测
regexRules []*regexp.Regexp
// AI 检测:用小模型判断是否为注入
aiDetector *AIInjectorDetector
// 上下文检测:检测系统 prompt 泄露
leakDetector *SystemPromptLeakDetector
}
func (pg *PromptGuard) onHttpRequestBody(ctx wrapper.HttpContext, body []byte) types.Action {
prompt := extractUserPrompt(body)
// 第一层:快速正则过滤(<1ms)
for _, rule := range pg.regexRules {
if rule.MatchString(prompt) {
pg.blockRequest(ctx, "regex_rule_match", rule.String())
return types.ActionPause
}
}
// 第二层:AI 模型检测(~50ms)
if pg.aiDetector != nil {
isInjection, confidence := pg.aiDetector.Detect(prompt)
if isInjection && confidence > 0.85 {
pg.blockRequest(ctx, "ai_injection_detected", "")
return types.ActionPause
}
}
// 第三层:响应后检测系统 prompt 泄露
ctx.RegisterResponseEvent(func() {
response := extractAssistantResponse(ctx.GetResponseBody())
if pg.leakDetector.IsLeaking(response) {
// 标记但不阻断(避免误杀)
ctx.SetResponseHeader("X-AI-Security-Alert", "potential-leak")
}
})
return types.ActionContinue
}
9.2 API Key 安全
# API Key 管理配置
apiVersion: extensions.higress.io/v1alpha1
kind: WasmPlugin
metadata:
name: api-key-manager
spec:
config:
# Key 存储后端
storage:
type: redis
service: redis-service
# Key 加密存储
encryption:
algorithm: aes-256-gcm
key_vault: vault-service
# Key 轮换策略
rotation:
enabled: true
interval_days: 30
grace_period_hours: 48 # 新旧 Key 同时有效 48 小时
# 使用审计
audit:
enabled: true
log_key_usage: true
alert_on_anomaly: true
# 异常检测:单个 Key 请求量突增
anomaly_threshold: 10x # 10 倍于平均值
十、可观测性集成:Grafana 全景监控
Higress 原生输出 Prometheus 指标,配合 Grafana 可以构建完整的 AI 网关监控体系:
# Grafana Dashboard 配置(核心指标)
groups:
- name: ai_gateway_overview
rules:
# 请求维度
- record: ai_gateway:request_total
expr: sum(rate(higress_request_total[5m]))
- record: ai_gateway:request_latency_p99
expr: histogram_quantile(0.99, rate(higress_request_duration_bucket[5m]))
# Token 维度
- record: ai_gateway:prompt_tokens_total
expr: sum(rate(ai_proxy_prompt_tokens[5m]))
- record: ai_gateway:completion_tokens_total
expr: sum(rate(ai_proxy_completion_tokens[5m]))
- record: ai_gateway:token_cost_per_minute
expr: |
sum(rate(ai_proxy_prompt_tokens[1m])) * 0.01 # 假设 $0.01/1K tokens
+ sum(rate(ai_proxy_completion_tokens[1m])) * 0.03 # 假设 $0.03/1K tokens
# 模型维度
- record: ai_gateway:model_latency_p99
expr: |
histogram_quantile(0.99,
sum(rate(ai_proxy_request_duration_bucket[5m]))
by (le, model)
)
- record: ai_gateway:model_error_rate
expr: |
sum(rate(ai_proxy_request_error[5m])) by (model)
/ sum(rate(ai_proxy_request_total[5m])) by (model)
# 缓存维度
- record: ai_gateway:cache_hit_rate
expr: |
sum(rate(ai_cache_hit_total[5m]))
/ sum(rate(ai_cache_request_total[5m]))
# 告警规则
- alert: AIModelHighLatency
expr: ai_gateway:model_latency_p99 > 10000 # 10 秒
for: 5m
labels:
severity: warning
annotations:
summary: "AI 模型 {{ $labels.model }} P99 延迟超过 10 秒"
- alert: AIModelHighErrorRate
expr: ai_gateway:model_error_rate > 0.1 # 10% 错误率
for: 3m
labels:
severity: critical
annotations:
summary: "AI 模型 {{ $labels.model }} 错误率超过 10%"
- alert: TokenCostSpike
expr: |
ai_gateway:token_cost_per_minute
> 2 * avg_over_time(ai_gateway:token_cost_per_minute[1h])
for: 10m
labels:
severity: warning
annotations:
summary: "Token 消耗费用突增,当前是 1 小时平均值的 2 倍以上"
十一、真实案例:通义千问 APP 的网关架构
通义千问 APP 的日活超过千万,峰值 QPS 超过 50 万。它的流量架构是一个典型的 Higress 生产部署:
用户请求
│
├── 移动端 APP ──────┐
├── Web 端 ──────────┤
├── 小程序 ──────────┤
└── API 开发者 ──────┘
│
▼
┌───────────────┐
│ CDN / WAF │
└───────┬───────┘
│
▼
┌───────────────────────┐
│ Higress 网关集群 │
│ (3 地域 × 3 副本) │
│ │
│ ┌─ AI 代理 ────────┐ │
│ │ 模型路由 │ │
│ │ Token 限流 │ │
│ │ 语义缓存 │ │
│ │ 安全防护 │ │
│ │ 可观测性 │ │
│ └─────────────────┘ │
└───────────┬───────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 通义千问 │ │ Qwen-VL │ │ Qwen-Coder│
│ Max │ │ 视觉模型 │ │ 代码模型 │
└──────────┘ └──────────┘ └──────────┘
核心指标:
- 语义缓存命中率:35%(主要来自知识库问答场景)
- 多模型路由比例:通义千问 Max 55%、Qwen-Turbo 30%、Qwen-VL 15%
- Token 限流拦截率:0.8%(主要拦截滥用 API Key)
- 网关额外延迟:P99 < 3ms
十二、总结与展望
Higress 代表了 API 网关在 AI 时代的演进方向:
从协议代理到语义代理:传统网关只理解协议,AI 网关需要理解语义——语义缓存、Prompt 安全检测、OpenAPI-to-MCP,都在向这个方向演进。
从请求路由到智能路由:不再只是 URL 匹配,而是基于模型能力、实时延迟、成本、GPU 负载的综合路由决策。
从 QPS 限流到 Token 限流:AI 应用的计费粒度是 token,限流也必须是 token 级别。
从静态配置到动态插件:Wasm 插件热加载让网关能力可以像 App 一样安装和卸载。
从 API 网关到 AI Agent 基础设施:MCP 协议支持让网关不仅是流量的入口,更是 AI Agent 与外部世界交互的桥梁。
如果你正在构建 AI 应用,Higress 值得认真评估。它已经从「一个阿里内部工具」成长为「AI 网关领域的标准答案」。
相关资源:
- 官网:https://higress.cn
- GitHub:https://github.com/alibaba/higress
- 插件市场:https://higress.io/plugins
- 文档:https://higress.io/docs