Apache Kafka 是一个开源分布式事件流平台,用于构建实时数据管道和流式应用程序。 它最初由 LinkedIn 开发,现在由 Apache 软件基金会维护。 Kafka 旨在处理高容量、高吞吐量和低延迟数据流,使其成为构建可扩展且可靠的数据流解决方案的热门选择。
Kafka 的一些好处包括:
规模和速度: 每秒处理大规模数据流和数百万个事件,并通过向集群添加更多 Kafka 代理来横向扩展
容错: 在 Kafka 集群中跨多个 Broker 复制数据,确保数据高可用,并在发生故障时可以恢复,使 Kafka 成为关键数据流应用程序的可靠选择
多功能性: 支持各种数据源和数据接收器,使其具有高度的通用性。 它可用于构建广泛的应用程序,例如实时数据处理、数据摄取、数据流和事件驱动架构
持久性: 所有发布的消息都存储一段可配置的时间,允许消费者按照自己的节奏阅读数据。 这使得 Kafka 适用于需要保留数据以进行历史分析或重放以进行恢复的情况。
请参阅 Apache Kafka 了解更多信息。
在广泛使用的容器编排平台 Kubernetes 上部署 Kafka 提供了几个额外的优势。 Kubernetes 支持根据需求动态扩展 Kafka 集群,从而实现高效的资源利用和 Kafka 代理的自动扩展以处理不断变化的数据流量。 这确保了 Kafka 可以处理不同的工作负载,而不会造成不必要的资源浪费或性能下降。
它提供了简单的部署、管理和监控 将 Kafka 集群作为容器运行可以提供简单的部署、管理和监控,并使其在不同环境中具有高度可移植性。 这允许跨各种云提供商、数据中心或开发环境无缝迁移 Kafka 集群。
Kubernetes 包含用于处理故障和确保 Kafka 集群高可用性的内置功能。 例如,它自动重新调度失败的 Kafka broker 容器并支持滚动更新而不停机,确保 Kafka 对数据流应用程序的持续可用性,从而增强 Kafka 部署的可靠性和容错性。
Kafka 和 MinIO 通常用于构建数据流解决方案。 MinIO 是一个高性能的分布式对象存储系统,旨在支持具有 S3 兼容存储的非结构化、半结构化和结构化数据的云原生应用程序。 当用作 Kafka 的数据接收器时,MinIO 使组织能够实时存储和处理大量数据。
结合 Kafka 和 MinIO 的一些好处包括:
高性能: MinIO 写入 Kafka 流的速度与其传入速度一样快。 最近的基准测试 在 GET 上达到了 325 GiB/s(349 GB/s),在 PUT 上达到了 165 GiB/s(177 GB/s),只有 32 个节点关闭 -现成的 NVMe SSD。
可扩展性: MinIO 可处理大量数据并可在多个节点之间横向扩展,非常适合存储 Kafka 生成的数据流。 这使组织能够实时存储和处理大量数据,使其适用于大数据和高速数据流用例。
持久性 :MinIO 提供持久存储,允许组织长期保留数据,例如用于历史分析、合规性要求或数据恢复目的。
容错 :MinIO跨多个节点 擦除编码 数据,提供容错并确保数据持久性。 这补充了 Kafka 的容错能力,使整体解决方案具有高可用性、可靠性和弹性。
易于集成 :MinIO 可以使用 Kafka Connect 轻松与 Kafka 集成,Kafka Connect 是一个用于连接 Kafka 与外部系统的内置框架。 这使得将数据从 Kafka 流式传输到 MinIO 进行存储变得简单,反之亦然以进行数据检索,从而实现 Kafka 和 MinIO 之间的无缝数据流。 我们将在下面的教程中看到这是多么简单。
在这篇文章中,我们将介绍如何使用 Strimzi 在 Kubernetes 上设置 Kafka ,Strimzi 是一个开源项目,它提供操作员在 Kubernetes 上运行 Apache Kafka 和 Apache ZooKeeper 集群,包括 OpenShift 等发行版。 然后我们将使用 Kafka Connect 将数据流式传输到 MinIO。
先决条件 在我们开始之前,请确保您拥有以下内容:
安装 Strimzi 运算符 第一步是在 Kubernetes 集群上安装 Strimzi operator。 Strimzi 操作员管理 Kubernetes 上的 Kafka 和 ZooKeeper 集群的生命周期。
添加 Strimzi Helm 图表存储库
!helm repo add strimzi https: //strimzi.io/charts/ "strimzi" already exists with the same configuration, skipping
安装版本名称为 my-release 的图表:
!helm install my-release strimzi/strimzi-kafka-operator --namespace=kafka --create-namespace NAME: my-release LAST DEPLOYED: Mon Apr 10 20 : 03 : 12 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: Thank you for installing strimzi-kafka-operator -0.34.0 To create a Kafka cluster refer to the following documentation. https: //strimzi.io/docs/operators/latest/deploying.html#deploying-cluster-operator-helm-chart-str
这将在新创建的 kafka 命名空间中安装最新版本的操作员(在撰写本文时为 0.34.0)。 有关其他配置,请参阅 此 页面。
创建 Kafka 集群 现在我们已经安装了 Strimzi operator,我们可以创建一个 Kafka 集群。 在此示例中,我们将创建一个包含三个 Kafka 代理和三个 ZooKeeper 节点的 Kafka 集群。
让我们创建一个 YAML 文件,如下 所示
%%writefile deployment/kafka-cluster.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-kafka-cluster namespace: kafka spec: kafka: version: 3.4.0 replicas: 3 listeners: - name: plain port: 9092 type : internal tls: false - name: tls port: 9093 type : internal tls: true config: offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 default .replication.factor: 3 min.insync.replicas: 2 inter.broker.protocol.version: "3.4" storage: type : jbod volumes: - id: 0 type : persistent-claim size: 100 Gi deleteClaim: false zookeeper: replicas: 3 storage: type : persistent-claim size: 100 Gi deleteClaim: false entityOperator: topicOperator: {} userOperator: {} Overwriting deployment/kafka-cluster.yaml
让我们通过部署 YAML 文件来创建集群。 我们正在部署集群,因此需要一些时间才能启动并运行
!kubectl apply -f deployment/kafka-cluster.yaml kafka.kafka.strimzi.io/my-kafka-cluster created
检查集群的状态
!kubectl -n kafka get kafka my-kafka-cluster
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
my-kafka-cluster 3 现在我们已经启动并运行了集群,让我们生成和使用示例主题事件,从 kafka 主题 my-topic 开始。
创建 Kafka 主题 如下所示为 kafka 主题 my-topic 创建一个 YAML 文件并应用它。
%%writefile deployment/kafka-my-topic.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic namespace: kafka labels: strimzi.io/cluster: my-kafka-cluster spec: partitions: 3 replicas: 3 Overwriting deployment/kafka-my-topic.yaml !kubectl apply -f deployment/kafka-my-topic.yaml kafkatopic.kafka.strimzi.io/connect-offsets created
检查主题的状态
!kubectl -n kafka get kafkatopic my-topic NAME CLUSTER PARTITIONS REPLICATION FACTOR READY my-topic my-kafka-cluster 3 3 True
生产和消费消息 通过设置 Kafka 集群和主题,我们现在可以生成和使用消息。
要创建 Kafka 生产者 pod 以向 my-topic 主题生成消息,请在终端中尝试以下命令
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka: 0.34.0 -kafka -3.4.0 --rm= true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-kafka-cluster-kafka-bootstrap: 9092 --topic my-topic
这将提示我们向生产者发送消息。 同时,我们可以让消费者开始消费我们发送给生产者的消息
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka: 0.34.0 -kafka -3.4.0 --rm= true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-kafka-cluster-kafka-bootstrap: 9092 --topic my-topic --from-beginning
消费者将重播我们之前发送给生产者的所有消息,如果我们向生产者添加任何新消息,它们也将开始出现在消费者端。
您可以删除 my-topic 主题
!kubectl -n kafka delete kafkatopic my-topic kafkatopic.kafka.strimzi.io "my-topic" deleted
现在 Kafka 集群已经启动并运行了一个虚拟主题生产者/消费者,我们可以开始使用 Kafka 连接器将主题直接消费到 MinIO 中。
使用 MinIO 设置 Kafka 连接器 接下来我们将使用 Kafka 连接器将主题直接流式传输到 MinIO。 首先让我们看看什么是连接器以及如何设置连接器。 这是不同 Kafka 组件如何交互的高级概述
卡夫卡连接器 Kafka Connect 是一个集成工具包,用于在 Kafka 代理和其他系统之间流式传输数据。 另一个系统通常是外部数据源或目标,例如 MinIO。
Kafka Connect 利用插件架构为连接器提供实现工件,这些连接器用于连接到外部系统和操作数据。 插件由连接器、数据转换器和转换组成。 连接器旨在与特定的外部系统一起工作,并为其配置定义架构。 配置 Kafka Connect 时,您配置连接器实例,然后连接器实例定义一组系统间数据移动任务。
在分布式运行模式中,Strimzi 通过将数据流任务分布到一个或多个 worker pod 来运行 Kafka Connect。 Kafka Connect 集群由一组 worker pod 组成,每个连接器都在一个 worker 上实例化。 每个连接器都可以有一个或多个任务,这些任务分布在一组工作人员中,从而实现高度可扩展的数据管道。
Kafka Connect 中的工作人员负责将数据从一种格式转换为另一种格式,使其适用于源系统或目标系统。 根据连接器实例的配置,工作人员还可以应用转换,也称为 单消息转换 (SMT),它可以在消息转换之前调整消息,例如通过过滤某些数据。 Kafka Connect 带有一些内置的转换,但插件可以根据需要提供额外的转换。
Kafka Connect 在流式传输数据时使用以下组件
连接器 - 创建任务
任务 - 移动数据
Workers - 运行任务
变形金刚 - 操纵数据
转换器 - 转换数据
有两种类型的连接器
源连接器 - 将数据推送到 Kafka
Sink Connectors——将数据从 Kafka 提取到外部源,如 MinIO
让我们配置一个 Sink Connector,它从 Kafka 中提取数据并将其存储到 MinIO 中,如下所示
Sink 连接器从 Kafka 流式传输数据并执行以下步骤
插件为 Sink Connector 提供了实现工件:在 Kafka Connect 中,Sink Connector 用于将数据从 Kafka 流式传输到外部系统。 接收器连接器的实现工件(例如代码和配置)由插件提供。 插件用于扩展 Kafka Connect 的功能并启用与不同外部数据系统的连接。
单个 worker 启动 Sink Connector 实例:在分布式操作模式下,Kafka Connect 作为 worker pod 集群运行。 每个 worker pod 都可以发起一个 Sink Connector 实例,该实例负责将数据从 Kafka 流式传输到外部数据系统。 worker 管理 Sink Connector 实例的生命周期,包括它的初始化和配置。
Sink Connector 创建任务以流式传输数据:一旦 Sink Connector 实例启动,它就会创建一个或多个任务以将数据从 Kafka 流式传输到外部数据系统。 每个任务负责处理一部分数据,并且可以与其他任务并行运行以进行高效的数据处理。
任务并行运行以轮询 Kafka 并返回记录:任务从 Kafka 主题检索记录并准备将它们转发到外部数据系统。 任务的并行处理可实现高吞吐量和高效的数据流。
转换器将记录转换为适合外部数据系统的格式:在将记录转发到外部数据系统之前,转换器用于将记录转换为适合外部数据系统特定要求的格式。 转换器处理数据格式转换,例如从 Kafka 的二进制格式到外部数据系统支持的格式。
转换调整记录,例如过滤或重新标记它们:根据接收器连接器的配置,可以应用转换、单消息转换 (SMT) 来调整记录,然后再将它们转发到外部数据系统。 转换可用于过滤、重新标记或丰富要发送到外部系统的数据等任务。
接收器连接器使用 KafkaConnectors 或 Kafka Connect API 进行管理:接收器连接器及其任务使用 KafkaConnectors 或通过 Kafka Connect API 进行管理,后者提供用于管理 Kafka Connect 的编程访问。 这允许在 Kafka Connect 部署中轻松配置、监视和管理接收器连接器及其任务。
设置 我们将创建一个简单的示例,它将执行以下步骤
创建一个将从 MinIO 流式传输数据并以 JSON 格式为主题生成事件的生产者
构建具有 S3 依赖项的 Kafka 连接映像
根据上图部署Kafka Connect
部署使用 kafka 主题并存储数据 MinIO 桶的 Kafka 接收器连接器
将演示数据导入 MinIO 我们将使用 MinIO 上可用的 NYC Taxi 数据集。 如果您没有数据集,请按照 此处的说明进行操作
制作人 下面是一个简单的 Python 代码,它使用来自 MinIO 的数据并为主题 my-topic 生成事件
%%writefile sample-code/producer/src/producer.py import logging import os import fsspec import pandas as pd import s3fs from kafka import KafkaProducer logging.basicConfig(level=logging.INFO) producer = KafkaProducer(bootstrap_servers= "my-kafka-cluster-kafka-bootstrap:9092" ) fsspec.config.conf = { "s3" : { "key" : os.getenv( "AWS_ACCESS_KEY_ID" , "openlakeuser" ), "secret" : os.getenv( "AWS_SECRET_ACCESS_KEY" , "openlakeuser" ), "client_kwargs" : { "endpoint_url" : "https://play.min.io:50000" } } } s3 = s3fs.S3FileSystem() total_processed = 0 i = 1 for df in pd.read_csv( 's3a://openlake/spark/sample-data/taxi-data.csv' , chunksize= 1000 ): count = 0 for index, row in df.iterrows(): producer.send( "my-topic" , bytes(row.to_json(), 'utf-8' )) count += 1 producer.flush() total_processed += count if total_processed % 10000 * i == 0 : logging.info(f "total processed till now {total_processed}" ) i += 1
Overwriting sample-code/src/producer.py
添加需求和 Dockerfile,我们将基于这些需求和 Dockerfile 构建 docker 镜像
%%writefile sample-code/producer/requirements.txt pandas== 2.0.0 s3fs== 2023.4.0 pyarrow== 11.0.0 kafka-python== 2.0.2
Overwriting sample-code/producer/requirements.txt
In [ 14 ]: %%writefile sample-code/producer/Dockerfile FROM python: 3.11 -slim ENV PYTHONDONTWRITEBYTECODE= 1 COPY requirements.txt . RUN pip3 install -r requirements.txt COPY src/producer.py . CMD [ "python3" , "-u" , "./producer.py" ]
Overwriting sample-code/Dockerfile
使用上述 Docker 文件为生产者构建和推送 Docker 镜像,或者您可以使用 openlake openlake/kafka-demo-producer 中可用的镜像
让我们创建一个 YAML 文件,将我们的生产者作为作业部署在 Kubernetes 集群中
%%writefile deployment/producer.yaml apiVersion: batch/v1 kind: Job metadata: name: producer-job namespace: kafka spec: template: metadata: name: producer-job spec: containers: - name: producer-job image: openlake/kafka-demo-producer:latest restartPolicy: Never
Writing deployment/producer.yaml
部署 producer.yaml 文件
In [ 84 ]: !kubectl apply -f deployment/producer.yaml job.batch/producer-job created
使用以下命令检查日志
In [ 24 ]: !kubectl logs -f job.batch/producer-job -n kafka # stop this shell once you are done <jemalloc>: MADV_DONTNEED does not work (memset will be used instead) <jemalloc>: (This is the expected behaviour if you are running under QEMU) INFO:kafka.conn:<BrokerConnection node_id=bootstrap -0 host=my-kafka-cluster-kafka-bootstrap: 9092 <connecting> [IPv4 ( '10.96.4.95' , 9092 )]>: connecting to my-kafka-cluster-kafka-bootstrap: 9092 [( '10.96.4.95' , 9092 ) IPv4] INFO:kafka.conn:Probing node bootstrap -0 broker version INFO:kafka.conn:<BrokerConnection node_id=bootstrap -0 host=my-kafka-cluster-kafka-bootstrap: 9092 <connecting> [IPv4 ( '10.96.4.95' , 9092 )]>: Connection complete. INFO:kafka.conn:Broker version identified as 2.5.0 INFO:kafka.conn:Set configuration api_version=( 2 , 5 , 0 ) to skip auto check_version requests on startup INFO:kafka.conn:<BrokerConnection node_id= 0 host=my-kafka-cluster-kafka -0. my-kafka-cluster-kafka-brokers.kafka.svc: 9092 <connecting> [IPv4 ( '10.244.1.4' , 9092 )]>: connecting to my-kafka-cluster-kafka -0. my-kafka-cluster-kafka-brokers.kafka.svc: 9092 [( '10.244.1.4' , 9092 ) IPv4] INFO:kafka.conn:<BrokerConnection node_id= 0 host=my-kafka-cluster-kafka -0. my-kafka-cluster-kafka-brokers.kafka.svc: 9092 <connecting> [IPv4 ( '10.244.1.4' , 9092 )]>: Connection complete. INFO:kafka.conn:<BrokerConnection node_id=bootstrap -0 host=my-kafka-cluster-kafka-bootstrap: 9092 <connected> [IPv4 ( '10.96.4.95' , 9092 )]>: Closing connection. INFO:root:total processed till now 10000 rpc error: code = NotFound desc = an error occurred when try to find container "85acfb121b7b63bf0f46d9ef89aed9b05666b3fb86b4a835e9d2ebf67c6943f9" : not found
现在我们的基本生产者已将 JSON 事件发送到我的主题,让我们部署 Kafka Connect 和相应的连接器,将这些事件存储在 MinIO 中。
构建 Kafka 连接图像 让我们构建一个具有 S3 依赖项的 Kafka Connect 镜像
%%writefile sample-code/connect/Dockerfile FROM confluentinc/cp-kafka-connect: 7.0.9 as cp RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3: 10.4.2 RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter: 7.3.3 FROM quay.io/strimzi/kafka: 0.34.0 -kafka -3.4.0 USER root:root # Add S3 dependency COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/ /opt/kafka/plugins/kafka-connect-s3/
Overwriting sample-code/connect/Dockerfile
使用上述 Dockerfile 为生产者构建和推送 Docker 镜像,或者可以使用 openlake openlake/kafka-connect:0.34.0 中可用的镜像
在部署 Kafka Connect 之前,我们需要创建存储主题(如果尚未存在)以使 Kafka Connect 按预期工作。
创建存储主题 让我们创建连接状态、连接配置和连接偏移主题并按如下所示部署它们
%%writefile deployment/connect-status-topic.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: connect-status namespace: kafka labels: strimzi.io/cluster: my-kafka-cluster spec: partitions: 1 replicas: 3 config: cleanup.policy: compact
Writing deployment/connect-status-topic.yaml
In [ 73 ]: %%writefile deployment/connect-configs-topic.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: connect-configs namespace: kafka labels: strimzi.io/cluster: my-kafka-cluster spec: partitions: 1 replicas: 3 config: cleanup.policy: compact
Writing deployment/connect-configs-topic.yaml
In [ 74 ]: %%writefile deployment/connect-offsets-topic.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: connect-offsets namespace: kafka labels: strimzi.io/cluster: my-kafka-cluster spec: partitions: 1 replicas: 3 config: cleanup.policy: compact
Writing deployment/connect-offsets-topic.yaml
部署以上主题
In [ ]: !kubectl apply -f deployment/connect-status-topic.yaml !kubectl apply -f deployment/connect-configs-topic.yaml !kubectl apply -f deployment/connect-offsets-topic.yaml 在[ ] 中 :
部署卡夫卡连接 接下来,为使用上述图像的 Kafka Connect 创建一个 YAML 文件并将其部署在 Kubernetes 中。 Kafka Connect 将有 1 个副本并使用我们在上面创建的存储主题。
注意:spec.template.connectContainer.env 定义了凭据,以便 Kafka Connect 将数据存储在 Minio 集群中。 其他详细信息,如 endpoint_url、bucket_name 将成为 KafkaConnector 的一部分
In [ 75 ]: %%writefile deployment/connect.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: connect-cluster namespace: kafka annotations: strimzi.io/use-connector-resources: "true" spec: image: openlake/kafka-connect: 0.34.0 version: 3.4.0 replicas: 1 bootstrapServers: my-kafka-cluster-kafka-bootstrap: 9093 tls: trustedCertificates: - secretName: my-kafka-cluster-cluster-ca-cert certificate: ca.crt config: bootstrap.servers: my-kafka-cluster-kafka-bootstrap: 9092 group.id: connect-cluster key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter internal.key.converter: org.apache.kafka.connect.json.JsonConverter internal.value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false value.converter.schemas.enable: false offset.storage.topic: connect-offsets offset.storage.replication.factor: 1 config.storage.topic: connect-configs config.storage.replication.factor: 1 status.storage.topic: connect-status status.storage.replication.factor: 1 offset.flush.interval.ms: 10000 plugin.path: /opt/kafka/plugins offset.storage.file.filename: /tmp/connect.offsets template: connectContainer: env: - name: AWS_ACCESS_KEY_ID value: "openlakeuser" - name: AWS_SECRET_ACCESS_KEY value: "openlakeuser"
Writing deployment/connect.yaml
In [ 87 ]: !kubectl apply -f deployment/connect.yaml kafkaconnect.kafka.strimzi.io/connect-cluster created
部署 Kafka 接收器连接器 现在我们已经启动并运行了 Kafka Connect,下一步是部署 Sink Connector,它将轮询 my-topic 并将数据存储到 MinIO bucket openlake-tmp 中。
connector.class - 指定 Sink 连接器将使用的连接器类型,在我们的例子中是 io.confluent.connect.s3.S3SinkConnector
store.url - 您要存储来自 Kafka Connect 的数据的 MinIO 端点 URL
storage.class - 指定要使用的存储类,在我们的例子中,我们存储在 MinIO 中,因此将使用 io.confluent.connect.s3.storage.S3Storage
format.class - 在 MinIO 中存储数据的格式类型,因为我们想存储 JSON,我们将使用 io.confluent.connect.s3.format.json.JsonFormat
In [ 90 ]: %%writefile deployment/connector.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: "minio-connector" namespace: "kafka" labels: strimzi.io/cluster: connect-cluster spec: class: io.confluent.connect.s3.S3SinkConnector config: connector.class: io.confluent.connect.s3.S3SinkConnector task.max: '1' topics: my-topic s3.region: us-east -1 s3.bucket.name: openlake-tmp s3.part.size: '5242880' flush.size: '1000' store.url: https: //play.min.io:50000 storage.class: io.confluent.connect.s3.storage.S3Storage format.class: io.confluent.connect.s3.format.json.JsonFormat partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner behavior.on.null.values: ignore
Overwriting deployment/connector.yaml
In [ 89 ]: !kubectl apply -f deployment/connector.yaml kafkaconnector.kafka.strimzi.io/minio-connector created
我们可以看到文件被添加到 Minio openlake-tmp 存储桶中
In [ 79 ]: !mc ls --summarize --recursive play/openlake-tmp/topics/my-topic ] 11 ;?\[ 2023-04-11 19 : 53 : 29 PDT] 368 KiB STANDARD partition= 0 /my-topic+ 0 + 0000000000. json [ 2023-04-11 19 : 53 : 30 PDT] 368 KiB STANDARD partition= 0 /my-topic+ 0 + 0000001000. json
[...TRUNCATED…]
[ 2023-04-11 19 : 54 : 07 PDT] 368 KiB STANDARD partition= 0 /my-topic+ 0 + 0000112000. json [ 2023-04-11 19 : 54 : 08 PDT] 368 KiB STANDARD partition= 0 /my-topic+ 0 + 0000113000. json [ 2023-04-11 19 : 54 : 08 PDT] 368 KiB STANDARD partition= 0 /my-topic+ 0 + 0000114000. json Total Size: 41 MiB Total Objects: 115
我们创建了一个在 Kafka 中生成主题并使用 Kafka 连接器将其直接消费到 MinIO 中的端到端实现。 这是学习如何结合使用 MinIO 和 Kafka 构建流式数据存储库的良好开端。 但是等等,还有更多。
在我的下一篇文章中,我将解释并向您展示如何学习本教程并将其变成更高效、更高效的东西。
使用 Kafka 和 MinIO 实现流媒体成功 这篇博文向您展示了如何开始构建流式数据湖。 当然,从开始到制作之间还有很多步骤。
MinIO 是云原生对象存储,它构成了ML/AI 、 分析 、 流视频 和其他在 Kubernetes 中运行的要求苛刻的工作负载 的基础。 MinIO 可无缝扩展 ,确保您可以简单地扩展存储以适应不断增长的数据湖。
客户经常使用 MinIO 构建数据湖,并将它们暴露给各种用于商业智能、仪表盘和其他分析的云原生应用程序。 他们使用Apache Iceberg 、 Apache Hudi 和 Delta Lake 构建它们 。 他们使用 Snowflake 、 SQL Server 或各种数据库来读取保存在 MinIO 中作为外部表的数据。 他们使用 Dremio 、 Apache Druid 和 Clickhouse 进行分析,使用 Kubeflow 和 Tensorflow 进行机器学习。
MinIO 甚至可以在云之间 复制 数据以利用特定的应用程序和框架,同时使用 访问控制 、 版本控制 、 加密 和 擦除编码 对其进行保护。
不过,请不要相信我们的话——您自己构建吧。 您可以 下载 MinIO,也可以加入我们的 Slack 频道 。