Apache Airflow:数据管道和任务编排的利器
Apache Airflow 简介
Apache Airflow 是开源的工作流管理平台,用于编程式地创建和调度工作流,同时通过内置的 Web 界面监控工作流。 于 2014 年由 Airbnb 开源, 2016 年由 Apache 孵化,并在 2019 年成为 Apache 顶级项目。
Airflow 由 Python 编写,同时其管理的工作流也由 Python 脚本组成。Airflow 的设计原则之一是“配置即代码”, 通过 Python 脚本配置工作流,使得用户在编写工作流时,可以使用 Python 编写更灵活的逻辑。
Apache Airflow 架构
Airflow 一般由以下组件组成。
- Scheduler:用于不断轮询任务,并将可执行的任务提交给 Executor;
- Executor:用于跟进任务的执行情况(并非真正执行任务),通过使用 LocalExecutor、CeleryExecutor 等方式可以将任务 分发到不同的 Worker 上执行,同时任务的超时控制、失败重试等也由 Executor 负责;
- Webserver:用于提供用户友好的任务检视、触发和调试界面 / API;
- DAG folder:用于保存使用 Python 脚本编写的任务配置文件,提供给 Scheduler 和 Executor 读取并解析;
- Metadata database:用于存储任务元信息。
默认安装的情况下,Scheduler 在调度任务的同时,也会同时执行任务,以下是 Airflow 官方文档的架构图。
DAG folder 文件夹中实际定义了许多工作流,其具体的体现方式为 DAG(有向无环图),DAG 中的每一个节点就是任务, 以下官方文档提供的 DAG 示例图。
Apache Airflow 集群
Airflow 本身并没有集群的设计,但是通过 CeleryExecutor、KubernetesExecutor 等方式,可以让 Airflow 有一定的主从集群效果, 由 Scheduler 调度任务,Executor 将任务分发给自身对应的 Worker。
至于集群本身的任务分发方式,则完全依赖 Executor 指定的组件,Airflow 本身使用的是组件的 High-level API,并不关心任务如何分发。
工作流的分类
Airflow 可同时支持重复任务、定时任务、一次性任务和实时任务,Airflow 控制工作流执行时间点的核心是时间表(Timetables), 其具体的表现形式可以是 cron 表达式,也可以是一个表达时间间隔的 Python 对象。
工作流管理
运行信息
用户可通过 Webserver 获取到任务的实时运行信息,同时,也可以通过在 DAG 中加入通知节点,将当前工作流的执行情况通知给用户。
日志
在工作流调度过程中,Airflow 会采集每次调度的审计日志,也会采集单个任务的执行日志。
工作流 CRUD
工作流以 Python 脚本文件的形式存储在文件系统(DAG folder),Airflow 默认会每分钟同步 DAG folder 中的内容, 所以对 DAG folder 中内容的修改会及时同步到 Metadata database 中,Scheduler 与 Worker 都能得到变更信息。但是, 当工作流配置发生变更时,如果没有及时做到 Scheduler 与 Worker 的代码同步,可能会发生系统内工作流状态不一致的问题。
同时 Airflow 官方也提供 REST API 两套接口对工作流进行管理,但是无法通过 REST API 注册新的工作流。
任务编排
作为工作流平台,任务编排对 Airflow 来说是核心的功能之一。Airflow 的任务编排主要通过前文所述的 DAG(有向无环图)实现, 对于处理同一个场景的不同任务,只需要编程式地置于 DAG 中,以此来表达他们之间地依赖关系,完成任务地编排。
注:所有任务都不共享运行的上下文,即使在同一个工作流,也可能运行在不同的机器上,也就是说任务间无法直接交换数据, 针对这个问题安管的解决方案是 XCom。
# 使用操作符定义依赖关系
first_task >> second_task >> [third_task, fourth_task]
# 使用方法调用定义依赖关系
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
在 Airflow 中,有以下 3 种基本的任务类型(模板)。
Operator
主动触发型的任务类型,在 Airflow 实现中,经常混用 Task 和 Operator 的概念。通过泛化 Operator,可以实现执行 Shell、邮件通知、SSH 、 S3 传输等任务,目前内置及社区维护的 Operator 已经超过 100 个。
Sensor
被动触发型的任务类型,是 Operator 的一种特殊类型,它被调度后会阻塞住工作流的执行,直到有一个外部事件唤醒它,它才会继续执行其之后的任务。
出于性能考虑,Airflow 会设定一个工作槽来保证系统的稳定性,工作槽代表着同一时间可同时运行的任务,但是 Sensor 被调度后会一直阻塞, 占据工作槽(Work Slot),导致其他任务饥饿。为了避免这种情况,Airflow 为 Sensor 设定了两种运行模式。
- poke :默认的模式,会一直占据工作槽;
- reschedule:只在检查时间点占据工作槽,两次检查之间会休眠,不占用工作槽。
TaskFlow
TaskFlow 是通过 Python 代码直接编写任务内的逻辑,比如访问 Slack 的 Webhook 发送一个消息,或者是获取当前任务运行环境的信息等。
调度模型
Airflow 是基于 Python 实现的,由于全局解释器锁的存在,Airflow 只能通过进程来调度工作流。
调度策略
Airflow 的调度策略几乎全由用户定义,因为 Scheduler 只把需要调度的工作流发送给配置的 Executor, 不同的 Executor 可能有着完全不同的调度策略。
监控与告警
Airflow 支持多种监控手段,以下是官方提供的日志与监控架构图。
Airflow 内置的告警手段只有 Email,可以将任务的异常信息发送到邮箱。如果要支持更丰富的告警, 则需要引入第三方的 Operator 或者自己编写 TaskFlow 实现。
认证和权限
Airflow 支持 RBAC 认证模式,但默认不启用。
数据存储
Airflow 不支持分库分表。
Airflow 的数据库中存在 44 表,这里只列举一些典型的。
DAG 元信息
- dag
- dag_code
- variable
前两张表基本是和 DAG folder 保持同步的,保存了工作流的元信息及调度信息,包括 DAG 代码文件信息。
variable 用于记录任务过程中使用到的变量信息,例如:SSH Key、SQL DSN 等等。
Task 元信息
- task_fail
- task_instance
- task_instance_note
- task_map
- task_outlet_dataset_reference
- task_reschedule
这些表记录了 Airflow 执行的任务元信息,如状态、执行结果、相关时间点、失败信息、参数等等。
调度信息
- dag_run
- job
- slot_pool
- xcom
前两张表存储了调度记录,job 是历史调度记录,dag_run 用于记录 DAG 当前的调度状态。
slot_pool 用于设定工作槽,可以在一个 Airflow 系统中设置多个工作槽。
xcom 表则是前文所述的 XCom 的底层表,用于在不同的 Task 之间传递数据。
举例
官方文档提供的一张任务生命周期图总结得很好。
总结
从大规模分布式任务调度系统的视角来看,Airflow 可参考的设计其实不多,Airflow 更擅长的是作为数据管道。Airflow 的分布式设计也无法应对百 万级的任务调度,任务调度的精确度也受到 Scheduler 设计的限制。
但是 Airflow 的任务编排还是具有参考意义的,DAG 这种数据结构非常适合流水线式的作业任务。同时 Sensor 这种被动触发的任务类型也很有参考 意义,但是在具体实现上要衡量性能和任务调度的精确性。