编程 Agent Name Service(ANS)深度解析:Linux基金会如何用DNS为AI智能体建立信任基础设施——从身份认证到跨组织协作的完整技术指南

2026-07-05 18:43:36 +0800 CST views 12

Agent Name Service(ANS)深度解析:Linux基金会如何用DNS为AI智能体建立信任基础设施——从身份认证到跨组织协作的完整技术指南

引言:当AI智能体开始"社交",身份问题成了最大的盲区

2026年7月1日,Linux基金会宣布了一项可能改变AI智能体基础设施格局的计划——推出**Agent Name Service(ANS)**框架。这个消息在技术圈引发了不小的讨论,因为它触及了一个很多人已经开始感知到但尚未充分理解的问题:当AI智能体开始大规模跨组织协作时,我们如何知道"对面那个Agent是谁"?

想象一个场景:你的公司部署了十几个AI智能体处理不同的业务流程——客服、代码审查、数据分析、供应链管理。这些Agent需要和供应商的Agent、客户的Agent、甚至监管机构的Agent进行交互。在每一次交互中,你如何确认:

  • 对方Agent真的代表它声称的那个组织吗?
  • 它拥有执行当前操作的权限吗?
  • 它的代码和运行历史没有被篡改过吗?
  • 如果它做了什么违规的事情,你能追溯到具体的责任方吗?

这些问题在传统的软件系统中已经有成熟的解决方案——OAuth、JWT、mTLS、API Key等等。但AI智能体带来的挑战是全新的:它们不仅是"调用API的程序",更是"代表组织做决策的自主实体"。它们的行为边界是模糊的,它们的能力是动态变化的,它们的交互模式远比传统的请求-响应复杂得多。

ANS的出现,正是为了填补这个空白。

第一章:问题的本质——为什么现有方案不够用

1.1 从API到Agent:身份模型的根本转变

在传统API经济中,身份验证模型相对简单:

客户端(已知身份) → 请求 → 服务端(验证身份) → 响应

客户端通过API Key、OAuth Token或JWT来证明自己的身份,服务端验证后决定是否授权。这个模型有几个关键假设:

  1. 身份是静态的:一个API客户端的身份在注册时确定,很少变化
  2. 能力是预定义的:权限在注册时分配,运行时不会自发扩展
  3. 交互是单向的:通常是客户端请求服务端,而不是双向协作
  4. 责任链是清晰的:每个调用都可以追溯到一个确定的开发者或组织

但AI智能体打破了所有这些假设:

  1. 身份是动态的:一个Agent可能代表不同的用户或在不同的上下文中扮演不同的角色
  2. 能力是涌现的:Agent可以通过组合工具、调用其他Agent来获得新能力,这些能力在设计时可能并未预见
  3. 交互是多向的:Agent之间的通信是网状的,一个任务可能涉及多个组织的多个Agent协作
  4. 责任链是模糊的:当一个Agent做出错误决策时,是模型的问题、提示词的问题、工具的问题还是部署方的问题?

1.2 MCP和A2A解决了什么,没解决什么

2024-2025年,两个重要的协议在AI智能体生态中崛起:

MCP(Model Context Protocol)——由Anthropic主导,定义了AI模型如何与外部工具和数据源进行标准化通信。它解决了"Agent如何使用工具"的问题。

A2A(Agent-to-Agent Protocol)——由Google主导,定义了智能体之间如何发现彼此的能力并进行协作通信。它解决了"Agent之间如何对话"的问题。

但这两个协议都没有深入解决一个核心问题:信任

MCP假设你已经信任你要连接的工具服务器。A2A假设你已经信任你要协作的对方Agent。但在真实的跨组织场景中,"信任"不能是假设的——它必须是可以验证的。

这就像互联网早期的困境:TCP/IP解决了数据如何传输,HTTP解决了应用如何通信,但直到SSL/TLS和CA体系成熟,电子商务才真正成为可能。ANS要做的,就是为AI智能体经济建立"PKI(公钥基础设施)"级别的信任层。

1.3 企业面临的真实痛点

Forrester首席分析师Charlie Dai指出了问题的紧迫性:"智能体身份问题已经在早期生产部署中显现,尤其是在多个智能体跨工具、API和组织边界交互、却缺乏一致的身份验证和问责模型的场景中。"

Gartner总监分析师Jaishiv Prakash则从运营角度补充:"智能体身份已从架构层面的考量,演变为运营控制层面的空白。企业客户的反馈高度一致:他们需要知道是哪个智能体执行了操作、代表谁、拥有何种权限,以及其运行时行为是否与预期设计相符。"

