@wsl8297: 做 RAG / 数据智能体,最容易卡住的是这一步:怎么把一堆散落的文件,变成可追踪、可查询、可复用的数据集。 尤其是 S3 / GCS / Azure 里的 PDF、图片、日志、标注文件,规模一上来,管理和迭代就开始失控。 https:/…
摘要
DataChain 是一个 Python 库,为 S3、GCS 和 Azure 中的非结构化文件添加上下文层,将其转化为可版本化、可查询的带类型数据集,支持并行处理、增量更新和 Agent 工作流集成。
查看缓存全文
缓存时间: 2026/06/02 17:35
做 RAG / 数据智能体,最容易卡住的是这一步:怎么把一堆散落的文件,变成可追踪、可查询、可复用的数据集。
尤其是 S3 / GCS / Azure 里的 PDF、图片、日志、标注文件,规模一上来,管理和迭代就开始失控。
https://github.com/datachain-ai/datachain…
最近看到一个 Python 库 DataChain,思路很清晰:给非结构化数据加一层 context layer,把云存储里的文件资产“数据集化”——变成带类型、可版本化的 datasets。
之后就能像用数据仓库一样做过滤、join、相似度检索,并且可以在 Agent / RAG 流程里直接复用。
主要特性:
- 覆盖 S3、GCS、Azure 等对象存储的文件数据;
- 用 Pydantic schema 管结构化字段,同时保留文件指针与 lineage;
- 支持并行 / 分布式 Python 处理、checkpoint、增量更新;
- 可把数据集导出为 Markdown knowledge base,方便人和 LLM 消化;
- 提供 MCP / agent harness,便于接入 Claude Code、Cursor、Codex、Copilot 等工具链。
如果你在做多模态数据集、企业内部知识库、RAG 评测集或数据清洗流水线,这个值得看一眼。
datachain-ai/datachain
Source: https://github.com/datachain-ai/datachain
DataChain: The Context Layer for Unstructured Data
A Python library that turns files in S3, GCS, and Azure into versioned, typed datasets, queryable at warehouse speed.
- Compute Engine: parallel and distributed Python over files. Async I/O, checkpoint recovery, incremental updates.
- Dataset DB: Pydantic schemas, versioning, file pointers, automatic lineage. Sub-second filter, join, and similarity search over hundreds of millions of records.
Optional, for agent workflows:
- Knowledge Base: markdown summaries derived from the Dataset DB and enriched by LLM. Readable by humans and LLMs.
- Agent Harness: skill and MCP server that plug all three into Claude Code, Cursor, Codex, GitHub Copilot, and Pi, so they understand your data.
Bytes never leave your storage. Every run deposits a typed dataset the next pipeline (or agent) reads instead of recomputing.
1. Install
pip install datachain
To add the agent skill (Knowledge Base + code generation):
datachain skill install --target claude # also: cursor, codex, copilot, pi
Works with S3, GCS, Azure, and local filesystems.
2. Quickstart: agent-driven pipeline
Task: find dogs in S3 similar to a reference image, filtered by breed, mask availability, and image dimensions.
Grab a reference image and run Claude Code (or other agent):
datachain cp --anon s3://dc-readme/fiona.jpg .
claude
Prompt:
Find dogs in s3://dc-readme/oxford-pets-micro/ similar to ./fiona.jpg:
- Pull breed metadata and mask files from annotations/
- Exclude images without mask
- Exclude Cocker Spaniels
- Only include images wider than 400px
Result:
┌──────┬───────────────────────────────────┬────────────────────────────┬──────────┐
│ Rank │ Image │ Breed │ Distance │
├──────┼───────────────────────────────────┼────────────────────────────┼──────────┤
│ 1 │ shiba_inu_52.jpg │ shiba_inu │ 0.244 │
├──────┼───────────────────────────────────┼────────────────────────────┼──────────┤
│ 2 │ shiba_inu_53.jpg │ shiba_inu │ 0.323 │
├──────┼───────────────────────────────────┼────────────────────────────┼──────────┤
│ 3 │ great_pyrenees_17.jpg │ great_pyrenees │ 0.325 │
└──────┴───────────────────────────────────┴────────────────────────────┴──────────┘
Fiona's closest matches are shiba inus (both top spots), which makes sense given her
tan coloring and pointed ears.
The agent decomposed the task into steps - embeddings, breed metadata, mask join, quality filter - and saved each as a named, versioned dataset. Next time you ask a related question, it starts from what’s already built.
The datasets are registered in a Knowledge Base optimized for both agents and humans:
dc-knowledge
├── buckets
│ └── s3
│ └── dc_readme.md
├── datasets
│ ├── oxford_micro_dog_breeds.md
│ ├── oxford_micro_dog_embeddings.md
│ └── similar_to_fiona.md
└── index.md
Browse it as markdown files, navigate with wikilinks, or open in Obsidian:

