使用 Apache Kafka 和 MinIO 编排复杂的工作流

使用 Apache Kafka 和 MinIO 编排复杂的工作流

MinIO 是创建和执行复杂数据工作流的基础组件。这个事件驱动功能的核心是使用 Kafka 的 MinIO 桶通知。MinIO 为所有 HTTP 请求生成事件通知,例如PUTPOSTCOPYDELETEGET您可以使用这些通知来触发适当的应用程序、脚本和 Lambda 函数,以便在对象上传触发事件通知后采取操作。HEADCompleteMultipartUpload

事件通知为多个微服务交互和协作提供了松散耦合的范例。在此范例中,微服务不直接相互调用,而是使用事件通知进行通信。发送通知后,发送服务可以返回其任务,而接收服务将采取行动。这种隔离级别使维护代码变得更加容易——更改一个服务不需要更改其他服务,因为它们通过通知而不是直接调用进行通信。

有几个用例依赖 MinIO 事件通知来执行数据工作流。例如,我们可以使用将存储在 MinIO 中的对象的原始数据来运行 AI/ML 管道。

  • 处理数据的管道将在添加原始对象时触发

  • 基于添加的对象,模型将运行

  • 最终模型可以保存到 MinIO 中的一个桶中,然后其他应用程序可以将其作为最终产品使用。

构建工作流

我们将使用 MinIO 和 Kafka 为假设的图像缩放器应用程序构建一个示例工作流。它本质上是获取传入的图像并根据特定的应用程序规范调整它们的大小,然后将它们保存到另一个可以提供它们的存储桶中。这可能在现实世界中完成以调整图像大小并使它们可用于移动应用程序,或者只是调整图像大小以减轻动态调整图像大小时发生的资源压力。

它有几个组件,Kafka 和 MinIO 一起使用来支持这个复杂的工作流

  • MinIO,生产者:传入的原始对象存储在 MinIO 中。每次添加对象时,它都会向 Kafka 发送消息以代理特定主题。

  • Kafka,经纪人:经纪人维护队列的状态,存储传入的消息,并使其可供消费者使用。

  • MinIO,消费者:消费者将在队列中读取这条消息,因为他们实时进来,处理原始数据,并将其上传到 MinIO 桶。

MinIO 是所有这一切的基础,因为它是这个工作流的生产者和消费者。


Minio Kafka Workflow-100 (1).jpeg


使用 Kubernetes 集群

我们需要一个 Kubernetes 集群来运行我们的服务。您可以使用任何 Kubernetes 集群,但在本例中我们将使用kind集群。如果尚未安装 Kind,请按照此快速入门指南获取说明。使用以下类型的集群配置来构建一个简单的单主节点和一个多节点 Kubernetes 集群。

将此 yaml 另存为kind-config.yaml

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker

启动集群(这可能需要几分钟)

$ kind create cluster --name minio-kafka --config kind-config.yaml

… [truncated]


Set kubectl context to "kind-minio-kafka"
You can now use your cluster with:

kubectl cluster-info --context kind-minio-kafka


… [truncated]


验证集群是否启动

$ kubectl get no
NAME                        STATUS   ROLES           AGE   VERSION
minio-kafka-control-plane   Ready    control-plane   43s   v1.24.0
minio-kafka-worker          Ready    <none>          21s   v1.24.0
minio-kafka-worker2         Ready    <none>          21s   v1.24.0
minio-kafka-worker3         Ready    <none>          21s   v1.24.0      

安装卡夫卡

Kafka 需要运行一些服务来支持它才能运行。这些服务是:

  • 证书管理器

  • 动物园管理员

让我们在我们的 Kubernetes 集群中安装 cert-manager

$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.6.2/cert-manager.yaml

检查状态以验证已创建证书管理器资源

$ kubectl get ns
NAME                 STATUS   AGE
cert-manager         Active   6s
default              Active   20m
kube-node-lease      Active   20m
kube-public          Active   20m
kube-system          Active   20m
local-path-storage   Active   20m

$ kubectl get po -n cert-manager
NAME                                       READY   STATUS    RESTARTS   AGE
cert-manager-74f9fd7fb6-kqhsq              1/1     Running   0          14s
cert-manager-cainjector-67977b8fcc-k49gj   1/1     Running   0          14s
cert-manager-webhook-7ff8d87f4-wg94l       1/1     Running   0          14s

