使用Temporal构建可扩展的数据摄取管道(第一部分)

Lobsters Hottest 工具

摘要

这篇博客文章描述了使用Temporal构建可扩展的数据摄取管道的架构,用于处理来自不同来源的客户文档的抓取、提取、分块和嵌入,强调了持久性、状态性和并发控制。

<p><a href="https://lobste.rs/s/gbxsuf/building_scalable_ingestion_pipeline">评论</a></p>
查看原文
查看缓存全文

缓存时间: 2026/05/26 21:24

# 使用 Temporal 构建可扩展的数据摄取管道(第一部分) 来源:https://blog.rapidflare.ai/blog/temporal-ingestion-pipeline-part1/ ## 第一部分:架构设计 *两篇系列的第一篇。第二篇(https://blog.rapidflare.ai/blog/temporal-ingestion-pipeline-part2/)涵盖了心跳、取消、审批策略、可观测性、错误处理和开发人员隔离。* --- ## 我们在构建什么 我们的 AI 代理需要访问客户文档,这些文档可能位于 Confluence、SharePoint、Google Drive、Salesforce Knowledge 或其他 20 多个平台中。将这些文档处理成可搜索的状态意味着要进行爬取、提取、分块、嵌入,并存储在 Supabase、TurboPuffer 和 Elasticsearch 中。对于小型源,简单的批处理作业就足够了。对于大型源,包含数十万份文档和数小时的处理时间,我们需要更具弹性的方案。我们基于 Temporal 构建了数据摄取管道,本文将介绍其架构。 ## 问题 源规模的差异非常大。一个小的站点地图只有几十页。一个大客户的知识库可能有数十万份文档。一份电子产品的数据手册可能长达数千页,包含规格、合规数据和电路图。 管道在纸面上看起来很简洁: `` 爬取源 → 下载 → 提取文本 → 分块 → 嵌入 → 存储到数据库 `` 但在实践中,它需要具备以下特性: - **持久性**:运行可能需要数小时。如果在第四小时崩溃,不应该从头开始重启。 - **有状态**:需要跟踪哪些文档成功、失败或跳过,以及从哪里继续。 - **并发可控**:下游 API 有速率限制。无限制的扇出会拖慢速度,而不是加快。 - **可观测**:当处理跨分布式工作节点的数千份文档时,需要能够将失败追溯到单个文档。 - **审批把关**:在新摄取的数据上线之前,应该有人审查已索引的内容。 我们评估了几种编排方案,最终选择了 Temporal。方案对比是另一篇文章的内容。本文重点介绍使 Temporal 能够大规模工作的架构模式,以及其背后的设计目标:一个包含 20 万份文档的运行应该使用与 2 千份文档相同的编排模型,并且能够在容量允许的情况下扩展到数百万份。 规模扩大时,最先出问题的三件事是: **速率限制。** 每份文档都会触发 LLM 调用(图像描述、摘要)、嵌入 API 调用和数据库写入。无限制的并发工作流意味着无限制的并发 API 调用。LLM 提供商开始返回 429 错误。每个子任务都用指数退避重试。结果不是更快完成,而是所有事情都陷入停滞。 **资源耗尽。** 工作节点池的容量有限。过于激进的扇出会导致 Temporal 中的队列堆积、工作节点的内存压力以及级联超时。 **长尾问题。** 即使采用固定的批处理,一份巨大的电子产品数据手册也可能阻塞整个批次的槽位,而其他任务则空闲等待。 每个阶段都可能独立失败。PDF 提取器会在格式错误的文件上崩溃,LLM 调用遇到 503 错误,爬虫可能运行数小时。 --- ## 管道概览 管理员触发数据摄取,运行过程分为三个阶段: 1. **暂存阶段** - 爬取 + 下载 - 检测大型 PDF - 卸载到存储 2. **处理阶段(滑动窗口)** - 加载页面,启动子任务(N 个并发) - 大型 PDF:拆分成块,处理块(并行),索引父文档,发送完成信号 管道与源无关。我们支持超过 20 种源类型(Confluence、SharePoint、Google Drive、站点地图、Salesforce Knowledge、FluidTopics、视频平台等),每种类型都有自己的爬虫,但每个爬虫都产生相同的输出形状。从滑动窗口到提取、索引、审批和清理的整个下游管道,无论源是什么,都以相同的方式工作。 添加新的源类型只需实现一个新的爬虫。管道的其余部分不变。暂存活动根据配置分派到正确的爬虫,从那时起,SharePoint 文档和 Confluence 页面在系统中的表现没有区别。 对于大型源,暂存活动可能运行数小时。大型 PDF 被路由到专门的流程,首先将其拆分成块。在整个过程中,一个专用的状态同步工作节点将进度同步到数据库,以便管理员 UI 显示实时计数。 ### 审批与提升 审批关卡的存在是因为“成功处理”不等于“可以安全提供”。新索引的数据首先进入源的隔离暂存副本。根据源的审批策略,工作流要么自动批准满足配置质量标准的可信运行,要么等待人工审查员检查计数、样本和明显的提取问题,然后才能查询数据。 批准后,我们通过将实时引用切换到新数据集并淘汰旧数据集来提升暂存副本。如果拒绝或取消,则丢弃暂存副本,保持当前实时数据不变。这使得数据摄取具有持久性,而不会让不良爬取结果立即暴露给最终用户。 ### Temporal 快速入门 如果你不每天都在使用 Temporal,那么阅读本文其余部分时需要了解四个术语: - **工作流(Workflow)**:持久化的编排逻辑。它决定下一步做什么。 - **活动(Activity)**:执行外部 I/O 的代码,例如爬取、提取、嵌入或写入。 - **信号(Signal)**:发送到运行中的工作流的异步消息。 - **继续即新(Continue-as-new)**:启动一个携带先前状态的新工作流运行,以保持历史记录较小。 在工作流页面边界处,父工作流会检查历史记录是否过大,如果 Temporal 建议重启,它会执行。当重启时,父工作流会排空所有挂起的信号,保存其游标位置,并继续即新。正在进行的子工作流继续运行,并向新实例发送信号。 继续即新保持相同的工作流 ID,但启动一个具有全新事件历史的新运行。这对我们的信号模式很重要:子工作流可以通过工作流 ID 持续寻址父工作流,而父工作流则保持其历史记录在可控范围内。 第二部分涵盖操作方面:心跳、错误处理、取消、审批策略、可观测性和开发人员隔离。 --- ## 架构概览 ### 三个工作节点,一个进程 我们在同一个进程中运行三个 Temporal 工作节点,每个节点都有自己的任务队列: | 工作节点 | 角色 | 并发性 | |------------|------------------------------------------------|------------| | **Ingestion**(摄录) | 爬取、提取、嵌入、索引 | 较高并发性 | | **Enrichment**(丰富) | 摄取后的摘要、打标签 | 较低并发性 | | **Status Sync**(状态同步) | 进度持久化到数据库 | 较低并发性 | 为什么分离工作节点?隔离性。我们不希望大量的并发提取活动饿死更新管理员 UI 的状态同步任务。状态同步工作节点有自己的并发预算,即使摄录通道饱和,它也能始终写入进度。 ### 在 Cloud Run 上部署 我们将 Temporal 工作节点作为容器化服务部署在 Google Cloud Run 上。 实践中,一个 Cloud Run 实例运行一个进程,该进程托管所有三个工作节点。当 Cloud Run 扩展时,它会将相同的多工作节点进程复制到更多实例上。因此,隔离边界是任务队列及其并发预算,而扩展单元是整个工作节点进程。 - **实例身份**:每个 Cloud Run 实例都有一个唯一 ID,我们将其嵌入到 Temporal 工作节点身份字符串中,用于分布式追踪。 - **健康检查**:Cloud Run 监控工作节点健康状态,并自动替换不健康的实例。 - **版本管理**:我们将新的工作节点代码部署为修订版本,并逐步转移流量以实现零停机更新。 工作节点身份格式为 `ingestion-worker-{service}-{revision}-{instance}`。这会在 Temporal UI 中显示在每个活动执行旁边,从而可以轻松追踪哪个 Cloud Run 实例处理了哪个文档。 ### 活动与工作流的对比 活动执行 I/O:爬取、提取、嵌入、存储。工作流做出决策:下一步做什么、如何处理失败、何时重启。暂存活动根据源类型分派到适当的爬虫,然后向工作流提供一个规范化的文档形状,使得下游处理路径保持一致。 ### 传递大数据:云存储作为总线 Temporal 有负载大小限制。我们的暂存活动可能产生数千个文档的元数据,这对于在 Temporal 的事件历史中传递来说太大了。 公开的 Temporal Cloud 限制(https://docs.temporal.io/cloud/limits)是一个有用的设计约束:单个负载限制为 2 MB,单次事件历史事务限制为 4 MB,单个工作流执行历史限制为 51,200 个事件或 50 MB。单个工作流执行最多可以接收 10,000 个信号,Temporal 还对未完成的活动、信号和子工作流施加每次执行的并发限制。即使在达到这些硬限制之前,大量历史记录也会减慢重放速度并使调试变得困难。 因此,我们将数据卸载到云存储桶。暂存活动将结果写入桶,并仅返回一个轻量级引用(路径 + 页面计数)。下游活动一次加载一个页面: `` flowchart TB A["Staging activity"] -->|"write pages"| B["Cloud Storage bucket"] A -->|"return storage ref"| C["Ingestion workflow"] C -->|"request page N"| D["Load page activity"] D -->|"read page"| B D -->|"bounded doc batch"| E["Child workflows"] classDef activity fill:#f5f3ff,stroke:#7c3aed,color:#111827,stroke-width:1.4px classDef storage fill:#ecfeff,stroke:#22d3ee,color:#111827,stroke-width:1.4px classDef workflow fill:#eef2ff,stroke:#2563eb,color:#111827,stroke-width:1.4px class A,D activity class B storage class C,E workflow `` 这也解决了分布式执行的问题。活动在生产中运行在不同的 Cloud Run 实例上,因此由实例 A 上的暂存活动下载的文件需要能被实例 B 上的提取活动访问。云存储是共享总线。 #### 如何在 Python 中抽象这一点 我们构建了一个小的抽象层,调用者无需考虑存储细节。它由三部分组成。 **活动结果包装器。** 每个活动返回一个通用的结果类型,该类型知道如何卸载自身。你调用 `.offload(paginated=True)`,该结果就会序列化到云存储,拆分成页面,从内存中清除自身,并仅存储存储路径和页面计数。现在通过 Temporal 传递的是轻量级引用,而不是实际数据。 **可分页的文档类型。** 文档类型实现了一个带有 `.get_pages()` 方法的基类。每种类型都知道如何将文档列表拆分成配置大小的页面。暂存活动在爬取后调用 `.offload()`,下游工作流每次只加载一个页面。 **页面加载活动。** 在加载端,一个专用活动从云存储读取页面,并向工作流返回一个边界批次文档。外部 I/O 保持在活动内部;工作流代码仅接收确定性输入,并决定接下来要启动哪些子工作流。 在代码中,使用模式如下: `` # Staging activity: crawl, pre-analyze, then offload to cloud storage result = await crawl_source(params) analyze_documents_for_splitting(result) result.offload(paginated=True) # Serializes pages to storage, frees memory return result # Only a lightweight ref passes through Temporal # Parent workflow: load one page at a time for page_num in range(staging.total_pages): page = await workflow.execute_activity(load_page, staging.ref, page_num) for doc in page.docs: # Already materialized by the load_page activity start_child_workflow(doc) `` 底层存储层是一个抽象基类,有两个实现:一个用于本地开发(写入文件系统),一个用于生产(写入 Google Cloud Storage)。工厂根据环境配置选择正确的实现。整个卸载/加载模式在开发和环境中完全一致,无需任何代码更改。 我们还视暂存对象为临时的摄录工单。路径按源和运行进行作用域划分,并在审批、拒绝或取消后进行清理,这样暂存数据就不会成为客户文档的第二个长期副本。 --- ## 滑动窗口:受控的扇出 ### 为什么不直接批处理? 你可能会想:“好吧,不要一次性扇出所有东西。只需分成固定的组,等待一批完成,再开始下一批。” 这更好,但仍然会遇到**长尾问题**。如果大多数文档快速完成,但一份巨大的电子产品数据手册需要明显更长的时间,那么其他槽位就会空闲。 ### 滑动窗口 **滑动窗口**始终保持恰好 N 个并发的子工作流运行。任何一个完成时,下一个文档立即开始。没有空闲槽位。API 调用随时间均匀分布,而不是突发。 当 doc3 完成时,3 个槽位中的 4 个空闲。批次 2 无法开始,直到最慢的文档完成。 当 doc1 完成时,doc5 立即开始。Doc3 不会阻塞任何人。 在实践中: - 朴素扇出:最慢(API 节流占主导) - 固定批次:更好(但有空闲时间浪费) - 滑动窗口:最快(最大利用率,自然背压) ### 估计吞吐量 滑动窗口为你提供了一个简单的模型来估计总处理时间: `` 总文档数: D 每份文档平均处理时间: W 窗口大小(并发数): N 估计处理时间 ≈ (D × W) / N `` 这是一个规划估计,不是保证。重试、排队延迟、速率限制和非常大的异常文档都会增加实际总时间。但它为你提供了一个可旋转的旋钮:如果速率限制允许则增加 N,如果遇到 429 错误则减少 N。 用你自己的数字试试: #### 滑动窗口计算器 拖动滑块估算你的工作负载的处理时间。 文档数 (D):10,000 每份文档平均处理时间 (W):30 秒 窗口大小 (N):20 估计时间:4.2 小时 (10,000 × 30) / 20 = 15,000 秒 ### 工作原理 父工作流维护一组活动文档 ID(上限为 N)和一个内存中的信号队列。子工作流作为**即发即忘**启动。当每个子工作流完成时,它会向父工作流发送一个 Temporal 信号,并带上结果。父工作流处理信号以释放槽位,然后用下一个文档填充它们。 `` flowchart LR A["Window full\n(N active)"] --> B["wait_condition()"] B --> C["Child finishes"] C --> D["Signal to parent"] D --> E["Drain queue"] E --> F["Free slot"] F --> G["Start next child"] G --> A classDef active fill:#f5f3ff,stroke:#7c3aed,color:#111827,stroke-width:1.4px classDef signal fill:#ecfeff,stroke:#22d3ee,color:#111827,stroke-width:1.4px classDef action fill:#eef2ff,stroke:#2563eb,color:#111827,stroke-width:1.4px class A,B,F,G active class C,D signal class E action `` 关键的 Temporal 原语: - `workflow.wait_condition(predicate)` 阻塞直到谓词为真,在每次信号后评估。没有轮询循环。 - `@workflow.signal` 是子到父的通信。子工作流发送完成信号,附带文档 ID 和成功/失败状态。 - `ParentClosePolicy.ABANDON` 意味着子工作流在父工作流通过继续即新重启后仍然存活。这不是默认行为,所以我们显式设置它。信号仍然到达新的父实例,因为它们是按工作流 ID 寻址的,而不是内存中的引用。 在所有文档提交后,父工作流进入**排空阶段**,等待剩余在途的子工作流,并设置一个安全超时,用于处理那些崩溃而没有发送信号的子工作流。

相似文章

@tricalt: https://x.com/tricalt/status/2057173322924806651

X AI KOLs Timeline

一位创始人讨论了在生产环境中使用Markdown文件作为AI代理记忆的扩展挑战,突出了关于权限、多代理交互和时间查询的常见陷阱,并指出团队常常在不经意间修补这些问题的过程中,实际上是在重新构建一个更复杂的系统。