这些不是理论问题。在金融、医疗、法律等受监管行业,缺乏可审计的智能体身份体系意味着:

  • 合规风险:监管机构要求所有自动化决策可追溯到具体的责任方
  • 安全隐患:恶意Agent可以冒充合法Agent进行社工攻击或数据窃取
  • 运营失控:无法有效管理和限制Agent的行为范围
  • 责任真空:当Agent造成损失时,无法确定责任归属

第二章:ANS架构深度解析——DNS的巧妙复用

2.1 为什么选择DNS?

ANS最引人注目的设计决策是基于现有DNS(域名系统)构建。这个选择乍看之下有些反直觉——DNS不是用来解析域名到IP地址的吗?跟AI智能体有什么关系?

但实际上,DNS的核心抽象与智能体身份的需求高度契合:

DNS的本质是一个分布式的、分层的、可验证的键值存储系统。

  • 分布式:没有单点故障,没有中心化控制
  • 分层:通过域名的层级结构实现组织边界的自然划分(.com → company.com → agent.company.com)
  • 可验证:通过DNSSEC可以验证记录的真实性和完整性
  • 全球可达:任何联网系统都可以查询DNS记录
  • 成熟可靠:运行了超过35年,是互联网最稳定的基础设施之一

Pareekh Consulting首席分析师Pareekh Jain指出:"对企业而言,ANS最大的优势之一或许在于其对DNS的依赖——企业已经使用DNS来管理域名和信任关系。这一方式避免了创建新的注册表,并允许企业利用现有互联网基础设施发布和验证智能体身份,从而降低采用难度和成本。"

2.2 ANS的核心记录类型

基于DNS构建意味着ANS可以利用DNS的TXT记录、SRV记录、SVCB记录等多种记录类型来存储智能体的元数据。以下是ANS可能使用的核心记录类型:

智能体身份记录(Agent Identity Record)

agent.customer-service.company.com. IN TXT (
  "ans-v=1"
  ";did=did:web:company.com:agents:customer-service"
  ";owner=company.com"
  ";Capabilities=customer-support,order-inquiry,refund-processing"
  ";model=gpt-4-turbo"
  ";version=2.3.1"
  ";created=2026-01-15T08:00:00Z"
  ";code-hash=sha256:a1b2c3d4..."
)

智能体服务记录(Agent Service Record)

_agent._tcp.company.com. IN SRV (
  10 60 443 api.company.com.
)

智能体能力发现记录

_capabilities.agent.company.com. IN TXT (
  "cap-v=1"
  ";tools=mcp:database-query,mcp:email-send,mcp:file-read"
  ";protocols=a2a/1.0,mcp/1.0"
  ";max-concurrent=50"
  ";rate-limit=1000/hour"
)

2.3 联邦式信任模型

ANS的一个关键设计原则是联邦式(Federated),而非中心化。这意味着:

  1. 没有单一的信任根:不像PKI依赖少数几个根CA,ANS的信任链沿着DNS的组织边界自然分布
  2. 组织自主管理:每个组织通过自己控制的域名来发布和管理自己的Agent身份
  3. 去中心化验证:任何系统都可以通过DNS查询来验证一个Agent的身份,无需依赖第三方注册中心
  4. 多信任锚点:除了DNS,ANS还支持去中心化标识符(DIDs)和法人实体标识符(LEIs),提供多层信任保障

这种设计特别适合跨组织的Agent协作场景。例如,当你的Agent需要与银行的Agent交互时:

你的Agent → 查询 bank.com 的DNS记录
         → 验证银行Agent的身份和能力
         → 确认其代码哈希与预期一致
         → 建立安全通信通道
         → 开始协作

整个过程不需要任何中心化的"Agent注册中心",利用的是已经存在的DNS基础设施。

2.4 与DID的融合

ANS并非仅依赖DNS。Linux基金会明确表示,框架同时支持:

  • DNS:提供全局可达的、基于域名的身份发布
  • DIDs(去中心化标识符):提供密码学级别的身份验证和可验证凭证
  • LEIs(法人实体标识符):将Agent与现实世界的法律实体关联

这三层身份的叠加,形成了一个从"技术身份"到"法律身份"的完整信任链:

DNS域名(技术身份)
  ↓ 关联
DID(密码学身份)
  ↓ 关联
LEI(法律实体身份)

第三章:相关生态——ANS并非孤军奋战

3.1 DNS-AID:IETF的标准化努力

与ANS密切相关的一个项目是DNS-AID(DNS-based Agent Identification and Discovery),这是一个由IETF(互联网工程任务组)推进的标准化提案,其参考实现已经托管在GitHub上(dns-aid/dns-aid-core)。