使用 Helm 图表安装 zookeeper。如果您没有安装 helm,您可以按照helm 文档中提供的安装指南进行操作。

$ helm repo add pravega https://charts.pravega.io
"pravega" has been added to your repositories

$ helm repo update

$ helm install zookeeper-operator --namespace=zookeeper --create-namespace pravega/zookeeper-operator

… [truncated]


$ kubectl --namespace zookeeper create -f - <<EOF
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
    name: zookeeper
    namespace: zookeeper
spec:
    replicas: 1
EOF




您应该看到类似于此的输出,这意味着集群创建正在进行中。

zookeepercluster.zookeeper.pravega.io/zookeeper created

验证 zookeeper operator 和集群 pod 是否都在运行

$ kubectl -n zookeeper get po
NAME                                  READY   STATUS    RESTARTS   AGE
zookeeper-0                           1/1     Running   0          31s
zookeeper-operator-5857967dcc-kfxxt   1/1     Running   0          3m4s      

现在我们已经具备了所有先决条件,让我们安装实际的 Kafka 集群组件。

Kafka 有一个名为Koperator 的操作员,我们将使用它来管理我们的 Kafka 安装。Kafka 集群启动大约需要 4-5 分钟。

$ kubectl create --validate=false -f https://github.com/banzaicloud/koperator/releases/download/v0.21.2/kafka-operator.crds.yaml

$ helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/


$ helm repo update
… [truncated]

$ helm install kafka-operator --namespace=kafka --create-namespace banzaicloud-stable/kafka-operator

… [truncated]

$ kubectl create -n kafka -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/simplekafkacluster.yaml


运行kubectl -n kafka get po确认Kafka已经启动。Kafka 需要几分钟才能运行。请等待,然后再继续。

配置 Kafka 主题

在 MinIO 中配置主题之前先配置主题;主题是先决条件。

创建一个名为my-topic

$ kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
    name: my-topic
spec:
    clusterRef:
        name: kafka
    name: my-topic
    partitions: 1
    replicationFactor: 1
    config:
        "retention.ms": "604800000"
        "cleanup.policy": "delete"
EOF

它应该返回以下输出。如果不是,则主题创建不成功。如果不成功,请等待几分钟让Kafka集群上线,然后重新运行。

kafkatopic.kafka.banzaicloud.io/my-topic created

在接下来的几个步骤中,我们需要 Kafka pod 的 IP 和端口之一

要获取 IP:

$ kubectl -n kafka describe po kafka-0- | grep -i IP:
IP:           10.244.1.5
  IP:           10.244.1.5

注意:IP 对您来说会有所不同,可能与上面不匹配。

有几个端口我们感兴趣

$ kubectl -n kafka get po kafka-0- -o yaml | grep -iA1 containerport
    - containerPort: 29092
      name: tcp-internal
--
    - containerPort: 29093
      name: tcp-controller


… [truncated]


  • Tcp-internal 29092:这是当您作为消费者想要处理传入消息到 Kafka 集群时使用的端口。

  • Tcp-controller 29093:这是生产者(例如 MinIO)要向 Kafka 集群发送消息时使用的端口。

这些 IP 和端口可能会在您自己的设置中发生变化,因此请确保为您的集群获取正确的值。

安装 MinIO

我们将在与其他资源相同的 Kubernetes 集群中将 MinIO 安装在它自己的命名空间中。

获取 MinIO 回购

$ git clone https://github.com/minio/operator.git

应用资源来安装 MinIO

$ kubectl apply -k operator/resources


$ kubectl apply -k operator/examples/kustomization/tenant-lite


验证 MinIO 已启动并正在运行。在本例中,您可以获得 MinIO 控制台的端口9443

$ kubectl -n tenant-lite get svc | grep -i console

storage-lite-console             ClusterIP   10.96.0.215     <none>        9443/TCP   6m53s

设置 kubernetes 端口转发:我们39443在这里为主机选择了端口,但这可以是任何端口,只要确保在通过 Web 浏览器访问控制台时使用相同的端口即可。

$ kubectl -n tenant-lite port-forward svc/storage-lite-console 39443:9443


