开发了一个将Microsoft SQL Server变更流式传输到Apache Kafka的工具
摘要
Athena是一个轻量级变更数据捕获(CDC)工具,用于将Microsoft SQL Server的变更流式传输到Apache Kafka。它使用Golang构建,设置比Debezium更简单,并能自动管理CDC配置。
查看缓存全文
缓存时间: 2026/06/02 21:34
Niyko/Athena 源代码:https://github.com/Niyko/Athena Athena 徽标
Athena 是一款轻量级变更数据捕获(CDC)解决方案,用于将 Microsoft SQL Server 中的变更流式传输到 Apache Kafka。它使用 Golang 构建,支持 SASL 认证的 Kafka 代理,并提供简单的设置体验。与 Debezium 等替代方案(配置管理复杂)不同,Athena 更加简单且易于操作。它自动管理 CDC 设置,将数据库变更发布到单个 Kafka 主题,并提供清晰直观的事件格式,便于下游消费者理解和处理。
:zap: 工作原理
- 为 MSSQL 数据库表中行的变更(如
create、update、delete)创建消息,并发送到单个 Kafka 主题。 - Athena 仅为所有新表的变更创建消息。现有变更将被忽略。
- Kafka 主题必须事先创建。与 Debezium 不同,Athena 不会自动创建主题。
- 运行
setup命令后,Athena 会自动完成 MSSQL 中所有 CDC 设置。 - 默认情况下,Athena 会轮询所有表的变更,你可以使用
config.json中的skippedTables选项来忽略某些表。
:cyclone: 简单安装
你可以从 GitHub Releases 页面(https://github.com/Niyko/Athena/releases)下载预编译的二进制文件,并将其复制到所需位置。然后按以下步骤操作:
在存放 Athena 二进制文件的根文件夹中创建一个 config.json 文件。以下是 JSON 文件的格式。请填写所有凭据。参数详情请参见下文中的表格。
{
"dbHost": "127.0.0.1",
"dbPort": 1433,
"dbUser": "",
"dbPassword": "",
"dbName": "",
"kafkaHost": "",
"kafkaEnableTLS": false,
"kafkaTopic": "",
"kafkaSASLMechanisms": "NONE",
"kafkaSASLUsername": "",
"kafkaSASLPassword": "",
"pollInterval": 10,
"fetchLimit": 50,
"skippedTables": [],
// 如果想在 ClickHouse 中收集日志
"clickHouse": false,
"clickHouseHost": ":",
"clickHouseUsername": "",
"clickHousePassword": "",
"clickHouseDatabase": "",
"clickHouseTableName": "",
"clickHouseTableTTL": 12
}
在 Kafka 代理中创建一个主题,名称与 config.json 中给定的相同(不包含 scheme),并根据需要设置分区。
运行 setup 命令,以便在数据库中创建 CDC 及其他必需设置。
./athena setup
设置一个服务,让 Athena 在后台运行。Windows 和 Linux 的设置方法不同。以下是在 Linux 发行版上创建服务的步骤。
使用以下命令在 /etc/systemd/system 目录中创建一个名为 athena_mssql_kafka.service 的服务文件。
cd /etc/systemd/system
nano athena_mssql_kafka.service
将以下内容复制粘贴到上面创建的服务文件 athena_mssql_kafka.service 中。
[Unit]
Description=Athena MSSQL Kafka Service
After=network.target
[Service]
Type=simple
ExecStart=athena run
[Install]
WantedBy=multi-user.target
请注意,创建服务文件时需要更改
ExecStart中的路径。
现在你可以启动服务并检查服务状态。
systemctl start athena_mmsql_kafka.service
systemctl status athena_mmsql_kafka.service
:gear: 配置 Athena
Athena 可以通过在 Athena 二进制文件根目录中创建的 config.json 文件进行配置。以下表格详细说明了配置键及其作用。
| 选项 | 描述 | 示例 |
|---|---|---|
dbHost | MSSQL 数据库主机 | 127.0.0.1 |
dbPort | MSSQL 数据库端口 | 1433 |
dbUser | MSSQL 数据库用户名 | |
dbPassword | MSSQL 数据库密码 | |
dbName | MSSQL 数据库名称 | |
kafkaHost | Kafka 服务器主机及端口 | 127.0.0.1:9092 |
kafkaTopic | 你为表变更创建的 Kafka 主题 | |
kafkaEnableTLS | 启用 Kafka 连接的 TLS | true, false |
kafkaSASLMechanisms | 用于 Kafka 连接的 SASL 机制 | NONE, SASL-PLAIN, SASL-SCRAM-SHA-256, SASL-SCRAM-SHA-512 |
kafkaSASLUsername | Kafka 服务器的 SASL 用户名 | |
kafkaSASLPassword | Kafka 服务器的 SASL 密码 | |
pollInterval | 下次轮询数据库的时间间隔(秒) | 10 |
fetchLimit | 每次从表中拉取的 CDC 变更行数 | 50 |
skippedTables | 需要跳过的表名数组(不处理这些表的 CDC 变更) | [“table1”, “table2”] |
clickHouse | 启用 ClickHouse 日志。运行 setup 命令时,Athena 会自动创建 ClickHouse 表和结构 | true, false |
clickHouseHost | ClickHouse 服务器主机及端口 | 127.0.0.1:8123 |
clickHouseUsername | ClickHouse 服务器用户名 | |
clickHousePassword | ClickHouse 服务器密码 | |
clickHouseDatabase | ClickHouse 服务器数据库名称 | |
clickHouseTableName | ClickHouse 服务器表名称 | |
clickHouseTableTTL | 每条记录的生存时间(小时) | 24 |
:mushroom: Athena 辅助选项
Athena 可执行文件除了 setup 和 run 外,还提供其他辅助功能,如下所述。可以通过以下方式运行,例如:./athena uninstall
| 选项 | 描述 |
|---|---|
uninstall | 禁用 MSSQL 中的 CDC 并删除 SQLite 数据库 |
add-cdc | 在 MSSQL 数据库中运行 CDC 设置 |
remove-cdc | 禁用 MSSQL 数据库中的 CDC |
clear-cdc-history | 清除 MSSQL 数据库中 Athena 尚未处理的 CDC 历史或变更 |
recreate-clickhouse | 重新运行 ClickHouse 迁移 |
recreate-sqlite | 重新创建 SQLite 数据库并重新运行迁移 |
help | 查看所有可用选项 |
:triangular_ruler: 开发
要设置开发环境,dev 文件夹中有一个 Docker 文件。它将创建所有必要的服务,如包含示例数据库的 MSSQL、Kafka 等。同样的环境也可用于运行集成测试。
- 从 Go 官网(https://go.dev/doc/install)安装最新版本的 Go。
- 从 GitHub 克隆该项目。
- 运行
go mod download命令安装所有模块。 - 然后根据需要运行以下命令。
cd dev
docker compose up -d
cd ..
set GORUN=true # 用于标识脚本是从 go run 命令运行的,以便为读取 config.json 或 db.sqlite 选择正确路径
go run . setup
go run . run
请注意,
dev文件夹中的docker-compose.yml仅用于开发目的。
:cactus: 如何运行测试
在运行测试之前,请确保已设置好开发环境,并且 config.json 配置正确。
cd tests
go test -v -run TestIntegration
:hammer_and_wrench: 如何构建
你可以按照以下步骤构建二进制文件或进行 Athena 的开发。Athena 完全使用 Golang 构建。因此,你需要从 Go 官网(https://go.dev/doc/install)安装最新版本的 Go。请注意,二进制文件的构建由 Goreleaser(https://goreleaser.com/)管理。
- 从 GitHub 克隆该项目。
- 运行
go mod download命令安装所有模块。 - 运行
SET GORUN=true命令设置 gorun 变量。 - 运行
goreleaser release --snapshot --clean命令构建二进制文件。
:page_with_curl: 许可证
Athena 采用 MIT 许可证(https://github.com/Niyko/Athena/blob/main/LICENSE)。
相似文章
Show HN: Streambed – 将Postgres流式传输到S3上的Iceberg,支持Postgres Wire协议
Streambed是一个开源的CDC引擎,它将Postgres的WAL变更流式传输到S3上的Iceberg表,并内置了一个使用DuckDB的查询服务器,该服务器支持Postgres wire协议。
SQLite:持久化工作流的全部所需
这篇博文认为,SQLite 结合 Litestream 进行异步备份,为许多工作流系统(尤其是 AI 智能体)提供了一种简单而有效的持久化执行方法,无需单独编排层或网络数据库。
Basedash MCP 服务器
Basedash 是一款新的 MCP 服务器,可作为各种 AI 工具的数据分析集成服务。
@makisuo: 现在所有类型的数据库(Redis、Postgres、Clickhouse、MySQL 等)都会显示重要的统计数据和指标,以及大多数…
Maple 服务地图现在显示包括 Redis、Postgres、Clickhouse 和 MySQL 在内的各种数据库的重要统计数据和指标。
pg_deltax: Apache许可的PostgreSQL时序扩展
DeltaX 是一个基于Apache许可的PostgreSQL扩展,为时序数据提供压缩和列式存储,是TimescaleDB或ClickHouse的快速替代方案,同时将数据保留在PostgreSQL中。