将 Apache Airflow 与 MinIO 结合使用
Apache Airflow是一个开源平台,用于以编程方式编写、安排和监控工作流。它最初由 Airbnb 的工程团队开发,但被提供给 Apache 软件基金会,并在 Apache 2.0 下获得许可。
Airflow 通常用于数据工程和数据科学管道,以自动执行任务,例如数据转换、加载和分析。它还用于其他行业,例如金融、医疗保健和电子商务,以实现业务流程的自动化。
Airflow 在连接方面非常灵活。这包括数据湖、数据仓库、数据库、API,当然还有对象存储。它在那些受益于数据管道即代码的用例中表现出色,例如:
自动化驱动的数据流
机器学习训练模型和再训练
备份和快照
Airflow 是用 Python 编写的,并使用有向无环图 (DAG)来表示工作流。DAG 中的每个节点代表一个任务,节点之间的边代表任务之间的依赖关系。DAG 不关心任务本身,只关心顺序、重试次数等。复杂的 DAG 可能变得脆弱且难以排除故障,尤其是当架构师必须管理数十个任务时。
Airflow 提供了一个 Web 界面来管理和监控工作流,以及一个 API 来创建、更新和删除工作流。它还具有一组丰富的功能,包括对调度、警报、测试和版本控制的支持。
MinIO 是 Airflow 的完美伴侣,因为它具有行业领先的性能和可扩展性,使每个数据密集型工作负载都触手可及。通过将 PB 级数据存储在 MinIO 存储桶中,您可以在 Airflow 中创建数据管道来处理大量数据,这对于 DAG 尽快运行至关重要。处理完成后,您甚至可以将最终结果存储回 MinIO 存储桶中,供其他工具使用。MinIO 具有惊人的性能——最近的基准测试在 GET 上达到了 325 GiB/s (349 GB/s),在 PUT 上达到了 165 GiB/s (177 GB/s),只有 32 个现成的 NVMe SSD 节点。
除此之外,Apache Airflow 还可以将其日志存储在 MinIO 存储桶中。这在云或容器编排环境中很有用,在这些环境中,本地文件系统是短暂的,如果机器终止或容器停止,日志可能会丢失。
为了最大限度地降低这些环境中数据丢失的风险,建议使用更持久的云原生存储解决方案(如 MinIO)来存储 PB 级数据和日志。这确保即使机器或容器终止,数据也能持久保存。此外,在网络和安全允许的情况下,可以从任何位置访问它们。
将 MinIO 与 Apache Airflow 结合使用有几个原因:
MinIO 使 Airflow DAG 以行业领先的性能快速运行,使其处理速度比任何其他数据存储都快,而成本仅为其一小部分。通过在 MinIO 中拥有 DAG 所需的所有数据,您可以显着节省数据存储成本,同时获得最佳的性能成本比。
MinIO 依靠擦除编码为您的数据提供高可用性和持久性,并且还具有可配置的数据保护策略,以确保即使在发生服务器故障或其他灾难时您的数据也不会丢失。
MinIO 可以将数据存储在多个区域并在它们之间复制,允许您将数据存储在靠近用户的位置或符合您的数据法规的区域。这可以减少延迟并提高工作流程的性能。
MinIO 与 Apache Airflow 无缝集成,允许您使用 S3 API 来存储和检索您的数据和其他日志。这使得设置和使用 MinIO 与 Airflow 变得容易,无需任何额外配置。MinIO 的云原生集成意味着它可以与最广泛实施的软件和框架顺利合作。
Apache Airflow 和 MinIO 教程
在本教程中,我们将向您展示 Airflow 与 MinIO 的多个用例。
首先,我们将向您展示如何将日志从 MinIO DAG 运行发送到 MinIO 存储桶。
接下来,我们将创建一个自定义 DAG,以在后处理后将对象从 API 发送到 MinIO 存储桶。
安装气流
使用 Pip 安装 Airflow。如果未安装,则必须安装pip,如果python不可用,则还需要对其进行符号链接。
安装 Amazon 提供程序以连接到 MinIO
以独立模式启动气流
配置日志
在 MinIO 中使用创建一个桶mc make bucket
打开/root/airflow/airflow.cfg并在下面添加以下设置[logging]
使用remote_base_log_folder您在上一步中在 MinIO 中创建的存储桶名称
应该remote_log_conn_id与我们将在下一步中创建的连接 ID 的名称相匹配。
在 Airflow UI 中,转到 Admin -> Connections

创建名称为 的新连接my_s3_conn。
输入minioadmin访问密钥和秘密密钥。
在 Extras 中,让我们使用以下语法将 URL 设置为本地 MinIO 部署
现在,要测试并确认它是否有效,请转至 DAGs -> example_sensor_decorator 并启用此 DAG。
在右侧使用“播放”按钮,触发 DAG。

几秒钟后,DAG 运行完毕后,运行以下命令查看日志。对于每个 DAG 运行,都会创建一个单独的日志文件夹。
使用 MinIO 存储桶存储 Airflow DAG 运行的日志只是我们正在探索的用例之一。在下一阶段,我们将创建一个自定义 DAG 来演示更多用例。
创建自定义 DAG
在此示例中,我们将创建一个自定义 DAG。这个 DAG 会做什么?
我们将连接到 Ghost Blog API
根据特定参数获取博客
将它们备份到 MinIO 中的一个桶中
让我们为 DAG 设置框架。
导入 DAG 框架所需的 python 库
为运行 DAG 的频率创建一个时间表
接下来让我们创建一个任务来从 Ghost API 中提取博客并将它们放入 MinIO 存储桶中。
让我们导入几个 python 包来连接到 Ghost API 和 MinIO bucket
使用该requests模块可以连接到 Ghost API 并获取一些博客
将博客放入 MinIO 存储桶
请注意,以上代码并非意味着“开箱即用”,而是为您提供想法并向您展示使用首选输入源创建自己的 DAG 的路径;目的地将永远是 MinIO。
使用 Airflow 和 MinIO 的云原生数据管道
当您拥有 MinIO 等云原生高性能存储与 Airflow 等云原生工具集成时,可能性是无限的。在此示例中,我们向您展示了一些基础知识,例如将 DAG 日志保存在 MinIO 存储桶中以及编写可以与任何 API 对话并对其执行操作的自定义 DAG,在本例中用于备份整个博客到 MinIO 桶。
但这仅仅是个开始,当你选择云原生时,你会接触到无数的集成框架。使用 Airflow,您可以创建任意数量的多云管道。例如,您可以将数以千兆字节的非结构化数据 ETL 转换为放置在 MinIO 中的结构化数据,然后其他进程可以读取和分析这些数据。您甚至可以将此管道用作图像调整器(类似于使用 Apache Kafka 和 MinIO 编排复杂工作流),方法是拍摄各种大小的图像,将它们调整为您的业务所需的大小,然后将它们放入另一个 MinIO 存储桶中可用于网络服务。