Forwarding from 127.0.0.1:39443 -> 9443

Forwarding from [::1]:39443 -> 9443


使用以下凭据通过 Web 浏览器访问

网址:https://localhost:39443

用户:minio

经过:minio123


Screen Shot 2022-08-10 at 4.47.06 PM.png


配置 MinIO 生产者

我们将配置 MinIO 以将事件发送到我们之前使用 mc 管理工具创建的 Kafka 集群中的 my-topic。

我在这里启动一个 Ubuntu pod,这样我就有了一个干净的工作空间,更重要的是我可以访问集群中的所有 pod,而无需访问port-forward每个单独的服务。

$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: ubuntu
  labels:
    app: ubuntu
spec:
  containers:
  - image: ubuntu
    command:
      - "sleep"
      - "604800"
    imagePullPolicy: IfNotPresent
    name: ubuntu
  restartPolicy: Always
EOF

进入 Ubuntu pod 以确保它已启动

$ kubectl exec -it ubuntu -- /bin/bash


root@ubuntu:/#


如果您看到任何带有前缀的命令,root@ubuntu:/则表示它是从这个 ubuntu pod 内部运行的。

mc使用以下命令获取二进制文件并安装

rroot@ubuntu:/# apt-get update
apt-get -y install wget
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/

验证它是否安装正确

root@ubuntu:/# mc --version


mc version RELEASE.2022-08-05T08-01-28Z (commit-id=351d021b924b4d19f1eb716b9e2bd74644c402d8)

Runtime: go1.18.5 linux/amd64

Copyright (c) 2015-2022 MinIO, Inc.

License GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>


配置mc管理员使用我们的 MinIO 集群

  • mc alias set

在我们的例子中,这将转化为

root@ubuntu:/# mc alias set myminio https://minio.tenant-lite.svc.cluster.local minio minio123


Added `myminio` successfully.


通过运行以下命令验证配置是否按预期工作;你应该看到类似的东西8 drives online, 0 drives offline

root@ubuntu:/# mc admin info myminio


… [truncated]

Pools:
  1st, Erasure sets: 1, Disks per erasure set: 8

8 drives online, 0 drives offline




在 MinIO 中通过 设置 Kafka 配置mc admin您将需要自定义以下命令

root@ubuntu:/# mc admin config set myminio \
notify_kafka:1 \
brokers="10.244.1.5:29093" \
topic="my-topic" \
tls_skip_verify="off" \
queue_dir="" \
queue_limit="0" \
sasl="off" \
sasl_password="" \
sasl_username="" \
tls_client_auth="0" \
tls="off" \
client_tls_cert="" \
client_tls_key="" \
version="" --insecure

您必须特别注意其中的一些配置

  • brokers="10.244.1.5:29093":这些是具有格式的 Kafka 服务器server1:port1,server2:port2,serverN:portN注意:如果你决定给多台Kafka服务器,你需要给所有服务器的IP;如果您给出部分列表,它将失败。您可以提供单个服务器,但缺点是如果该服务器出现故障,那么配置将不知道集群中的其他 Kafka 服务器。正如我们之前提到的,有两个端口:TCP-internal 29092TCP-controller 29093由于我们将 MinIO 配置为生产者,因此我们将使用29093.

  • topic="my-topic":主题名称应与我们之前在 Kafka 集群中创建的主题匹配。提醒一下,MinIO 不会自动创建这个主题;它必须事先可用。

  • notify_kafka:1:这是稍后将用于实际添加事件的配置名称。

有关这些参数的更多详细信息,请访问我们的文档。

一旦成功,您应该看到下面的输出

Successfully applied new settings.

并根据需要让我们重新启动管理服务

root@ubuntu:/# mc admin service restart myminio


Restart command successfully sent to `myminio`. Type Ctrl-C to quit or wait to follow the status of the restart process.

....

Restarted `myminio` successfully in 2 seconds


在 MinIO 中创建一个名为images这是存储原始对象的地方。

root@ubuntu:/# mc mb myminio/images --insecure


Bucket created successfully `myminio/images`.


