将 Apache Airflow 与 MinIO 结合使用

将 Apache Airflow 与 MinIO 结合使用

Apache Airflow是一个开源平台,用于以编程方式编写、安排和监控工作流。它最初由 Airbnb 的工程团队开发,但被提供给 Apache 软件基金会,并在 Apache 2.0 下获得许可。

Airflow 通常用于数据工程和数据科学管道,以自动执行任务,例如数据转换、加载和分析。它还用于其他行业,例如金融、医疗保健和电子商务,以实现业务流程的自动化。

Airflow 在连接方面非常灵活。这包括数据湖、数据仓库、数据库、API,当然还有对象存储。它在那些受益于数据管道即代码的用例中表现出色,例如:

  • 自动化驱动的数据流

  • 机器学习训练模型和再训练

  • 备份和快照

Airflow 是用 Python 编写的,并使用有向无环图 (DAG)来表示工作流。DAG 中的每个节点代表一个任务,节点之间的边代表任务之间的依赖关系。DAG 不关心任务本身,只关心顺序、重试次数等。复杂的 DAG 可能变得脆弱且难以排除故障,尤其是当架构师必须管理数十个任务时。

Airflow 提供了一个 Web 界面来管理和监控工作流,以及一个 API 来创建、更新和删除工作流。它还具有一组丰富的功能,包括对调度、警报、测试和版本控制的支持。

MinIO 是 Airflow 的完美伴侣,因为它具有行业领先的性能和可扩展性,使每个数据密集型工作负载都触手可及。通过将 PB 级数据存储在 MinIO 存储桶中,您可以在 Airflow 中创建数据管道来处理大量数据,这对于 DAG 尽快运行至关重要。处理完成后,您甚至可以将最终结果存储回 MinIO 存储桶中,供其他工具使用。MinIO 具有惊人的性能——最近的基准测试在 GET 上达到了 325 GiB/s (349 GB/s),在 PUT 上达到了 165 GiB/s (177 GB/s),只有 32 个现成的 NVMe SSD 节点。

除此之外,Apache Airflow 还可以将其日志存储在 MinIO 存储桶中。这在云或容器编排环境中很有用,在这些环境中,本地文件系统是短暂的,如果机器终止或容器停止,日志可能会丢失。

为了最大限度地降低这些环境中数据丢失的风险,建议使用更持久的云原生存储解决方案(如 MinIO)来存储 PB 级数据和日志。这确保即使机器或容器终止,数据也能持久保存。此外,在网络和安全允许的情况下,可以从任何位置访问它们。

将 MinIO 与 Apache Airflow 结合使用有几个原因:

  • MinIO 使 Airflow DAG 以行业领先的性能快速运行,使其处理速度比任何其他数据存储都快,而成本仅为其一小部分。通过在 MinIO 中拥有 DAG 所需的所有数据,您可以显着节省数据存储成本,同时获得最佳的性能成本比。

  • MinIO 依靠擦除编码为您的数据提供高可用性和持久性,并且还具有可配置的数据保护策略,以确保即使在发生服务器故障或其他灾难时您的数据也不会丢失。

  • MinIO 可以将数据存储在多个区域并在它们之间复制,允许您将数据存储在靠近用户的位置或符合您的数据法规的区域。这可以减少延迟并提高工作流程的性能。

  • MinIO 与 Apache Airflow 无缝集成,允许您使用 S3 API 来存储和检索您的数据和其他日志。这使得设置和使用 MinIO 与 Airflow 变得容易,无需任何额外配置。MinIO 的云原生集成意味着它可以与最广泛实施的软件和框架顺利合作。

Apache Airflow 和 MinIO 教程

在本教程中,我们将向您展示 Airflow 与 MinIO 的多个用例。

  • 首先,我们将向您展示如何将日志从 MinIO DAG 运行发送到 MinIO 存储桶。

  • 接下来,我们将创建一个自定义 DAG,以在后处理后将对象从 API 发送到 MinIO 存储桶。

安装气流

使用 Pip 安装 Airflow。如果未安装,则必须安装pip,如果python不可用,则还需要对其进行符号链接。

apt-get install python3-pip


ln -s /usr/bin/python /usr/bin/python3


AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)
pip install "apache-airflow==${AIRFLOW_VERSION}"

安装 Amazon 提供程序以连接到 MinIO

pip install apache-airflow-providers-amazon

以独立模式启动气流

airflow standalone

配置日志

