大规模评估多智能体系统(48分钟阅读)
摘要
OpenAI提供的一本操作手册,介绍了使用合成轨迹发现重复行为模式的大规模多智能体系统宏观评估工作流程。
OpenAI概述了一种用于智能体系统的宏观评估工作流程,该流程分析整个轨迹群体中的模式,而非个别失败。
查看缓存全文
缓存时间: 2026/05/25 18:23
# 智能体系统的宏观评估
来源:https://developers.openai.com/cookbook/examples/partners/macro_evals_for_agentic_systems/macro_evals_for_agentic_systems
当一个智能体系统出现故障时,问题往往不仅仅是一次糟糕的响应。交接可能发生得太晚,某个专业智能体可能在多次运行中遗漏同一个信号,或者审核流程可能对错误类型的案例触发。要改进系统,团队需要看到整个追踪群体中反复出现的行为。
本教程介绍了一个面向多智能体系统的宏观评估工作流程。我们使用一个合成的电动汽车订单工作流,其中专业智能体负责处理定价、合规、供应、工厂排程、调度和发布决策,同时市场和运营条件不断变化。
本笔记使用预先计算的合成追踪和保存的低层级评估标签,因此您无需 OpenAI API 密钥即可运行完整工作流程。
您将学习如何:
1. 生成或收集多次被追踪的智能体运行记录;
2. 对每次完成的运行执行低层级评估;
3. 将每条追踪记录转化为紧凑的文档;
4. 发现整个群体中的重复行为模式;以及
5. 深入分析一个高影响模式,找到人类接下来应检查系统的哪个环节。
目标不是构建一个完美的追踪分类体系,而是展示 AI 工程团队如何从数千个智能体事件中提炼出少量模式,这些模式能够被技术和业务相关方共同理解。
端到端宏观评估架构
核心思想是:本笔记评估的是一个已保存的智能体系统,而非普通的聊天记录。场景输入驱动一个协调好的专业智能体集群,运行时发出追踪束,保存的 Promptfoo 标签与规范化后的追踪数据连接,宏观评估层将这些证据转化为模式与诊断视图。
评估是 AI 团队衡量系统是否正常工作的方式。对于简单的模型调用,评估可能将单个输出与评估标准或参考答案进行比较。对于智能体系统,我们还需要评估系统是否正确使用了工具、是否委托给了正确的专业智能体、是否在风险较高时暂停以进行审核、以及是否始终基于业务上下文。
多智能体系统让这变得更困难,因为最终答案只是较长工作流中的最后一个事件。一个发布建议可能看起来合理,但追踪记录可能显示:定价智能体忽略了激励政策,供应智能体错过了缺货情况,或者编排器绕过了必要的审核步骤。
本笔记将问题分为两个层级:
- **低层级评估**:对单个智能体、交接、工具和完成的运行进行评分。在此示例中,Promptfoo 代表该智能体层级的评估层,它对运行是否处理了最终决策质量、策略正确性、专业智能体路由、市场漂移和审核适当性进行评分。
- **宏观评估**:审视多个低层级评估结果。它提出以下问题:哪些类型的问题重复出现?它们集中在何处?我们应该首先检查智能体工作流的哪个部分?
我们将在整个教程中使用四个面向读者的标签:
- `case_type`:生成的业务情况,例如清洁订单、验证阻塞、供应商替换或定价异常。
- `run_outcome`:运行的结束方式,例如已完成、等待审核、阻塞或失败。
- `eval_finding`:表明似乎有问题或存在风险的低层级信号。
- `behavior_pattern`:在多个追踪中发现的重现模式。
一个有用的思维模型是:`case_type` 是设置,`run_outcome` 是结局,`eval_finding` 是局部症状,而 `behavior_pattern` 是群体级别的模式。
``
import sys
from pathlib import Path
if sys.version_info < (3, 11):
raise RuntimeError("This notebook requires Python 3.11 or newer.")
if not Path("requirements.txt").is_file():
raise FileNotFoundError("requirements.txt must be in the same folder as this notebook.")
%pip install -q --upgrade pip setuptools wheel
%pip install -q --only-binary=:all: -r requirements.txt
``
安装依赖,然后加载与本示例捆绑的离线数据集。保存的 Promptfoo 标签位于本地数据文件夹中,因此本笔记不需要单独的 Promptfoo 配置文件、Promptfoo 运行产物或 OpenAI API 密钥。
预期文件:
``
data/trace_results.jsonl
data/run_summary.json
data/trace_bundles.zip
data/eval_labels.jsonl
``
`trace_bundles.zip` 会在笔记本首次运行时自动解压到本地缓存。可选的完整 SQLite 追踪快照可以放在 `data/trace_snapshot.sqlite` 中用于丰富数据,但并非端到端工作流所必需。
如果您的数据位于示例文件夹之外,请将 `MACRO_EVALS_DATA_ROOT` 设置为该目录。如果标签单独存放,请设置 `MACRO_EVALS_LABELS_PATH`。
``
from __future__ import annotations
import json
import os
import sqlite3
import sys
import warnings
import zipfile
from pathlib import Path
from time import perf_counter
from typing import Any
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import Markdown, display
pd.set_option("display.max_colwidth", 180)
pd.set_option("display.max_rows", 100)
warnings.filterwarnings("ignore", message="n_jobs value 1 overridden.*")
def find_example_root(start: Path | None = None) -> Path:
start = (start or Path.cwd()).resolve()
candidates = [start, *start.parents, start / "examples/partners/macro_evals_for_agentic_systems"]
for candidate in candidates:
if (candidate / "helpers/data_prep.py").is_file() and (candidate / "helpers/macro_eval_pipeline.py").is_file():
return candidate
raise FileNotFoundError("Could not locate the macro evals example root.")
EXAMPLE_ROOT = find_example_root()
HELPERS_ROOT = EXAMPLE_ROOT / "helpers"
if str(HELPERS_ROOT) not in sys.path:
sys.path.insert(0, str(HELPERS_ROOT))
from data_prep import add_public_label_columns, build_trace_documents, load_promptfoo_label_rows, normalize_bundle
from macro_eval_pipeline import (
drill_down_topic_root_causes,
pick_focus_topic,
plot_root_cause_story,
plot_suspect_leaderboard,
plot_topic_heatmap,
plot_topic_leaderboard,
plot_topic_scatter,
plot_trace_swimlane,
run_macro_discovery,
slice_topics_by_metadata,
)
def display_path(path: Path | None) -> str:
if path is None:
return "not found"
try:
return str(path.resolve().relative_to(EXAMPLE_ROOT))
except ValueError:
return str(path)
def as_path(value: str | Path) -> Path:
path = Path(value).expanduser()
return path if path.is_absolute() else EXAMPLE_ROOT / path
def unique_paths(paths: list[Path]) -> list[Path]:
seen: set[Path] = set()
unique: list[Path] = []
for path in paths:
resolved = path.resolve()
if resolved not in seen:
seen.add(resolved)
unique.append(resolved)
return unique
def find_material(label: str, names: list[str], *, kind: str = "file", required: bool = True) -> Path | None:
checked: list[Path] = []
for root in DATA_ROOTS:
for name in names:
if not name:
continue
candidate = as_path(name) if Path(name).expanduser().is_absolute() else root / name
checked.append(candidate)
if kind == "dir":
exists = candidate.is_dir() and any(candidate.glob("*.json"))
else:
exists = candidate.is_file()
if exists:
return candidate.resolve()
if required:
checked_text = "\n".join(f"- {display_path(path)}" for path in checked)
raise FileNotFoundError(f"Missing {label}. Checked:\n{checked_text}")
return None
def ensure_trace_bundle_dir(bundle_dir: Path | None, bundle_zip: Path | None) -> Path:
if bundle_dir is not None:
return bundle_dir
if bundle_zip is None:
raise FileNotFoundError("Missing trace bundles. Expected data/trace_bundles/ or data/trace_bundles.zip.")
cache_dir = bundle_zip.parent / ".macro_eval_cache" / "trace_bundles"
marker = cache_dir / ".extracted_from_trace_bundles_zip"
if not marker.is_file() or not any(cache_dir.glob("*.json")):
cache_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(bundle_zip) as archive:
for member in archive.infolist():
if member.is_dir() or not member.filename.endswith(".json"):
continue
(cache_dir / Path(member.filename).name).write_bytes(archive.read(member))
marker.write_text(str(bundle_zip.stat().st_mtime_ns), encoding="utf-8")
return cache_dir.resolve()
env_data_root = os.environ.get("MACRO_EVALS_DATA_ROOT")
DATA_ROOTS = unique_paths(
([as_path(env_data_root)] if env_data_root else [])
+ [
EXAMPLE_ROOT / "data",
]
)
RESULTS_PATH = find_material("trace results", ["trace_results.jsonl", "metadata/results.jsonl", "results.jsonl"])
SUMMARY_PATH = find_material("run summary", ["run_summary.json", "metadata/summary.json", "summary.json"])
SQLITE_PATH = find_material("optional trace snapshot", ["trace_snapshot.sqlite"], required=False)
BUNDLE_ZIP_PATH = find_material("trace bundle archive", ["trace_bundles.zip", "bundles.zip"], required=False)
BUNDLE_DIR = ensure_trace_bundle_dir(find_material("trace bundles", ["trace_bundles", "bundles"], kind="dir", required=False), BUNDLE_ZIP_PATH)
PROGRESS_PATH = find_material("run progress", ["run_progress.json", "metadata/progress.json", "progress.json"], required=False)
PROMPTFOO_LABELS_PATH = find_material(
"lower-level eval labels",
[
os.environ.get("MACRO_EVALS_LABELS_PATH", ""),
"eval_labels.jsonl",
"metadata/eval_labels.jsonl",
],
required=False,
)
DATA_ROOT = next((root for root in DATA_ROOTS if RESULTS_PATH.is_relative_to(root)), DATA_ROOTS[0])
TRACE_LIMIT = int(os.environ.get("MACRO_EVALS_TRACE_LIMIT", "0")) or None
DISCOVERY_DOC_COLUMN = "doc_structured_summary"
DISCOVERY_MIN_CLUSTER_SIZE = int(os.environ.get("MACRO_EVALS_DISCOVERY_MIN_CLUSTER_SIZE", "24"))
RANDOM_STATE = 42
resolved_paths_df = pd.DataFrame(
[
("Trace results", RESULTS_PATH),
("Run summary", SUMMARY_PATH),
("Trace bundle archive", BUNDLE_ZIP_PATH),
("Expanded trace bundles", BUNDLE_DIR),
("Optional trace snapshot", SQLITE_PATH),
("Run progress", PROGRESS_PATH),
("Lower-level eval labels", PROMPTFOO_LABELS_PATH),
],
columns=["material", "path"],
)
resolved_paths_df["path"] = resolved_paths_df["path"].map(display_path)
display(Markdown("### Data materials"))
display(resolved_paths_df)
display(Markdown(f"Example root: `{display_path(EXAMPLE_ROOT)}` \nData root: `{display_path(DATA_ROOT)}`"))
``
模拟的业务是一个电动汽车订单及后期配置工作流。客户选择了一种车辆配置,公司需要决定订单是否可以按原样处理、是否需要调整、是否应重新路由、是否需要替换,或者是否应暂停以进行审核。
模拟包含了使实际汽车履约变得困难的各种约束条件:
- 组件可用性与供应商替换;
- 工厂产能与生产调度;
- 定价异常、促销与激励;
- 关税与过时的市场信号;
- 区域合规约束;
- 客户澄清与升级路径;
- 针对高风险或模糊案件的发布审核阈值。
智能体集群围绕这些业务职责进行组织。一个编排器接收订单和当前环境,然后委托给专业智能体,例如验证、供应风险、采购规划、产能平衡、工厂排程、市场情报、定价、合规、客户沟通以及发布审核。
这自然对应于 OpenAI Agents SDK。在该 SDK 中,智能体是工作流的核心单元:它将模型、指令和可选的运行时行为(如工具、交接、护栏和结构化输出)打包在一起。模拟遵循这种模式:
- **专业智能体**为决策的一个部分包装了指令和工具;
- **交接**允许编排器将任务委托给其他专业智能体,而不是将所有责任塞进一个提示词中;
- **函数工具**通过结构化输入和输出公开订单数据、环境信号和审批标记;
- **护栏和审核阈值**代表针对高风险或模糊情况的验证、阻止和人工审核流程;
- **结构化输出**使下游评分和聚合成为可能;
- **追踪记录**保留了模型调用、工具调用、交接、护栏和自定义跨度的结构化记录,用于调试和宏观分析。
本笔记后面的低层级评估基于这个模拟故事。如果案例类型表明在关税压力下存在供应商替换,那么追踪记录应显示出对供应、政策、市场和审核风险的认知。如果案例类型是清洁的,那么不必要的升级本身就是一条发现。
``
def read_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def read_jsonl(path: Path) -> list[dict[str, Any]]:
return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
def result_run_id(row: dict[str, Any]) -> str | None:
if row.get("run_id"):
return str(row["run_id"])
if row.get("bundle_path"):
return Path(str(row["bundle_path"])).stem
return None
def sqlite_table_counts(db_path: Path | None) -> pd.DataFrame:
tables = [
"runs",
"configs",
"traces",
"trace_events",
"spans",
"review_packets",
"environment_events",
"environment_decisions",
]
if db_path is None:
return pd.DataFrame([{"table": table, "row_count": 0} for table in tables])
with sqlite3.connect(db_path) as conn:
existing_tables = {
row[0]
for row in conn.execute("select name from sqlite_master where type = 'table'")
}
rows = [
{
"table": table,
"row_count": conn.execute(f"select count(*) from {table}").fetchone()[0] if table in existing_tables else 0,
}
for table in tables
]
return pd.DataFrame(rows)
def load_sqlite_runs(db_path: Path | None) -> pd.DataFrame:
if db_path is None:
return pd.DataFrame()
summary_fields = [
"scenario_family",
"validation_outcome",
"review_status",
"review_decision",
"triage_outcome",
"market_regime",
"price_regime",
"schedule_regime",
"agent_version_set",
"orchestrator_mode",
"rogue_window_id",
"factory_release_state",
"trace_family",
"loop_count",
"retry_count",
"arbitration_count",
"compound_issue_count",
"specialist_activations",
"environment_event_ids",
"findings",
"failure_agent",
"error_code",
"error_message",
]
rows = []
with sqlite3.connect(db_path) as conn:
for row in conn.execute("select run_id, config_id, trace_id, status, terminal_state, started_at, ended_at, summary_json from runs"):
run_id, config_id, trace_id, status, terminal_state, started_at, ended_at, summary_json = row
summary = json.loads(summary_json or "{}")
item = {field: summary.get(field) for field in summary_fields}
item.update(
{
"run_id": run_id,
"config_id": config_id,
"trace_id": trace_id,
"sqlite_status": status,
"s
相似文章
自动化智能体评估的实证研究
本文介绍了 EvalAgent,这是一个通过编码领域专业知识来自动化 AI 智能体评估的系统,旨在解决标准编程助手在此任务中的局限性。此外,本文还提出了用于测试评估流程的基准 AgentEvalBench,并展示了在评估可靠性方面的显著提升。
解密 AI Agent 的评测方法
Anthropic 发布了一份指南,介绍如何为 AI Agent 设计严谨的自动化评测方案,重点解决了多轮交互和状态修改带来的复杂性挑战。
构建AI代理时如何进行评估与可观测性?
作者探讨了在生产环境中评估和监控AI代理所面临的挑战,包括离线评估与在线评估、LLM作为评判、链路追踪和成本追踪,并提到Langfuse、LangSmith等工具,但更关注底层流程。
@0xCodez: https://x.com/0xCodez/status/2058513716509913581
关于使用 Claude Managed Agents 构建多智能体团队的全面指南,涵盖角色设计、模型混合和并行执行,以将团队从1个扩展到20个智能体。
@neil_xbt: 一个单一的AI智能体就像一位厨师试图独自经营一家餐厅!多智能体系统则是完整的厨房。备菜厨…
这条推文用餐厅类比来解释多智能体AI系统,并推广了IBM Technology提供的一本免费指南,内容涵盖领域专业化、集体学习和可扩展性。