我们希望将发送到队列的消息限制为.jpg图像;这可以根据需要进行扩展,例如,如果您想将消息设置为基于另一个文件扩展名(例如.png.

root@ubuntu:/# mc event add  myminio/images arn:minio:sqs::1:kafka --suffix .jpg


Successfully added arn:minio:sqs::1:kafka


# Verify it has been added properly
root@ubuntu:/# mc event list myminio/images


arn:minio:sqs::1:kafka   s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:*   Filter: suffix=".jpg"


有关如何使用 MinIO 配置 Kafka 的更多详细信息,请访问我们的文档

构建 MinIO 消费者

如果我们真的有一个脚本可以使用 MinIO 产生的这些事件并对这些对象进行一些操作,那将是非常酷的。那么为什么不这样做呢?这样我们就可以全面了解工作流程。

在仍然登录到我们的 ubuntu pod 的同时,安装python3python3-pip运行我们的脚本。由于这是 Ubuntu 最小化,我们还需要vim编辑我们的脚本。

root@ubuntu:/# apt-get -y install python3 python3-pip vim

对于我们的 Python 消费者脚本,我们需要通过以下方式安装一些 Python 包pip

root@ubuntu:/# pip3 install minio kafka-python

Collecting minio

  Downloading minio-7.1.11-py3-none-any.whl (76 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 76.1/76.1 KB 4.1 MB/s eta 0:00:00

Collecting kafka-python

  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 8.6 MB/s eta 0:00:00

Collecting certifi

  Downloading certifi-2022.6.15-py3-none-any.whl (160 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 160.2/160.2 KB 11.6 MB/s eta 0:00:00

Collecting urllib3

  Downloading urllib3-1.26.11-py2.py3-none-any.whl (139 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 139.9/139.9 KB 18.4 MB/s eta 0:00:00

Installing collected packages: kafka-python, urllib3, certifi, minio

Successfully installed certifi-2022.6.15 kafka-python-2.0.2 minio-7.1.11 urllib3-1.26.11


如果您看到以上消息,那么我们已经成功安装了脚本所需的依赖项。

我们将在这里展示整个脚本,然后带您了解正在运行的不同组件。现在将此脚本保存为minio_consumer.py

from minio import Minio
import urllib3

from kafka import KafkaConsumer
import json

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
  "minio_username": "minio",
  "minio_password": "minio123",
  "kafka_servers""10.244.1.5:29092",
  "kafka_topic":    "my-topic", # This needs to be created manually
}

# Since we are using self-signed certs we need to disable TLS verification
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=True,
              access_key=config["minio_username"],
              secret_key=config["minio_password"],
              http_client = http_client
              )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

# Initialize Kafka consumer
consumer = KafkaConsumer(
  bootstrap_servers=config["kafka_servers"],
  value_deserializer = lambda v: json.loads(v.decode('ascii'))
)

consumer.subscribe(topics=config["kafka_topic"])

try:
  print("Ctrl+C to stop Consumer\n")
  for message in consumer:
    message_from_topic = message.value

    request_type = message_from_topic["EventName"]
    bucket_name, object_path = message_from_topic["Key"].split("/", 1)

    # Only process the request if a new object is created via PUT
    if request_type == "s3:ObjectCreated:Put":
      minio_client.fget_object(bucket_name, object_path, object_path)
     
      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)


      minio_client.fput_object(config["dest_bucket"], object_path, object_path)

      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))

except KeyboardInterrupt:

  print("\nConsumer stopped.")


  • 我们将导入我们之前安装的 pip 包

from minio import Minio
import urllib3

from kafka import KafkaConsumer
import json

  • 我们不是每次都在代码中修改参数,而是在这个配置字典中展示了一些常见的可配置参数。

config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
  "minio_username": "minio",
  "minio_password": "minio123",
  "kafka_servers""10.244.1.5:29092",
  "kafka_topic":    "my-topic", # This needs to be created manually

  • 我们启动的 MinIO 集群使用的是自签名证书。尝试连接时,我们需要确保它接受自签名证书。

http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

  • 我们将检查存储已处理数据的目标存储桶是否存在;如果没有,那么我们将继续创建一个。

if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

  • 配置要连接的 Kafka 代理以及要订阅的主题

consumer = KafkaConsumer(
  bootstrap_servers=config["kafka_servers"],
  value_deserializer = lambda v: json.loads(v.decode('ascii'))
)