DNS-AID的核心目标是利用DNS记录帮助智能体发布能力并在网络中实现自我发现。它与ANS的关系可以理解为:

  • DNS-AID:侧重于技术规范层面,定义DNS记录格式和查询协议
  • ANS:侧重于框架层面,提供完整的身份管理和信任建立机制

DNS-AID的参考实现使用了SVCB(Service Binding)记录类型,这是一种相对较新的DNS记录类型(RFC 9460),专门用于服务发现和端点选择。以下是一个简化的DNS-AID记录示例:

# DNS-AID记录发布示例(概念代码)
import dns.update
import dns.rdatatype

def publish_agent_record(domain, agent_id, capabilities, endpoint):
    """发布一个Agent的DNS-AID记录"""
    update = dns.update.Update(domain)
    
    # 使用SVCB记录发布Agent服务信息
    svcb_data = f'10 {endpoint} port=443'
    update.add(f'_agent.{agent_id}', 300, dns.rdatatype.SVCB, svcb_data)
    
    # 使用TXT记录发布Agent能力信息
    txt_data = f'caps={",".join(capabilities)}'
    update.add(f'_agent.{agent_id}', 300, dns.rdatatype.TXT, txt_data)
    
    return update

# 发布一个客服Agent的记录
update = publish_agent_record(
    domain='company.com',
    agent_id='customer-service',
    capabilities=['order-inquiry', 'refund', 'faq'],
    endpoint='api.company.com'
)

3.2 AGNTCY:思科主导的完整基础设施栈

另一个值得关注的项目是AGNTCY(Agentic Network Technology Consortium),由思科主导并已加入Linux基金会托管。AGNTCY的定位是为多智能体系统提供完整的基础设施栈,涵盖:

  1. Agent Directory Service:联邦式的Agent注册表,支持发布、验证和发现
  2. SLIM(Secure Low-Latency Interactive Messaging):安全的网络级消息传递,支持发布/订阅、流式传输和MLS加密
  3. OASF(Open Agent Schema Framework):可扩展的数据模型,用于描述Agent的属性、技能和能力
  4. Identity:去中心化的Agent和工具身份管理,包括标识符、可验证凭证和基于策略的访问控制
  5. Observability and Evaluation:遥测收集器和工具,用于多智能体的可观测性和性能评估

AGNTCY与ANS的关系是互补而非竞争:

ANS:定义"Agent是谁"(身份和信任)
AGNTCY:定义"Agent能做什么、怎么做"(能力和通信)

3.3 生态全景图

将所有相关项目放在一起,2026年AI智能体基础设施的全景图逐渐清晰:

┌─────────────────────────────────────────────────────┐
│                 应用层 (Application Layer)             │
│   企业Agent  │  客服Agent  │  开发Agent  │  分析Agent   │
├─────────────────────────────────────────────────────┤
│                 协议层 (Protocol Layer)                │
│   MCP(工具调用) │ A2A(Agent间通信) │ 其他协议       │
├─────────────────────────────────────────────────────┤
│               基础设施层 (Infrastructure Layer)         │
│  ANS(身份/信任)│ AGNTCY(发现/消息/可观测)│ DNS-AID   │
├─────────────────────────────────────────────────────┤
│               底层基础设施 (Foundation)                 │
│   DNS/DNSSEC  │  DID  │  LEI  │  PKI  │  TLS         │
└─────────────────────────────────────────────────────┘

第四章:DNS作为信任基础的优势与风险

4.1 DNS的核心优势

全球覆盖:DNS是互联网上最广泛部署的基础设施,几乎每个联网设备都能查询DNS记录。

组织边界天然映射:域名的层级结构天然对应组织的层级关系。agent.microsoft.com显然是微软的Agent,agent.salesforce.com显然是Salesforce的Agent。

成熟的治理机制:域名系统有完善的注册、续费、转移和争议解决机制,这些都可以被ANS复用。

DNSSEC安全保障:DNS安全扩展(DNSSEC)提供了密码学级别的记录完整性和真实性验证,可以防止DNS记录被篡改。

零额外基础设施:企业不需要部署新的注册中心或验证服务器,利用现有的DNS基础设施即可。

4.2 DNS的安全局限性

然而,DNS最初并非为高保障身份认证而设计。Forrester分析师Charlie Dai警告:"DNS最初并非为高保障身份认证而设计,这使其容易受到欺骗、劫持以及延迟或传播不一致等问题的影响,进而削弱信任保障。"

主要的安全风险包括:

DNS缓存投毒:攻击者可能通过污染DNS缓存来伪造Agent的身份记录。