3. Data Harness
Code harnesses (Claude Code, Cursor, Codex, GitHub Copilot, Pi) give agents repo context, dedicated tools, and memory across sessions. DataChain adds the same for data: typed datasets the agent reads, chain operations the agent calls (read_storage, map, save), a Dataset DB where its results persist.
A dataset is the unit of work - a named, versioned result of a pipeline step like [email protected]. Every .save() registers one.
For the data-flow architecture (Compute Engine, Dataset DB, Knowledge Base) and how the components connect, see Architecture.
4. Core concepts
4.1. Dataset
A dataset is a versioned data reasoning step - what was computed, from what input, producing what schema. DataChain indexes your storage into one: no data copied, just typed metadata and file pointers. Re-runs only process new or changed files.
Create a dataset manually create_dataset.py:
from PIL import Image
import io
from pydantic import BaseModel
import datachain as dc
class ImageInfo(BaseModel):
width: int
height: int
def get_info(file: dc.File) -> ImageInfo:
img = Image.open(io.BytesIO(file.read()))
return ImageInfo(width=img.width, height=img.height)
ds = (
dc.read_storage(
"s3://dc-readme/oxford-pets-micro/images/**/*.jpg",
anon=True,
update=True,
delta=True, # re-runs skip unchanged files
)
.settings(prefetch=64)
.map(info=get_info)
.save("pets_images")
)
ds.show(5)
[email protected] is now the shared reference to this data - schema, version, lineage, and metadata.
Every .save() registers the dataset in the Dataset DB, DataChain’s persistent store for schemas, versions, lineage, and processing state, kept locally in SQLite DB .datachain/db. Pipelines reference datasets by name, not paths. When the code or input data changes, the next run bumps dataset version.
This is what makes a dataset a management unit: owned, versioned, and queryable by everyone on the team.
4.2. Schemas and types
DataChain uses Pydantic to define the shape of every column. The return type of your UDF becomes the dataset schema - each field a queryable column in the Dataset DB.
show() in the previous script renders nested fields as dotted columns:
file file info info
path size width height
0 oxford-pets-micro/images/Abyssinian_141.jpg 111270 461 500
1 oxford-pets-micro/images/Abyssinian_157.jpg 139948 500 375
2 oxford-pets-micro/images/Abyssinian_175.jpg 31265 600 234
3 oxford-pets-micro/images/Abyssinian_220.jpg 10687 300 225
4 oxford-pets-micro/images/Abyssinian_3.jpg 61533 600 869
[Limited by 5 rows]
.print_schema() renders it’s schema:
file: File@v1
source: str
path: str
size: int
version: str
etag: str
is_latest: bool
last_modified: datetime
location: Union[dict, list[dict], NoneType]
info: ImageInfo
width: int
height: int
Models can be arbitrarily nested - a BBox inside an Annotation, a List[Citation] inside an LLM Response - every leaf field stays queryable the same way. The schema lives in the Dataset DB and is enforced at dataset creation time.
The Dataset DB handles datasets of any size - 100 millions of files, hundreds of metadata rows - without loading anything into memory. Pandas is limited by RAM; DataChain is not. Export to pandas when you need it, on a filtered subset:
import datachain as dc
df = dc.read_dataset("pets_images").filter(dc.C("info.width") > 500).to_pandas()
print(df)
4.3. Fast queries
Filters, aggregations, and joins run as vectorized operations directly against the Dataset DB - metadata never leaves your machine, no files downloaded.
import datachain as dc
cnt = (
dc.read_dataset("pets_images")
.filter(
(dc.C("info.width") > 400) &
~dc.C("file.path").ilike("%cocker_spaniel%") # case-insensitive
)
.count()
)
print(f"Large images with Cocker Spaniel: {cnt}")
Milliseconds, even at 100M-file scale.
Large images with Cocker Spaniel: 6
5. Resilient Pipelines
When computation is expensive, bugs and new data are both inevitable. DataChain tracks processing state in the Dataset DB - so crashes and new data are handled automatically, without changing how you write pipelines.
5.1. Data checkpoints
Save to embed.py:
import open_clip, torch, io
from PIL import Image
import datachain as dc
model, _, preprocess = open_clip.create_model_and_transforms("ViT-B-32", "laion2b_s34b_b79k")
model.eval()
counter = 0
def encode(file: dc.File, model, preprocess) -> list[float]:
global counter
counter += 1
if counter > 236: # ← bug: remove these two lines
raise Exception("some bug") # ←
img = Image.open(io.BytesIO(file.read())).convert("RGB")
with torch.no_grad():
return model.encode_image(preprocess(img).unsqueeze(0))[0].tolist()
(
dc.read_dataset("pets_images")
.settings(batch_size=100)
.setup(model=lambda: model, preprocess=lambda: preprocess)
.map(emb=encode)
.save("pets_embeddings")
)
It fails due to a bug in the code:
Exception: some bug
Remove the two marked lines and re-run - DataChain resumes from image 201 (two 100 size batches are completed), the start of the last uncommitted batch:
$ python embed.py
UDF 'encode': Continuing from checkpoint
5.2. Similarity search
The vectors live in the Dataset DB alongside all the metadata - list[float] type in pydentic schemas. Querying them is instant - no files re-read and can be combined with not vector filters like info.width:
Prepare data:
datachain cp s3://dc-readme/fiona.jpg .
similar.py:
import open_clip, torch, io
from PIL import Image
import datachain as dc
model, _, preprocess = open_clip.create_model_and_transforms("ViT-B-32", "laion2b_s34b_b79k")
model.eval()
ref_emb = model.encode_image(
preprocess(Image.open("fiona.jpg")).unsqueeze(0)
)[0].tolist()
(
dc.read_dataset("pets_embeddings")
.filter(dc.C("info.width") > 500) # from pets_images - no re-read
.mutate(dist=dc.func.cosine_distance(dc.C("emb"), ref_emb))
.order_by("dist")
.limit(3)
.show()
)
Under a second - everything runs against the Dataset DB.
5.3. Incremental updates
The bucket in this walkthrough is static, so there’s nothing new to process. But in production - when new images land in your bucket - re-run the same scripts unchanged. delta=True in the original dataset ensures only new files are processed end to end while the whole dataset will be updated to [email protected]:
$ python create_dataset.py # 500 new images arrived
Skipping 10,000 unchanged · indexing 500 new
Saved [email protected] (+500 records)
# Next day:
$ python create_dataset.py
Skipping 10,000 unchanged · processing 500 new
Saved [email protected] (+500 records)
6. Knowledge Base
DataChain maintains two layers. The Dataset DB is the ground truth: schemas, processing state, lineage, the vectors themselves. The Knowledge Base is derived from it: structured markdown for humans and agents to read. Because it’s derived, it’s always accurate. The Knowledge Base is stored in dc-knowledge/.
Ask the agent to build it (from Claude Code, Cursor, Codex, GitHub Copilot, or Pi):
claude
Prompt:
Build a Knowledge Base for my current datasets
The skill generates dc-knowledge/ directory from the Dataset DB - one file per dataset and bucket:
7. AI-Generated Pipelines
The skill gives the agent data awareness: it reads dc-knowledge/ to understand what datasets exist, their schemas, which fields can be joined - and the meaning of columns inferred from the code that produced them.
See section 2. Quickstart: agent-driven pipeline above. All the steps that were manually created could be just generated.
8. Team and cloud: Studio
Data context built locally stays local. DataChain Studio makes it shared.
datachain auth login
datachain job run --workers 20 --cluster gpu-pool caption.py
# ✓ Job submitted → studio.datachain.ai/jobs/1042
# Resuming from checkpoint (4,218 already done)...
# Saved [email protected] (3,182 processed)
Studio adds: shared dataset registry, access control, UI for video/DICOM/NIfTI/point clouds, lineage graphs, reproducible runs.
Bring Your Own Cloud - all data and compute stay in your infrastructure. AWS, GCP, Azure, on-prem Kubernetes.
9. Contributing
Contributions are very welcome. To learn more, see the Contributor Guide.
10. Community and Support
- Report an issue if you encounter any problems
- Docs
相似文章
@GitHub_Daily: 用 AI 智能体生产级事情,写代码、跑流程、调接口,一开始还行,但规模一大就容易失控,权限太宽、上下文丢失、调试无从下手。 于是找到了 agents-best-practices 这套完整的智能体运行框架设计指南,不限于编码场景,运营、销…
介绍了 agents-best-practices 仓库,这是一份生产级 AI 智能体运行框架设计指南,涵盖工具权限分级、上下文压缩等,支持 Codex 和 Claude Code 安装。
@DashHuang: https://x.com/DashHuang/status/2057323152758480955
这篇文章探讨了为什么在AI agent时代,GitHub比传统文档系统更适合作知识协作的基础,因为它具有开放协作、AI模型熟悉、本地完整上下文和结构化原始数据等优势。
@AIExplorerTim: 有人刚刚开发了一个工具,可以将 PDF 转换为 干净、结构化的 Markdown 速度达到 100 页/秒 不需要 GPU。 不需要 API 成本。 没有混乱的解析。 只有原始的、可用的数据。 它可以轻松处理的内容: • 表格 → 完美提…
OpenDataLoader 是一个开源工具,可将 PDF 转换为结构化的 Markdown 和 JSON,支持 100 页/秒的本地处理速度,无需 GPU 或 API 成本,专为 RAG 管道和 PDF 无障碍自动化设计。
@rwayne: 太屌了学术论文搭本地知识库,瓶颈一直在 PDF 怎么干净转 md。OpenDataLoader-PDF 把这道题做到了 0.907 准确率 开源 PDF 解析榜第一全套 Apache 2.0。 200 篇真实论文测试集的关键数字 总分 0…
OpenDataLoader-PDF 是一款开源 PDF 解析工具,在真实学术论文测试中达到 0.907 的高准确率,支持将复杂的 PDF 文档(含表格、公式、扫描件)高效转换为 Markdown 和 JSON,非常适合本地知识库和 RAG 应用。
@gkxspace: 发现一个很疯狂的开源工具,你输一句话描述你要什么数据,它派出一群 AI Agent 并行跑到各个网站上调研,几分钟后汇总成一张结构化表格给你 其实数据都摆在网上,但想变成一张能用的表格,历来都是苦力活,过去这是一个工程项目: 拼搜索、写爬…
BigSet 是一个开源工具,输入一句话描述所需数据,它会派出多个 AI Agent 并行在网络上调研,自动推断 schema、去重、验证并生成结构化表格,支持定时刷新。