在 MinIO 中使用创建一个桶mc make bucket

mc admin alias minio http://<IP>:9000 minioadmin minioadmin
mc mb minio/airflow-logs


打开/root/airflow/airflow.cfg并在下面添加以下设置[logging]

[logging]
remote_logging = True
remote_base_log_folder = s3://airflow-logs
remote_log_conn_id = my_s3_conn
encrypt_s3_logs = False

使用remote_base_log_folder您在上一步中在 MinIO 中创建的存储桶名称

应该remote_log_conn_id与我们将在下一步中创建的连接 ID 的名称相匹配。

在 Airflow UI 中,转到 Admin -> Connections


Untitled (10).png


创建名称为 的新连接my_s3_conn

输入minioadmin访问密钥和秘密密钥。

在 Extras 中,让我们使用以下语法将 URL 设置为本地 MinIO 部署

{ "endpoint_url" : "http://:9000"}

现在,要测试并确认它是否有效,请转至 DAGs -> example_sensor_decorator 并启用此 DAG。

在右侧使用“播放”按钮,触发 DAG。


Untitled (11).png


几秒钟后,DAG 运行完毕后,运行以下命令查看日志。对于每个 DAG 运行,都会创建一个单独的日志文件夹。

mc ls minio/airflow-logs

使用 MinIO 存储桶存储 Airflow DAG 运行的日志只是我们正在探索的用例之一。在下一阶段,我们将创建一个自定义 DAG 来演示更多用例。

创建自定义 DAG

在此示例中,我们将创建一个自定义 DAG。这个 DAG 会做什么?

  • 我们将连接到 Ghost Blog API

  • 根据特定参数获取博客

  • 将它们备份到 MinIO 中的一个桶中

让我们为 DAG 设置框架。

导入 DAG 框架所需的 python 库

from airflow.decorators import dag, task

为运行 DAG 的频率创建一个时间表

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],

)


接下来让我们创建一个任务来从 Ghost API 中提取博客并将它们放入 MinIO 存储桶中。

让我们导入几个 python 包来连接到 Ghost API 和 MinIO bucket

import json
import requests

from minio import Minio
import urllib3

使用该requests模块可以连接到 Ghost API 并获取一些博客

api_token = "<token>"

page = 1
total_pages = 1

while page <= total_pages:
    api_url = ("https://minio.ghost.io/ghost/api/content/posts/?limit=1&page=%s&key=%s" % (page, api_token))
    response_str = requests.get(api_url)
    response_json = requests.get(api_url).json()

    print(response_json["meta"])
    print(response_json["posts"][0]["url"])

    total_pages = response_json["meta"]["pagination"]["pages"]

    page = page + 1

将博客放入 MinIO 存储桶

config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "http://<ip>:9000",
  "minio_username": "minioadmin",
  "minio_password": "minioadmin",
}

# Since we are using self-signed certs we need to disable TLS verification
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=True,
              access_key=config["minio_username"],
              secret_key=config["minio_password"],
              http_client = http_client
              )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

      minio_client.fget_object(bucket_name, object_path, object_path)
      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)
      minio_client.fput_object(config["dest_bucket"], object_path, object_path)
      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))
      minio_client.fput_object(config["dest_bucket"], object_path, object_path)
      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))

请注意,以上代码并非意味着“开箱即用”,而是为您提供想法并向您展示使用首选输入源创建自己的 DAG 的路径;目的地将永远是 MinIO。

使用 Airflow 和 MinIO 的云原生数据管道

当您拥有 MinIO 等云原生高性能存储与 Airflow 等云原生工具集成时,可能性是无限的。在此示例中,我们向您展示了一些基础知识,例如将 DAG 日志保存在 MinIO 存储桶中以及编写可以与任何 API 对话并对其执行操作的自定义 DAG,在本例中用于备份整个博客到 MinIO 桶。

但这仅仅是个开始,当你选择云原生时,你会接触到无数的集成框架。使用 Airflow,您可以创建任意数量的多云管道。例如,您可以将数以千兆字节的非结构化数据 ETL 转换为放置在 MinIO 中的结构化数据,然后其他进程可以读取和分析这些数据。您甚至可以将此管道用作图像调整器(类似于使用 Apache Kafka 和 MinIO 编排复杂工作流),方法是拍摄各种大小的图像,将它们调整为您的业务所需的大小,然后将它们放入另一个 MinIO 存储桶中可用于网络服务。


上一篇 下一篇