MinIO 是创建和执行复杂数据工作流的基础组件。 这个事件驱动功能的核心是使用 Kafka 的 MinIO 桶通知。 MinIO 为所有 HTTP 请求生成事件通知, 例如 PUT、 POST、 COPY、 DELETE、 GET和 。 您可以使用这些通知来触发适当的应用程序、脚本和 Lambda 函数,以便在对象上传触发事件通知后采取操作。 HEADCompleteMultipartUpload
事件通知为多个微服务交互和协作提供了松散耦合的范例。 在此范例中,微服务不直接相互调用,而是使用事件通知进行通信。 发送通知后,发送服务可以返回其任务,而接收服务将采取行动。 这种隔离级别使维护代码变得更加容易——更改一个服务不需要更改其他服务,因为它们通过通知而不是直接调用进行通信。
有几个用例依赖 MinIO 事件通知来执行数据工作流。 例如,我们可以使用将存储在 MinIO 中的对象的原始数据来运行 AI/ML 管道。
构建工作流 我们将使用 MinIO 和 Kafka 为假设的图像缩放器应用程序构建一个示例工作流。 它本质上是获取传入的图像并根据特定的应用程序规范调整它们的大小,然后将它们保存到另一个可以提供它们的存储桶中。 这可能在现实世界中完成以调整图像大小并使它们可用于移动应用程序,或者只是调整图像大小以减轻动态调整图像大小时发生的资源压力。
它有几个组件,Kafka 和 MinIO 一起使用来支持这个复杂的工作流
MinIO,生产者 :传入的原始对象存储在 MinIO 中。 每次添加对象时,它都会向 Kafka 发送消息以代理特定主题。
Kafka,经纪人 :经纪人维护队列的状态,存储传入的消息,并使其可供消费者使用。
MinIO,消费者 :消费者将在队列中读取这条消息,因为他们实时进来,处理原始数据,并将其上传到 MinIO 桶。
MinIO 是所有这一切的基础,因为它是这个工作流的生产者和消费者。
使用 Kubernetes 集群 我们需要一个 Kubernetes 集群来运行我们的服务。 您可以使用任何 Kubernetes 集群,但在本例中我们将使用 kind集群。 如果尚未安装 Kind,请按照此 快速入门指南 获取说明。 使用以下类型的集群配置来构建一个简单的单主节点和一个多节点 Kubernetes 集群。
将此 yaml 另存为 kind-config.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker 启动集群(这可能需要几分钟)
$ kind create cluster --name minio-kafka --config kind-config.yaml
… [truncated]
Set kubectl context to "kind-minio-kafka" You can now use your cluster with: kubectl cluster-info --context kind-minio-kafka
… [truncated]
验证集群是否启动
$ kubectl get no NAME STATUS ROLES AGE VERSION minio-kafka-control-plane Ready control-plane 43 s v1.24.0 minio-kafka-worker Ready <none> 21 s v1.24.0 minio-kafka-worker2 Ready <none> 21 s v1.24.0 minio-kafka-worker3 Ready <none> 21 s v1.24.0
安装卡夫卡 Kafka 需要运行一些服务来支持它才能运行。 这些服务是:
让我们在我们的 Kubernetes 集群中安装 cert-manager
$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.6.2/cert-manager.yaml
检查状态以验证已创建证书管理器资源
$ kubectl get ns NAME STATUS AGE cert-manager Active 6 s default Active 20 m kube-node-lease Active 20 m kube-public Active 20 m kube-system Active 20 m local-path-storage Active 20 m $ kubectl get po -n cert-manager NAME READY STATUS RESTARTS AGE cert-manager-74f9fd7fb6-kqhsq 1 /1 Running 0 14 s cert-manager-cainjector-67977b8fcc-k49gj 1 /1 Running 0 14 s cert-manager-webhook-7ff8d87f4-wg94l 1 /1 Running 0 14 s
使用 Helm 图表安装 zookeeper。 如果您没有安装 helm,您可以按照 helm 文档 中提供的安装指南进行操作。
$ helm repo add pravega https://charts.pravega.io "pravega" has been added to your repositories $ helm repo update $ helm install zookeeper-operator --namespace=zookeeper --create-namespace pravega/zookeeper-operator … [truncated]
$ kubectl --namespace zookeeper create -f - <<EOF apiVersion: zookeeper.pravega.io/v1beta1 kind: ZookeeperCluster metadata: name: zookeeper namespace: zookeeper spec: replicas: 1 EOF
您应该看到类似于此的输出,这意味着集群创建正在进行中。
zookeepercluster.zookeeper.pravega.io/zookeeper created
验证 zookeeper operator 和集群 pod 是否都在运行
$ kubectl -n zookeeper get po NAME READY STATUS RESTARTS AGE zookeeper-0 1 /1 Running 0 31 s zookeeper-operator-5857967dcc-kfxxt 1 /1 Running 0 3 m4s
现在我们已经具备了所有先决条件,让我们安装实际的 Kafka 集群组件。
Kafka 有一个名为 Koperator 的 操作员,我们将使用它来管理我们的 Kafka 安装。 Kafka 集群启动大约需要 4-5 分钟。
$ kubectl create --validate=false -f https://github.com/banzaicloud/koperator/releases/download/v0.21.2/kafka-operator.crds.yaml
$ helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/
$ helm repo update … [truncated] $ helm install kafka-operator --namespace=kafka --create-namespace banzaicloud-stable/kafka-operator … [truncated] $ kubectl create -n kafka -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/simplekafkacluster.yaml
运行 kubectl -n kafka get po确认Kafka已经启动。 Kafka 需要几分钟才能运行。 请等待,然后再继续。
配置 Kafka 主题 在 MinIO 中配置主题之前先配置主题; 主题是先决条件。
创建一个名为 my-topic
$ kubectl apply -n kafka -f - <<EOF apiVersion: kafka.banzaicloud.io/v1alpha1 kind: KafkaTopic metadata: name: my-topic spec: clusterRef: name: kafka name: my-topic partitions: 1 replicationFactor: 1 config: "retention.ms": "604800000" "cleanup.policy": "delete" EOF
它应该返回以下输出。 如果不是,则主题创建不成功。 如果不成功,请等待几分钟让Kafka集群上线,然后重新运行。
kafkatopic.kafka.banzaicloud.io/my-topic created
在接下来的几个步骤中,我们需要 Kafka pod 的 IP 和端口之一
要获取 IP:
$ kubectl -n kafka describe po kafka-0- | grep -i IP: IP: 10.244.1.5 IP: 10.244.1.5
注意:IP 对您来说会有所不同,可能与上面不匹配。
有几个端口我们感兴趣
$ kubectl -n kafka get po kafka-0- -o yaml | grep -iA1 containerport - containerPort: 29092 name: tcp-internal -- - containerPort: 29093 name: tcp-controller
… [truncated]
这些 IP 和端口可能会在您自己的设置中发生变化,因此请确保为您的集群获取正确的值。
安装 MinIO 我们将在与其他资源相同的 Kubernetes 集群中将 MinIO 安装在它自己的命名空间中。
获取 MinIO 回购
$ git clone https://github.com/minio/operator.git
应用资源来安装 MinIO
$ kubectl apply -k operator/resources
$ kubectl apply -k operator/examples/kustomization/tenant-lite
验证 MinIO 已启动并正在运行。 在本例中,您可以获得 MinIO 控制台的端口 9443。
$ kubectl -n tenant-lite get svc | grep -i console storage-lite-console ClusterIP 10.96.0.215 <none> 9443/TCP 6m53s
设置 kubernetes 端口转发:我们 39443在这里为主机选择了端口,但这可以是任何端口,只要确保在通过 Web 浏览器访问控制台时使用相同的端口即可。
$ kubectl -n tenant-lite port-forward svc/storage-lite-console 39443:9443
Forwarding from 127.0.0.1:39443 -> 9443
Forwarding from [::1]:39443 -> 9443
使用以下凭据通过 Web 浏览器访问
网址: https://localhost:39443
用户: minio
经过: minio123
我们将配置 MinIO 以将事件发送到我们之前使用 mc 管理工具创建的 Kafka 集群中的 my-topic。
我在这里启动一个 Ubuntu pod,这样我就有了一个干净的工作空间,更重要的是我可以访问集群中的所有 pod,而无需访问 port-forward每个单独的服务。
$ kubectl apply -f - <<EOF apiVersion: v1 kind: Pod metadata: name: ubuntu labels: app: ubuntu spec: containers: - image: ubuntu command: - "sleep" - "604800" imagePullPolicy: IfNotPresent name: ubuntu restartPolicy: Always EOF
进入 Ubuntu pod 以确保它已启动
$ kubectl exec -it ubuntu -- /bin/bash
root@ubuntu:/#
如果您看到任何带有前缀的命令, root@ubuntu:/则表示它是从这个 ubuntu pod 内部运行的。
mc使用以下命令 获取二进制文件并安装
r root@ubuntu:/# apt-get update apt-get -y install wget wget https://dl.min.io/client/mc/release/linux-amd64/mc chmod +x mc mv mc /usr/local/bin/
验证它是否安装正确
root@ubuntu:/# mc --version
mc version RELEASE.2022-08-05T08-01-28Z (commit-id=351d021b924b4d19f1eb716b9e2bd74644c402d8)
Runtime: go1.18.5 linux/amd64
Copyright (c) 2015-2022 MinIO, Inc.
License GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
配置 mc管理员使用我们的 MinIO 集群
在我们的例子中,这将转化为
root@ubuntu:/# mc alias set myminio https://minio.tenant-lite.svc.cluster.local minio minio123
Added `myminio` successfully.
通过运行以下命令验证配置是否按预期工作; 你应该看到类似的东西 8 drives online, 0 drives offline
root@ubuntu:/# mc admin info myminio
… [truncated] Pools: 1 st, Erasure sets: 1 , Disks per erasure set: 8 8 drives online, 0 drives offline
在 MinIO 中通过 设置 Kafka 配置 mc admin。 您将需要自定义以下命令
root@ubuntu:/# mc admin config set myminio \ notify_kafka: 1 \ brokers="10.244.1.5:29093" \ topic="my-topic" \ tls_skip_verify="off" \ queue_dir="" \ queue_limit="0" \ sasl="off" \ sasl_password="" \ sasl_username="" \ tls_client_auth="0" \ tls="off" \ client_tls_cert="" \ client_tls_key="" \ version="" --insecure
您必须特别注意其中的一些配置
brokers="10.244.1.5:29093":这些是具有格式的 Kafka 服务器 server1:port1,server2:port2,serverN:portN。 注意:如果你决定给多台Kafka服务器,你需要给所有服务器的IP; 如果您给出部分列表,它将失败。 您可以提供单个服务器,但缺点是如果该服务器出现故障,那么配置将不知道集群中的其他 Kafka 服务器。 正如我们之前提到的,有两个端口: TCP-internal 29092和 TCP-controller 29093。 由于我们将 MinIO 配置为生产者,因此我们将使用 29093.
topic="my-topic":主题名称应与我们之前在 Kafka 集群中创建的主题匹配。 提醒一下,MinIO 不会自动创建这个主题; 它必须事先可用。
notify_kafka:1:这是稍后将用于实际添加事件的配置名称。
有关这些参数的更多详细信息, 请访问我们的 文档。
一旦成功,您应该看到下面的输出
Successfully applied new settings.
并根据需要让我们重新启动管理服务
root@ubuntu:/# mc admin service restart myminio
Restart command successfully sent to `myminio`. Type Ctrl-C to quit or wait to follow the status of the restart process.
....
Restarted `myminio` successfully in 2 seconds
在 MinIO 中创建一个名为 images. 这是存储原始对象的地方。
root@ubuntu:/# mc mb myminio/images --insecure
Bucket created successfully `myminio/images`.
我们希望将发送到队列的消息限制为 .jpg图像; 这可以根据需要进行扩展,例如,如果您想将消息设置为基于另一个文件扩展名(例如 .png.
root@ubuntu:/# mc event add myminio/images arn:minio:sqs::1:kafka --suffix .jpg
Successfully added arn:minio:sqs::1:kafka
# Verify it has been added properly root@ubuntu:/# mc event list myminio/images
arn:minio:sqs::1:kafka s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:* Filter: suffix=".jpg"
有关如何使用 MinIO 配置 Kafka 的更多详细信息,请访问我们的 文档 。
构建 MinIO 消费者 如果我们真的有一个脚本可以使用 MinIO 产生的这些事件并对这些对象进行一些操作,那将是非常酷的。 那么为什么不这样做呢? 这样我们就可以全面了解工作流程。
在仍然登录到我们的 ubuntu pod 的同时,安装 python3并 python3-pip运行我们的脚本。 由于这是 Ubuntu 最小化,我们还需要 vim编辑我们的脚本。
root@ubuntu:/# apt-get -y install python3 python3-pip vim
对于我们的 Python 消费者脚本,我们需要通过以下方式安装一些 Python 包 pip
root@ubuntu:/# pip3 install minio kafka-python
Collecting minio
Downloading minio-7.1.11-py3-none-any.whl (76 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 76.1/76.1 KB 4.1 MB/s eta 0:00:00
Collecting kafka-python
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 8.6 MB/s eta 0:00:00
Collecting certifi
Downloading certifi-2022.6.15-py3-none-any.whl (160 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 160.2/160.2 KB 11.6 MB/s eta 0:00:00
Collecting urllib3
Downloading urllib3-1.26.11-py2.py3-none-any.whl (139 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 139.9/139.9 KB 18.4 MB/s eta 0:00:00
Installing collected packages: kafka-python, urllib3, certifi, minio
Successfully installed certifi-2022.6.15 kafka-python-2.0.2 minio-7.1.11 urllib3-1.26.11
如果您看到以上消息,那么我们已经成功安装了脚本所需的依赖项。
我们将在这里展示整个脚本,然后带您了解正在运行的不同组件。 现在将此脚本保存为 minio_consumer.py
from minio import Minio import urllib3 from kafka import KafkaConsumer import json # Convenient dict for basic config config = { "dest_bucket" : "processed" , # This will be auto created "minio_endpoint" : "minio.tenant-lite.svc.cluster.local" , "minio_username" : "minio" , "minio_password" : "minio123" , "kafka_servers" : "10.244.1.5:29092" , "kafka_topic" : "my-topic" , # This needs to be created manually } # Since we are using self-signed certs we need to disable TLS verification http_client = urllib3.PoolManager(cert_reqs= 'CERT_NONE' ) urllib3.disable_warnings() # Initialize MinIO client minio_client = Minio(config[ "minio_endpoint" ], secure= True , access_key=config[ "minio_username" ], secret_key=config[ "minio_password" ], http_client = http_client ) # Create destination bucket if it does not exist if not minio_client.bucket_exists(config[ "dest_bucket" ]): minio_client.make_bucket(config[ "dest_bucket" ]) print( "Destination Bucket '%s' has been created" % (config[ "dest_bucket" ])) # Initialize Kafka consumer consumer = KafkaConsumer( bootstrap_servers=config[ "kafka_servers" ], value_deserializer = lambda v: json.loads(v.decode( 'ascii' )) ) consumer.subscribe(topics=config[ "kafka_topic" ]) try : print( "Ctrl+C to stop Consumer\n" ) for message in consumer: message_from_topic = message.value request_type = message_from_topic[ "EventName" ] bucket_name, object_path = message_from_topic[ "Key" ].split( "/" , 1 ) # Only process the request if a new object is created via PUT if request_type == "s3:ObjectCreated:Put" : minio_client.fget_object(bucket_name, object_path, object_path) print( "- Doing some pseudo image resizing or ML processing on %s" % object_path)
minio_client.fput_object(config["dest_bucket"], object_path, object_path)
print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))
except KeyboardInterrupt:
print("\nConsumer stopped.")
from minio import Minio import urllib3 from kafka import KafkaConsumer import json
config = { "dest_bucket" : "processed" , # This will be auto created "minio_endpoint" : "minio.tenant-lite.svc.cluster.local" , "minio_username" : "minio" , "minio_password" : "minio123" , "kafka_servers" : "10.244.1.5:29092" , "kafka_topic" : "my-topic" , # This needs to be created manually
http_client = urllib3.PoolManager(cert_reqs= 'CERT_NONE' ) urllib3.disable_warnings()
if not minio_client.bucket_exists(config[ "dest_bucket" ]): minio_client.make_bucket(config[ "dest_bucket" ]) print( "Destination Bucket '%s' has been created" % (config[ "dest_bucket" ]))
consumer = KafkaConsumer( bootstrap_servers=config[ "kafka_servers" ], value_deserializer = lambda v: json.loads(v.decode( 'ascii' )) ) consumer.subscribe(topics=config[ "kafka_topic" ])
try : print( "Ctrl+C to stop Consumer\n" )
… [truncated]
except KeyboardInterrupt: print( "\nConsumer stopped." )
如前所述,我们将不断等待,收听有关该主题的新消息。 一旦我们得到一个主题,我们将其分解为三个部分
request_type: HTTP请求的类型:GET, PUT, HEAD
bucket_name: 添加新对象的桶的名称
object_path: 添加到存储桶中的对象的完整路径
for message in consumer: message_from_topic = message.value request_type = message_from_topic[ "EventName" ] bucket_name, object_path = message_from_topic[ "Key" ].split( "/" , 1 )
if request_type == "s3:ObjectCreated:Put" : minio_client.fget_object(bucket_name, object_path, object_path)
print( "- Doing some pseudo image resizing or ML processing on %s" % object_path)
minio_client.fput_object(config[ "dest_bucket" ], object_path, object_path) print( "- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config[ "dest_bucket" ]))
你有它。 除了一些样板代码之外,我们基本上在做两件事:
收听有关 Kafka 主题的消息
将对象放入 MinIO 桶中
该脚本并不完美——您需要添加一些额外的错误处理,但它非常简单。 其余的您可以使用自己的代码库进行修改。 有关详细信息,请访问我们的 MinIO Python SDK 文档 。
使用 MinIO 事件 我们已经构建好了,现在让我们看看它的实际效果。 创建两个终端:
打开 T1 并运行 minio_consumer.py我们之前使用 python3. 如果在任何时候你想退出脚本,你可以输入 Ctrl+C
root@ubuntu:/# python3 minio_consumer.py
Ctrl+C to stop Consumer
现在让我们打开 T2 并将一些对象放入我们之前使用创建的 MinIO 图像桶中 mc。
从创建一个测试对象开始
root@ubuntu:/# touch rose.jpg root@ubuntu:/# echo "a" > rose.jpg
将测试对象上传到图像存储桶到几个不同的路径
root@ubuntu:/# mc cp rose.jpg myminio/images --insecure
/rose.jpg: 2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 55 B/s 0s
root@ubuntu:/# mc cp rose.jpg myminio/images/deeper/path/rose.jpg --insecure
/rose.jpg: 2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 63 B/s 0s
在我们运行 MinIO 消费者脚本的另一个终端 T1 中,您应该看到一些类似于下面的消息
root@ubuntu:/# python3 minio_consumer.py
… [truncated]
- Doing some pseudo image resizing or ML processing on rose.jpg
- Uploaded processed object 'rose.jpg' to Destination Bucket 'processed'
- Doing some pseudo image resizing or ML processing on deeper/path/rose.jpg
- Uploaded processed object 'deeper/path/rose.jpg' to Destination Bucket 'processed'
我们应该验证已处理的对象也已上传到已处理的存储桶
root@ubuntu:/# mc ls myminio/processed
[2022-08-12 01 :03:46 UTC] 2 B STANDARD rose.jpg
root@ubuntu:/# mc ls myminio/processed/deeper/path
[2022-08-12 01:09:04 UTC] 2B STANDARD rose.jpg
如您所见,我们已成功将对象从未处理的原始数据上传到已处理的存储桶。
使用通知在 MinIO 上构建工作流程 我们在这里展示的只是您可以通过此工作流程实现的示例。 通过利用 Kafka 的持久消息传递和 MinIO 的弹性存储,您可以构建复杂的 AI 应用程序,这些应用程序由可以扩展并跟上工作负载的 基础架构支持,例如:
机器学习模型
图片大小调整
处理 ETL / ELT 作业