使用 Python 和 Apache Spark 实现自定义查询语言
摘要
本技术指南介绍了如何使用 Python 和 Apache Spark 实现自定义查询语言(EHQL),重点在于使用 Lark 定义语法和解析。
<p>关于设计该语言的上一篇帖子:<a href="https://lobste.rs/s/c5tybg/designing_custom_query_language_for_non" rel="ugc">https://lobste.rs/s/c5tybg/designing_custom_query_language_for_non</a></p>
<p><a href="https://lobste.rs/s/ucipie/implementing_custom_query_language_with">评论</a></p>
查看缓存全文
缓存时间: 2026/06/23 13:46
# 使用 Python 和 Apache Spark 实现自定义查询语言
来源:https://nchammas.com/writing/custom-query-language-implementation
在上一篇文章(https://nchammas.com/writing/custom-query-language-design)中,我详细介绍了如何为一批半技术/非技术人员分析师设计一门名为实体历史查询语言(EHQL)的新查询语言,用于处理车辆维护数据。本文将展示如何使用 Python 和 Apache Spark 实现该语言的一个可运行骨架。
实现查询语言有多种方法。我选择 Python 是因为我喜欢用它,而且它在数据处理领域非常流行。同时我事先就知道,我的自定义语言最终需要运行在 Apache Spark 之上——因为我每天都用它,并且我感兴趣的查询数据已经以 Parquet 格式存储并注册在中央目录中。这一背景极大地缩小了可行的实现方案范围,这是一件好事!我不需要(也不想)设计自定义存储格式或查询引擎——两者都是极其庞大的工程。我只需要将我的自定义语言编译成 Spark 可以执行的查询。
如果你正在实现自己的语言,大致需要三个步骤:
1. 定义语法,通常采用某种 EBNF(https://en.wikipedia.org/wiki/Extended_Backus%E2%80%93Naur_form)形式。
2. 使用该语法从源文本构建解析树。
3. 将解析树转换为程序最终应采用的任意形式。
EHQL 有许多有趣的特性(https://nchammas.com/writing/custom-query-language-design#designing-the-syntax),但逐一实现它们篇幅过长。因此,我将重点放在以下示例查询上,并以此为基础实现该语言的骨架——尽管如此,它仍能展示所涉及的关键技术和方法。
``
-- This is an example EHQL query.
history contains:
"oil change"
"transmission fluid change"
``
该查询用于查找数据库中所有在其维护历史中同时进行过换油和变速箱油更换的车辆。
这里有一个自包含的解决方案(https://nchammas.com/assets/query-language-implementation/code/ehql.py),你可以在本地运行,跟随我一起搭建实现。
## 定义语法
如果你使用 Python,定义语法的合适库是 Lark(https://github.com/lark-parser/lark)。¹(https://nchammas.com/writing/custom-query-language-implementation#fn:standard)我认为在 JVM 生态中,大多数人使用 ANTLR(https://www.antlr.org/)。ANTLR 确实有 Python 实现,但快速调查两个解析库后,我发现 Lark 更快、更流行、功能更丰富(https://github.com/lark-parser/lark/tree/eb015d1b5c236b22bf3215fd4a76c891262e1bf8#comparison-to-other-libraries)。虽然我本人没有用过 ANTLR 进行比较,但我可以说 Lark 用起来很愉快,其设计既不晦涩也不具约束性。
以下是本文中演示的 EHQL 最小骨架的 Lark 语法:
``
# ehql.lark
%import common.ESCAPED_STRING -> QUOTED_STRING
%import common.WS_INLINE
%import common.SQL_COMMENT
%ignore WS_INLINE
%ignore SQL_COMMENT
%declare _INDENT _DEDENT
?start: _NEWLINE* history_clause
history_clause: "history" "contains" ":" _history_body
_history_body: _NEWLINE _INDENT [history_pattern _NEWLINE]+ _DEDENT
history_pattern: event_name
event_name: QUOTED_STRING
_NEWLINE: (/\r?\n[\t ]*/ | SQL_COMMENT)+
``
Lark 语法以 EBNF(https://en.wikipedia.org/wiki/Extended_Backus%E2%80%93Naur_form)表示,并添加了一些 Lark 特定的特性。例如,顶部的 `%import` 语句(https://lark-parser.readthedocs.io/en/stable/grammar.html#import)导入 Lark 预定义的规则,以便我们在自己的语法中使用。导入规则时还可以重命名,`ESCAPED_STRING -> QUOTED_STRING` 就做了这件事。`start` 是 Lark 默认寻找的语法根规则,但你也可以重命名它。`?start` 中的前导 `?`(https://lark-parser.readthedocs.io/en/stable/tree_construction.html#conditionally-inlining-rules-with)告诉 Lark:如果该规则只有一个子节点,则将其从解析树中排除——实际情况正是如此。
按照惯例,大写字母开头的规则通常是*终结符*,即那些不再继续分解为更多规则的规则。换句话说,终结符匹配一些文本后便停止。
- `QUOTED_STRING` 匹配双引号括起来的字符串,如 `"hello"`。
- `WS_INLINE` 匹配行内空白(如空格和制表符),但不匹配换行符。
- `SQL_COMMENT` 匹配 SQL 风格的注释,如 `-- this is a comment`。
两个 `%ignore` 语句告诉 Lark 忽略匹配这些终结符的文本。这基本上就是我们实现注释支持以及对琐碎空白差异的鲁棒性的方式。例如,`history contains:` 和 `history contains :` 由于 `%ignore WS_INLINE` 的存在,会得到相同的解析结果。
### 处理有意义的缩进
EHQL 像 Python 一样,使用缩进(https://nchammas.com/writing/custom-query-language-design#significant-indentation)来逻辑分组语句。语法中通过以下语句实现:
``
%declare _INDENT _DEDENT
history_clause: "history" "contains" ":" _history_body
_history_body: _NEWLINE _INDENT [history_pattern _NEWLINE]+ _DEDENT
_NEWLINE: (/\r?\n[\t ]*/ | SQL_COMMENT)+
``
关键结构由 `_history_body` 捕获,它包含一个或多个历史模式,每个模式由换行符分隔。所有模式相对于父级 `history_clause` 缩进一级。`_NEWLINE` 终结符比较有趣,因为它同时匹配换行符后的空白以及注释;这对于 Lark 正确(https://lark-parser.readthedocs.io/en/stable/examples/indented_tree.html)处理缩进和行内注释至关重要。另外有趣的还有 `_INDENT` 和 `_DEDENT` 终结符。它们在语法中没有定义,但被声明并用于 `_history_body`。这是 Lark 处理具有有意义缩进的语言的另一个结果,具体来说。
为什么 `_NEWLINE`、`_INDENT` 和 `_DEDENT` 的定义如此奇怪?大致原因如下:将文本解析为结构化语法树通常分为两个阶段:*词法分析*和*解析*。词法分析将原始字符扫描并转换为*记号*,这些记号将由语法中的终结符(即大写字母开头的规则)匹配。解析则接受生成的记号,并根据语法规则将其组合成结构化树。
当词法分析阶段遇到换行符时,它需要生成的 `_INDENT` 或 `_DEDENT` 记号数量取决于换行符*之前*和*之后*的情况。本文不会实现 `any` 和 `all`(https://nchammas.com/writing/custom-query-language-design#against-boolean-operators)的支持,但考虑以下示例:
``
-- After each newline, does the lexer need to indent or dedent?
history contains: -- indent
any of: -- indent
"oil change" -- no indent or dedent
"transmission fluid change" -- dedent 2x
``
换句话说,词法分析器在遍历文本时需要维护缩进级别的状态,才能正确生成缩进和取消缩进记号。但词法分析器通常是无状态的,因此无法跟踪此类信息。我认为这样设计主要是为了保持简单和快速。因此,Lark 通过一个*后词法处理器*来解决这个问题,该处理器在词法分析之后、解析之前运行。后词法处理器*确实*跟踪一些状态,并生成我们需要的缩进和取消缩进记号。
Lark 为有意义的缩进提供了一个专用的后词法处理器。我们只需要子类化它,并告诉它语法中的哪些终结符对应于关键的换行、缩进和取消缩进记号。
``
from lark.indenter import Indenter
class EHQLIndenter(Indenter):
NL_type = '_NEWLINE'
OPEN_PAREN_types = []
CLOSE_PAREN_types = []
INDENT_type = '_INDENT'
DEDENT_type = '_DEDENT'
tab_len = 2
``
如果 EHQL 支持使用括号将长表达式拆分成多行,我们还需要指定这些括号记号的名称。这是因为在括号内,无论每行的前导空白如何,缩进级别都必须冻结。由于 EHQL 不支持此功能,我们可以将 `EHQLIndenter` 的 `*_PAREN_types` 部分留空。
最后,`tab_len` 参数用于将制表符 `\t` 转换为适当的空格数,以确定缩进级别。请注意,它*并非*告诉 Lark 多少个空格对应一级缩进——一开始我对此感到困惑。给定行的缩进级别由后词法处理器根据该行相对于前一行的前导空格数自动确定。没有全局规则将固定数量的空格转换为特定缩进级别。这意味着不同的代码块可以用不同数量的前导空格编写,但解析为相同的缩进级别,就像 Python(https://docs.python.org/3/reference/lexical_analysis.html#indentation)一样。实践中,EHQL 的惯例是每个缩进级别使用两个空格,而 Python 的惯例是四个(https://peps.python.org/pep-0008/#indentation)。
## 构建解析树
好了!我们有了语法,也有了用于处理 EHQL 缩进的后词法处理器的适当指令。现在我们可以将它们组合起来,将示例 EHQL 查询解析为解析树。
``
from lark import Lark
from pathlib import Path
from textwrap import dedent
grammar = Path("ehql.lark").read_text()
parser = Lark(grammar, parser='lalr', postlex=EHQLIndenter())
query = dedent(
"""
-- This is an example EHQL query.
history contains:
"oil change"
"transmission fluid change"
"""
)
parse_tree = parser.parse(query)
print(parse_tree.pretty())
``
首先简要说明 `parser='lalr'`。Lark 支持几种解析算法(https://lark-parser.readthedocs.io/en/stable/parsers.html),你可以从中选择用于解析语法。主要两种是 Earley 和 LALR(1)。Earley 较慢,但在能处理的语法方面更灵活。然而,LALR(1) 足够强大,可以解析像 Python 和 Java 这样的“真实”语言。因此,除非你在做一些特殊的事情,否则我会从 LALR(1) 开始,必要时才切换到其他算法。另外请注意,由于我们的语言有有意义的缩进,我们必须将缩进器规范传递给 `postlex` 参数。
运行上述代码将解析我们的示例查询,并打印以下解析树:
``
history_clause
history_pattern
event_name "oil change"
history_pattern
event_name "transmission fluid change"
``
这相当简洁!我们可以看到树中对应于语法规则的节点。由于 `%ignore` 指令,注释和行内空白已被正确忽略。换行符和缩进记号已被正确转换为适当的结构,然后从解析树中移除。这一点很重要,因为我们不希望也不需要在树中看到 `_NEWLINE`、`_INDENT` 或 `_DEDENT` 等标记;我们只需要 `event_name` 是 `history_pattern` 的子节点,`history_pattern` 是 `history_clause` 的子节点,等等。这是 Lark 语法的一个关键细节:以下划线开头的规则被解析为适当的结构,但随后被丢弃,以免杂乱最终解析树。
## 将解析树转换为查询
至此,我们已准备好将这个解析树转换为 Apache Spark 可以执行的查询。Spark 具有广泛的能力,因此“执行查询”可以有许多不同形式。对于 EHQL,它意味着在 Spark Connect(https://spark.apache.org/docs/4.1.2/spark-connect-overview.html)上执行 DataFrame 查询(https://spark.apache.org/docs/4.1.2/sql-programming-guide.html)。
### 起草目标解决方案
在我关于设计 EHQL 的文章中,我梳理了一个 SQL 参考查询(https://nchammas.com/writing/custom-query-language-design#first-stop-sql)。Spark 可以很好地执行 SQL,但我更喜欢使用其 DataFrame API。主要原因是 DataFrame API 可以在宿主语言(在我们的案例中是 Python)中访问,因此你可以获得与任何库一样的 IDE 支持,而不必嵌入或笨拙地构建 SQL 字符串。DataFrame API 还提供了流畅接口(https://martinfowler.com/bliki/FluentInterface.html),使得链式操作自然流畅,类似于 Unix Shell 编程中的方式。²(https://nchammas.com/writing/custom-query-language-implementation#fn:pipe)Spark 将 SQL 和 DataFrame 查询都编译为相同的中间表示,并使用完全相同的优化器执行,因此使用不同接口在运行时没有差异。关键在于哪种接口更适合你的用例。
另一个关键决定是使用 Spark Connect(https://spark.apache.org/docs/4.1.2/spark-connect-overview.html)与 Spark 接口,而不是经典的 DataFrame API。DataFrame API 本身相同,但区别在于:以前你的客户端需要运行一个相当重的 JVM 来托管 Spark 应用的驱动程序;而使用 Spark Connect,你的客户端变成一个轻量级薄层,不了解 JVM 或 Spark 内部。它只需从 DataFrame 查询中构建逻辑查询计划,通过网络发送到服务器,然后取回结果。这使得使用 DataFrame API 更像使用 SQL,因为客户端与服务器完全解耦,可以嵌入几乎任何地方,甚至是最资源受限的主机。³(https://nchammas.com/writing/custom-query-language-implementation#fn:langs)最棒的是,使用 Spark Connect 只需要在初始连接 Spark 的方式上做一个小小的改变,你马上就会看到。
我们的 DataFrame API 查询是什么样子?移植上一篇文章中的原始 SQL(https://nchammas.com/writing/custom-query-language-design#first-stop-sql)相当直接。
``
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, bool_or
# `.remote()` 是使用 Spark Connect 创建会话的方式。
spark = SparkSession.builder.remote("local[*]").getOrCreate()
(
spark.table("maintenance_history")
.withColumns({
"has_oil_change": col("work_done") == "oil change",
"has_tx_fluid_change": col("work_done") == "transmission fluid change",
})
.groupBy("vehicle_id")
.agg(
bool_or("has_oil_change").alias("has_oil_change"),
bool_or("has_tx_fluid_change").alias("has_tx_fluid_change"),
)
.where(
col("has_oil_change")
& col("has_tx_fluid_change")
)
)
``
这大致就是我们想要将上述解析树转换成的样子。
### 选择合适的 Lark 访问器
Lark 提供几种不同的方式(https://lark-parser.readthedocs.io/en/latest/visitors.html)将解析树转换为最终形式。这些方式在两个维度上有所不同:
- 它们从上到下处理树 vs. 从下到上处理树。
- 它们重建树 vs. 原地编辑树。
我们的语言骨架是
相似文章
为非技术分析师设计自定义查询语言
作者详细介绍了为非技术分析师设计一种自定义查询语言的过程,用于过滤车辆维护数据,并概述了用户需求、数据模式以及具体用例。
一种受SQL启发、专为事件溯源设计的查询语言(2025年)
EventQL是一种受SQL启发、专为事件溯源设计的查询语言,它提供对事件属性、主题层级结构的一流支持,并具备类似SQL的表达能力,以便高效查询事件流。
将 Python 转译为 Lisp
LispE 是 NAVER 推出的一款开源 Lisp 方言,兼具函数式与数组编程特性,并支持 PyTorch、llama.cpp 以及 MLX 等 AI 库。该语言既可作为原生应用运行,也可打包为支持多线程与现代函数式编程特性的 WebAssembly 库。
DeSQ: 基于分解的SPARQL查询生成
DeSQ是一个基于分解的框架,用于从自然语言问题生成SPARQL查询。它将复杂问题分解为原子约束,将它们映射到SPARQL片段,并组装成完整查询,在五个基准测试中的四个上优于现有技术。
ProSPy:面向企业文本到SQL的剖析驱动的SQL-Python智能体框架
ProSPy 是一个面向企业文本到 SQL 的剖析驱动型 SQL-Python 智能体框架,将推理过程结构化分为四个阶段:自动剖析、模式剪枝、方言无关的 SQL 接口以及基于 Python 的分析。在使用 Claude-4.5-Opus 模型时,它在 Spider 2.0-Lite 和 Spider 2.0-Snow 数据集上分别达到了 60.15% 和 60.51% 的执行准确率,优于多个强基线模型。