DNS劫持:如果攻击者控制了某个DNS服务器,可以返回虚假的Agent身份信息。

传播延迟:DNS记录的更新需要时间传播,在过渡期间可能存在不一致。

域名过期风险:如果一个组织忘记续费其域名,攻击者可能注册该域名并冒充其Agent。

4.3 缓解策略

针对上述风险,ANS框架提出了多层缓解策略:

1. DNSSEC强制使用

# 验证DNSSEC签名的Agent记录
import dns.resolver
import dns.dnssec

def verify_agent_record(agent_domain):
    """验证Agent的DNS记录是否通过DNSSEC签名"""
    resolver = dns.resolver.Resolver()
    resolver.use_edns(0, dns.flags.DO)  # 设置DNSSEC OK标志
    
    try:
        answer = resolver.resolve(agent_domain, 'TXT')
        # DNSSEC验证在解析器层面自动完成
        # 如果验证失败,会抛出异常
        for rdata in answer:
            if rdata.strings and b'ans-v=1' in rdata.strings[0]:
                return True, rdata.strings[0]
    except dns.dnssec.ValidationFailure:
        return False, "DNSSEC验证失败:记录可能被篡改"
    except Exception as e:
        return False, f"查询失败: {str(e)}"
    
    return False, "未找到ANS记录"

2. 多锚点交叉验证

不依赖单一的DNS记录,而是同时使用DNS、DID和LEI进行交叉验证:

def verify_agent_identity_multi_anchor(agent_id):
    """多锚点验证Agent身份"""
    results = {
        'dns': verify_dns_record(agent_id),
        'did': verify_did_document(agent_id),
        'lei': verify_lei_record(agent_id),
    }
    
    # 至少需要两个锚点确认
    confirmed = sum(1 for v in results.values() if v['verified'])
    if confirmed >= 2:
        return {'status': 'trusted', 'confidence': confirmed / 3}
    elif confirmed == 1:
        return {'status': 'partial', 'confidence': 1 / 3}
    else:
        return {'status': 'untrusted', 'confidence': 0}

3. 运行时行为验证

即使身份验证通过,也需要在运行时持续验证Agent的行为是否符合其声明的能力范围。

第五章:代码实战——构建一个ANS感知的Agent系统

5.1 系统架构

让我们设计一个完整的ANS感知Agent系统,包含以下组件:

  1. Agent注册服务:将Agent信息发布到DNS
  2. Agent发现服务:通过DNS查询发现和验证其他Agent
  3. Agent通信网关:在建立通信前自动执行身份验证
  4. 审计日志服务:记录所有跨Agent交互

5.2 Agent注册服务