consumer.subscribe(topics=config["kafka_topic"])

  • 当您停止消费者时,它通常会吐出一个堆栈跟踪,因为消费者注定要永远运行并消费消息。这将使我们能够干净地退出消费者

try:
  print("Ctrl+C to stop Consumer\n")


… [truncated]


except KeyboardInterrupt:
  print("\nConsumer stopped.")




如前所述,我们将不断等待,收听有关该主题的新消息。一旦我们得到一个主题,我们将其分解为三个部分

  • request_type: HTTP请求的类型:GET, PUT, HEAD

  • bucket_name: 添加新对象的桶的名称

  • object_path: 添加到存储桶中的对象的完整路径

  for message in consumer:
    message_from_topic = message.value

    request_type = message_from_topic["EventName"]
    bucket_name, object_path = message_from_topic["Key"].split("/", 1)

  • 每次你提出任何请求时,MinIO 都会向主题添加一条消息,我们的脚本将读取该消息minio_consumer.py因此,为避免无限循环,我们只在添加新对象时进行处理,在本例中为请求类型 PUT。

if request_type == "s3:ObjectCreated:Put":
      minio_client.fget_object(bucket_name, object_path, object_path)

  • 您可以在此处添加客户代码以构建 ML 模型、调整图像大小以及处理 ETL/ELT 作业。

      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)

  • 处理完对象后,它将上传到我们之前配置的目标存储桶中。如果存储桶不存在,我们的脚本将自动创建它。

minio_client.fput_object(config["dest_bucket"], object_path, object_path)
      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))

你有它。除了一些样板代码之外,我们基本上在做两件事:

  • 收听有关 Kafka 主题的消息

  • 将对象放入 MinIO 桶中

该脚本并不完美——您需要添加一些额外的错误处理,但它非常简单。其余的您可以使用自己的代码库进行修改。有关详细信息,请访问我们的MinIO Python SDK 文档

使用 MinIO 事件

我们已经构建好了,现在让我们看看它的实际效果。创建两个终端:

  • 终端 1 (T1):正在运行的 Ubuntu podminio_consumer.py

  • 2 号航站楼 (T2):带有mc.

打开 T1 并运行minio_consumer.py我们之前使用python3如果在任何时候你想退出脚本,你可以输入Ctrl+C

root@ubuntu:/# python3 minio_consumer.py


Ctrl+C to stop Consumer


现在让我们打开 T2 并将一些对象放入我们之前使用创建的 MinIO 图像桶中mc

从创建一个测试对象开始

root@ubuntu:/# touch rose.jpg
root@ubuntu:/# echo "a" > rose.jpg

将测试对象上传到图像存储桶到几个不同的路径

root@ubuntu:/# mc cp rose.jpg myminio/images --insecure


/rose.jpg:            2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 55 B/s 0s


root@ubuntu:/# mc cp rose.jpg myminio/images/deeper/path/rose.jpg --insecure


/rose.jpg:            2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 63 B/s 0s


在我们运行 MinIO 消费者脚本的另一个终端 T1 中,您应该看到一些类似于下面的消息

root@ubuntu:/# python3 minio_consumer.py


… [truncated]


- Doing some pseudo image resizing or ML processing on rose.jpg

- Uploaded processed object 'rose.jpg' to Destination Bucket 'processed'

- Doing some pseudo image resizing or ML processing on deeper/path/rose.jpg

- Uploaded processed object 'deeper/path/rose.jpg' to Destination Bucket 'processed'


我们应该验证已处理的对象也已上传到已处理的存储桶

root@ubuntu:/# mc ls myminio/processed

[2022-08-12 01:03:46 UTC]     2B STANDARD rose.jpg


root@ubuntu:/# mc ls myminio/processed/deeper/path

[2022-08-12 01:09:04 UTC]     2B STANDARD rose.jpg


如您所见,我们已成功将对象从未处理的原始数据上传到已处理的存储桶。

使用通知在 MinIO 上构建工作流程

我们在这里展示的只是您可以通过此工作流程实现的示例。通过利用 Kafka 的持久消息传递和 MinIO 的弹性存储,您可以构建复杂的 AI 应用程序,这些应用程序由可以扩展并跟上工作负载的基础架构支持,例如:

  • 机器学习模型

  • 图片大小调整

  • 处理 ETL / ELT 作业



上一篇 下一篇