import hashlib
import json
from datetime import datetime
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class AgentDescriptor:
    """Agent描述符,定义Agent的核心属性"""
    agent_id: str
    owner_domain: str
    capabilities: List[str]
    model: str
    version: str
    code_hash: str
    endpoint: str
    protocols: List[str] = field(default_factory=lambda: ['a2a/1.0', 'mcp/1.0'])
    max_concurrent: int = 10
    rate_limit: int = 100
    created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat() + 'Z')
    did: Optional[str] = None
    lei: Optional[str] = None

    def to_dns_txt(self) -> str:
        """将Agent描述符转换为DNS TXT记录格式"""
        parts = [
            f"ans-v=1",
            f"owner={self.owner_domain}",
            f"caps={','.join(self.capabilities)}",
            f"model={self.model}",
            f"ver={self.version}",
            f"hash=sha256:{self.code_hash}",
            f"proto={','.join(self.protocols)}",
            f"maxconn={self.max_concurrent}",
            f"ratelimit={self.rate_limit}",
            f"created={self.created_at}",
        ]
        if self.did:
            parts.append(f"did={self.did}")
        if self.lei:
            parts.append(f"lei={self.lei}")
        return " ".join(parts)

    def compute_code_hash(self, code_path: str) -> str:
        """计算Agent代码的SHA256哈希"""
        sha256 = hashlib.sha256()
        with open(code_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha256.update(chunk)
        self.code_hash = sha256.hexdigest()
        return self.code_hash


class AgentRegistrar:
    """Agent注册服务 - 将Agent信息发布到DNS"""
    
    def __init__(self, dns_server: str, tsig_key: Optional[str] = None):
        self.dns_server = dns_server
        self.tsig_key = tsig_key  # DNS更新的TSIG密钥
    
    def register_agent(self, descriptor: AgentDescriptor) -> dict:
        """注册一个Agent到DNS"""
        # 1. 验证描述符完整性
        self._validate_descriptor(descriptor)
        
        # 2. 生成DNS TXT记录
        txt_record = descriptor.to_dns_txt()
        
        # 3. 构建DNS更新消息
        record_name = f"_agent.{descriptor.agent_id}.{descriptor.owner_domain}"
        
        # 4. 发布到DNS(这里使用伪代码,实际需要dnspython库)
        result = self._publish_dns_record(
            zone=descriptor.owner_domain,
            name=record_name,
            rtype='TXT',
            ttl=300,
            data=txt_record
        )
        
        # 5. 如果配置了DID,同时发布DID文档
        if descriptor.did:
            self._publish_did_document(descriptor)
        
        return {
            'status': 'registered',
            'record_name': record_name,
            'txt_record': txt_record,
            'timestamp': datetime.utcnow().isoformat() + 'Z'
        }
    
    def _validate_descriptor(self, descriptor: AgentDescriptor):
        """验证Agent描述符的完整性"""
        if not descriptor.agent_id:
            raise ValueError("agent_id不能为空")
        if not descriptor.owner_domain:
            raise ValueError("owner_domain不能为空")
        if not descriptor.capabilities:
            raise ValueError("capabilities不能为空")
        if not descriptor.code_hash:
            raise ValueError("code_hash不能为空,Agent必须声明代码哈希")
    
    def _publish_dns_record(self, zone, name, rtype, ttl, data):
        """发布DNS记录(实际实现需要使用dnspython)"""
        # 实际实现:
        # import dns.update
        # import dns.tsigkeyring
        # update = dns.update.Update(zone, keyring=self.tsig_key)
        # update.add(name, ttl, rtype, data)
        # response = dns.query.tcp(update, self.dns_server)
        return {'status': 'success'}
    
    def _publish_did_document(self, descriptor: AgentDescriptor):
        """发布DID文档到可解析的位置"""
        did_doc = {
            "@context": ["https://www.w3.org/ns/did/v1"],
            "id": descriptor.did,
            "controller": f"did:web:{descriptor.owner_domain}",
            "verificationMethod": [{
                "id": f"{descriptor.did}#key-1",
                "type": "Ed25519VerificationKey2020",
                "controller": descriptor.did,
                # publicKeyMultibase 实际使用时需要真实的公钥
            }],
            "service": [{
                "id": f"{descriptor.did}#agent-service",
                "type": "AIAgentService",
                "serviceEndpoint": descriptor.endpoint
            }]
        }
        return did_doc

5.3 Agent发现与验证服务

import dns.resolver
import re
from dataclasses import dataclass
from typing import Optional, Tuple
from enum import Enum

class TrustLevel(Enum):
    FULL = "full"           # DNS + DID + LEI 全部验证通过
    HIGH = "high"           # DNS + DID 验证通过
    MEDIUM = "medium"       # 仅DNS验证通过(有DNSSEC)
    LOW = "low"             # DNS查询成功但无DNSSEC
    UNTRUSTED = "untrusted" # 验证失败

@dataclass
class AgentIdentity:
    """解析后的Agent身份"""
    agent_id: str
    owner_domain: str
    capabilities: list
    model: str
    version: str
    code_hash: str
    protocols: list
    trust_level: TrustLevel
    did: Optional[str] = None
    lei: Optional[str] = None
    raw_record: str = ""

class AgentDiscovery:
    """Agent发现与验证服务"""
    
    def __init__(self, use_dnssec: bool = True):
        self.resolver = dns.resolver.Resolver()
        self.use_dnssec = use_dnssec
        if use_dnssec:
            self.resolver.use_edns(0, dns.flags.DO)
    
    def discover_and_verify(self, agent_fqdn: str) -> Tuple[Optional[AgentIdentity], str]:
        """
        发现并验证一个Agent的身份
        agent_fqdn: 完整的Agent域名,如 "customer-service.company.com"
        """
        # Step 1: 查询DNS TXT记录
        record_name = f"_agent.{agent_fqdn}"
        txt_data, dnssec_valid = self._query_txt_record(record_name)
        
        if not txt_data:
            return None, "未找到Agent的ANS记录"
        
        # Step 2: 解析ANS记录
        identity = self._parse_ans_record(agent_fqdn, txt_data)
        
        # Step 3: 确定信任级别
        if identity.did:
            did_valid = self._verify_did(identity.did)
            if dnssec_valid and did_valid:
                identity.trust_level = TrustLevel.HIGH
            elif did_valid:
                identity.trust_level = TrustLevel.MEDIUM
        elif dnssec_valid:
            identity.trust_level = TrustLevel.MEDIUM
        else:
            identity.trust_level = TrustLevel.LOW
        
        # Step 4: 如果有LEI,验证法人实体
        if identity.lei:
            lei_valid = self._verify_lei(identity.lei)
            if lei_valid and identity.trust_level == TrustLevel.HIGH:
                identity.trust_level = TrustLevel.FULL
        
        return identity, "验证完成"
    
    def _query_txt_record(self, record_name: str) -> Tuple[Optional[str], bool]:
        """查询DNS TXT记录"""
        dnssec_valid = False
        try:
            answer = self.resolver.resolve(record_name, 'TXT')
            for rdata in answer:
                txt = b''.join(rdata.strings).decode('utf-8')
                if 'ans-v=1' in txt:
                    # 检查DNSSEC验证状态
                    if answer.response.flags & dns.flags.AD:
                        dnssec_valid = True
                    return txt, dnssec_valid
        except dns.dnssec.ValidationFailure:
            return None, False
        except dns.resolver.NXDOMAIN:
            return None, False
        except Exception as e:
            print(f"DNS查询异常: {e}")
            return None, False
        return None, False
    
    def _parse_ans_record(self, agent_fqdn: str, raw: str) -> AgentIdentity:
        """解析ANS TXT记录"""
        fields = {}
        for part in raw.split(';'):
            if '=' in part:
                key, value = part.split('=', 1)
                fields[key.strip()] = value.strip()
        
        owner = fields.get('owner', agent_fqdn.split('.', 1)[-1])
        caps = fields.get('caps', '').split(',') if fields.get('caps') else []
        proto = fields.get('proto', '').split(',') if fields.get('proto') else []
        
        return AgentIdentity(
            agent_id=agent_fqdn,
            owner_domain=owner,
            capabilities=caps,
            model=fields.get('model', 'unknown'),
            version=fields.get('ver', 'unknown'),
            code_hash=fields.get('hash', '').replace('sha256:', ''),
            protocols=proto,
            trust_level=TrustLevel.UNTRUSTED,  # 初始状态
            did=fields.get('did'),
            lei=fields.get('lei'),
            raw_record=raw
        )
    
    def _verify_did(self, did: str) -> bool:
        """验证DID文档"""
        # 实际实现需要:
        # 1. 解析DID获取DID文档
        # 2. 验证DID文档的签名
        # 3. 检查DID文档中的服务端点与DNS记录一致
        # 这里简化为示例
        try:
            if did.startswith('did:web:'):
                domain = did.replace('did:web:', '').replace(':', '/')
                # 查询DID文档
                # did_doc = requests.get(f"https://{domain}/.well-known/did.json")
                # 验证签名...
                return True
        except Exception:
            pass
        return False
    
    def _verify_lei(self, lei: str) -> bool:
        """验证法人实体标识符"""
        # 实际实现需要查询GLEIF(全球法人实体标识符基金会)的验证接口
        # https://api.gleif.org/api/v1/lei-records
        return bool(re.match(r'^[0-9A-Z]{18}[0-9]{2}$', lei))

5.4 ANS感知的Agent通信网关

import asyncio
import logging
from typing import Callable, Any
from dataclasses import dataclass

@dataclass
class CommunicationPolicy:
    """Agent通信策略"""
    min_trust_level: TrustLevel = TrustLevel.MEDIUM
    required_capabilities: list = None
    allowed_protocols: list = None
    max_message_size: int = 1024 * 1024  # 1MB
    rate_limit_per_minute: int = 60
    
    def __post_init__(self):
        if self.required_capabilities is None:
            self.required_capabilities = []
        if self.allowed_protocols is None:
            self.allowed_protocols = ['a2a/1.0']

class AgentCommunicationGateway:
    """ANS感知的Agent通信网关"""
    
    def __init__(self, discovery: AgentDiscovery):
        self.discovery = discovery
        self.logger = logging.getLogger('ans-gateway')
        self._identity_cache = {}
        self._interaction_log = []
    
    async def establish_connection(
        self, 
        target_agent: str, 
        policy: CommunicationPolicy
    ) -> dict:
        """
        与目标Agent建立安全连接
        在建立连接前自动执行ANS身份验证
        """
        self.logger.info(f"尝试连接Agent: {target_agent}")
        
        # Step 1: 发现并验证目标Agent
        identity, message = self.discovery.discover_and_verify(target_agent)
        
        if not identity:
            self.logger.error(f"Agent发现失败: {target_agent} - {message}")
            return {'status': 'failed', 'reason': f'Agent发现失败: {message}'}
        
        # Step 2: 检查信任级别
        if not self._check_trust_level(identity, policy):
            self.logger.warning(
                f"信任级别不足: {identity.trust_level.value} < {policy.min_trust_level.value}"
            )
            return {
                'status': 'denied',
                'reason': '信任级别不足',
                'actual': identity.trust_level.value,
                'required': policy.min_trust_level.value
            }
        
        # Step 3: 检查能力匹配
        if not self._check_capabilities(identity, policy):
            missing = set(policy.required_capabilities) - set(identity.capabilities)
            self.logger.warning(f"Agent缺少必要能力: {missing}")
            return {
                'status': 'denied',
                'reason': f'缺少必要能力: {missing}'
            }
        
        # Step 4: 检查协议兼容性
        if not self._check_protocol_compatibility(identity, policy):
            return {
                'status': 'denied',
                'reason': '协议不兼容'
            }
        
        # Step 5: 缓存身份信息
        self._identity_cache[target_agent] = {
            'identity': identity,
            'verified_at': asyncio.get_event_loop().time(),
            'ttl': 300  # 5分钟缓存
        }
        
        # Step 6: 记录交互日志
        self._log_interaction(target_agent, identity, 'connection_established')
        
        self.logger.info(f"成功连接Agent: {target_agent} (信任级别: {identity.trust_level.value})")
        
        return {
            'status': 'connected',
            'agent': target_agent,
            'trust_level': identity.trust_level.value,
            'capabilities': identity.capabilities,
            'protocols': identity.protocols,
            'owner': identity.owner_domain
        }
    
    def _check_trust_level(self, identity: AgentIdentity, policy: CommunicationPolicy) -> bool:
        """检查信任级别是否满足策略要求"""
        trust_order = [
            TrustLevel.UNTRUSTED,
            TrustLevel.LOW,
            TrustLevel.MEDIUM,
            TrustLevel.HIGH,
            TrustLevel.FULL
        ]
        actual_idx = trust_order.index(identity.trust_level)
        required_idx = trust_order.index(policy.min_trust_level)
        return actual_idx >= required_idx
    
    def _check_capabilities(self, identity: AgentIdentity, policy: CommunicationPolicy) -> bool:
        """检查Agent是否具备必要能力"""
        if not policy.required_capabilities:
            return True
        return set(policy.required_capabilities).issubset(set(identity.capabilities))
    
    def _check_protocol_compatibility(self, identity: AgentIdentity, policy: CommunicationPolicy) -> bool:
        """检查协议兼容性"""
        if not policy.allowed_protocols:
            return True
        return bool(set(policy.allowed_protocols) & set(identity.protocols))
    
    def _log_interaction(self, target: str, identity: AgentIdentity, event: str):
        """记录交互日志(用于审计)"""
        self._interaction_log.append({
            'timestamp': asyncio.get_event_loop().time(),
            'target': target,
            'owner': identity.owner_domain,
            'trust_level': identity.trust_level.value,
            'event': event,
            'code_hash': identity.code_hash
        })

5.5 完整使用示例

import asyncio

async def main():
    # 初始化组件
    discovery = AgentDiscovery(use_dnssec=True)
    gateway = AgentCommunicationGateway(discovery)
    
    # 定义通信策略:要求中等以上信任级别,需要订单查询能力
    policy = CommunicationPolicy(
        min_trust_level=TrustLevel.MEDIUM,
        required_capabilities=['order-inquiry'],
        allowed_protocols=['a2a/1.0'],
        rate_limit_per_minute=30
    )
    
    # 尝试连接供应商的Agent
    result = await gateway.establish_connection(
        target_agent='order-service.supplier.com',
        policy=policy
    )
    
    if result['status'] == 'connected':
        print(f"✅ 成功连接供应商Agent")
        print(f"   信任级别: {result['trust_level']}")
        print(f"   所有者: {result['owner']}")
        print(f"   能力: {', '.join(result['capabilities'])}")
    else:
        print(f"❌ 连接失败: {result['reason']}")

# 运行
asyncio.run(main())

第六章:标准碎片化的挑战与展望

6.1 当前标准生态的"战国时代"

ANS进入的是一个已经相当拥挤的标准生态系统:

框架/协议主导方核心关注点与ANS的关系
MCPAnthropicAgent与工具的通信互补:MCP管通信,ANS管身份
A2AGoogleAgent间通信协议互补:A2A管协议,ANS管信任
DNS-AIDIETFDNS-based Agent发现重叠:ANS可视为DNS-AID的超集
AGNTCYCisco/Linux基金会完整多Agent基础设施包含:ANS可能是AGNTCY身份层的实现

Gartner分析师Jaishiv Prakash对此的评价颇为精辟:多个触及智能体信任、身份和发现的框架同时出现,表明智能体基础设施市场正处于**"标准探索阶段",而非"标准整合阶段"**。

6.2 碎片化的风险

标准碎片化带来的实际风险包括:

互操作性问题:如果两个企业选择了不同的Agent身份框架,它们的Agent可能无法相互验证。

投资风险:企业担心选错了标准,导致未来需要大规模迁移。

安全漏洞:不同标准之间的"缝隙"可能被攻击者利用。

开发成本:开发者需要同时支持多个标准,增加了复杂性。

6.3 可能的演进方向

尽管存在碎片化风险,但ANS和相关标准的发展方向是明确的:

短期(2026-2027)

  • ANS框架的初始规范发布
  • DNS-AID在IETF的标准化推进
  • 早期采用者开始在受控环境中部署

中期(2027-2028)

  • 主要云厂商(AWS、Azure、GCP)开始集成Agent身份服务
  • 标准之间的互操作性协议逐步明确
  • 监管机构开始关注Agent身份的合规要求

长期(2028+)

  • 形成1-2个主导标准
  • Agent身份成为企业AI部署的标配
  • 跨组织Agent协作成为常态

6.4 给开发者的建议

面对当前的标准探索阶段,Prakash的建议值得重视:"企业在将任何单一方案视为战略性基础设施之前,应等待更清晰的方向和互操作性指南。"

但这并不意味着应该完全观望。务实的做法是:

  1. 从内部开始:先在组织内部建立Agent身份管理机制,不依赖外部标准
  2. 抽象接口层:设计可插拔的身份验证接口,便于未来切换标准
  3. 关注互操作:优先选择与其他标准有明确互操作路径的方案
  4. 参与社区:通过参与开源项目和标准组织,影响标准的演进方向

第七章:总结与展望

7.1 ANS的核心价值

ANS的核心价值可以用一句话概括:它将互联网最成熟的基础设施(DNS)应用于AI时代最紧迫的需求(Agent信任)。

这个设计的优雅之处在于:

  • 不重复造轮子:复用DNS的全球覆盖、层级结构和治理机制
  • 渐进式部署:企业可以利用已有的域名资产,无需从零开始
  • 联邦式架构:没有中心化的控制点,符合互联网的去中心化精神
  • 多层信任:通过DNS + DID + LEI的组合,提供从技术身份到法律身份的完整信任链

7.2 更大的图景

ANS的出现标志着AI智能体生态正在从"功能建设"阶段进入"信任建设"阶段。就像互联网的发展历程一样:

TCP/IP → HTTP → SSL/TLS → OAuth/OIDC → ...
(连通性)(应用)(安全)  (身份)

MCP → A2A → ANS → ...?
(工具)(通信)(身份)

当Agent的身份、信任和问责机制成熟之后,我们才能真正看到:

  • 跨组织Agent协作成为企业运营的常态
  • Agent市场经济的兴起——Agent可以像API一样被发现、订阅和计量
  • Agent合规框架的建立——监管机构可以对Agent的行为进行有效监管
  • Agent保险和责任分担机制的出现——为Agent的错误决策提供风险保障

7.3 最后的思考

ANS可能不是最终的解决方案,但它代表了正确的方向。在一个AI智能体即将无处不在的时代,"谁在做什么"这个看似简单的问题,需要一套全新的基础设施来回答。

对于开发者而言,现在是开始思考Agent身份和信任的最佳时机。不是等待标准成熟,而是从今天开始,在你的Agent系统中设计可审计、可追溯、可验证的身份机制。

因为当Agent经济真正爆发时,信任将是比能力更稀缺的资源。


参考资源

  • Linux Foundation ANS框架公告
  • DNS-AID参考实现:github.com/dns-aid/dns-aid-core
  • AGNTCY文档:docs.agntcy.org
  • IETF BANDAID草案
  • Forrester/Gartner分析师评论
  • MCP协议规范(Anthropic)
  • A2A协议规范(Google)

推荐文章

Go 如何做好缓存
2024-11-18 13:33:37 +0800 CST
js函数常见的写法以及调用方法
2024-11-19 08:55:17 +0800 CST
Nginx 实操指南:从入门到精通
2024-11-19 04:16:19 +0800 CST
在Rust项目中使用SQLite数据库
2024-11-19 08:48:00 +0800 CST
PHP解决XSS攻击
2024-11-19 02:17:37 +0800 CST
Graphene:一个无敌的 Python 库!
2024-11-19 04:32:49 +0800 CST
PHP如何进行MySQL数据备份?
2024-11-18 20:40:25 +0800 CST
程序员茄子在线接单