使用 Kafka Schema Registry 和 MinIO 充分利用流媒体

使用 Kafka Schema Registry 和 MinIO 充分利用流媒体

由流数据提供的现代数据湖/湖屋是许多寻求控制数据并将其应用于解决业务问题的企业关注的焦点。数据驱动的业务需要对自己的昨天和今天有敏锐的了解,才能在明天蓬勃发展。云原生事件流的最先进技术是带有对象存储端点(例如 MinIO)的 Apache Kafka。

鉴于 MinIO 兼具可扩展性和性能,它是 Kafka 架构的理想补充。这些属性使架构师或开发人员能够在流数据湖/湖屋之上构建他们想要的任何内容,而 S3 API 意味着他们可以使用云原生分析和 AI/ML 框架来为其应用程序提供支持。MinIO 充分利用底层硬件(请参阅为您的 MinIO 部署选择最佳硬件)来提供尽可能最高的性能 - 我们在 GET 上对其进行了基准测试,测试结果为 325 GiB/s (349 GB/s),在 PUT 上仅使用 32 个现成 NVMe SSD 节点,测试结果为 165 GiB/s (177 GB/s)。纠删码使 MinIO 成为持久且有弹性的分布式对象存储解决方案,同时提供丰富的企业集成意味着您的数据湖存储可以无缝地融入现有基础设施。

Apache Kafka 是流数据架构的关键元素。从非常基本的意义上来说,Kafka 是一个分布式事件流平台,由称为代理的进程集合组成。生产者将事件发送到代理,并根据时间保留事件,从而允许消费者异步读取和处理事件。我们在如何在 Kubernetes 中设置 Kafka 并将数据流式传输到 MinIO 中进行了更详细的介绍,其中我们向您展示了如何开始使用 Kafka 连接器将事件直接流式传输到 MinIO。

该文章解释了流数据的最简单方法,以便您可以快速启动和运行,但重要的是要记住,对于涉及大量工作负载的生产用例,它可能不够高效和性能。前面提供的快速而肮脏的示例的一个缺点是它缺乏一种自动化的方法来验证数据、发展流模式和添加额外的下游消费者。这些在开发或测试环境中都不会成为问题,但它们在生产中提出了严峻的挑战。当您添加多个 Kafka 开发人员,每个开发人员都对流进行自己的更改时,这些问题会从小到大变得严重。结果可能相当严重:当 Kafka 主题更改架构时 - 添加新列,

在具有多个开发人员和众多最终用户的企业环境中,一个主题可能很容易破裂。当某个主题发生故障时,数据就会停止流入数据湖。团队必须停止正在做的事情并开始排除故障。他们必须弄清楚为什么编写原始模式以及后续的更改破坏了它,然后手动更新模式。同样,这在开发/测试中没问题,但在生产环境中完全不可接受。

问题变成了 - 生产级架构是什么样子才能确保不存在与 Kafka 主题相关的流破坏性更改?这是本文其余部分的重点。

Kafka 模式注册表来救援

Kafka Schema Registry是 Apache Kafka 生态系统中的一个组件,为 Kafka 生产者和消费者提供集中式模式管理服务。它允许生产者为其生成的数据注册模式,并且消费者可以检索并使用这些模式进行数据验证和反序列化。架构注册表有助于确保通过 Kafka 交换的数据符合预定义的架构,从而实现不同系统和应用程序之间的数据一致性、兼容性和演进。

使用 Avro 或其他模式格式时,管理模式并深思熟虑地发展它们至关重要。通过对每个模式进行版本控制并将新模式与以前的版本进行比较,在 Kafka Schema 注册表中启用模式兼容性检查。所需的兼容性类型(向后、向前、完全、无等)决定了 Kafka 模式注册表如何评估每个新模式。未通过兼容性检查的新架构将从服务中删除。

使用 Kafka Schema Registry 的一些主要好处包括:

  • 模式演变:随着数据格式和需求随着时间的推移而发展,生产者和消费者的数据模式发生变化是很常见的。Kafka Schema Registry 提供对 schema 演化的支持,允许生产者注册新版本的 schema,同时保持与现有消费者的兼容性。消费者可以检索适当的架构版本进行反序列化,确保即使发生架构更改也能正确处理数据。

  • 数据验证:Kafka Schema 注册表允许生产者使用预定义的数据类型、字段名称和其他约束注册模式,从而实现数据验证。然后,消费者可以检索并使用这些模式来验证传入的数据,确保数据符合预期的结构和格式。这有助于防止数据处理错误并提高数据质量。

  • 模式管理:Kafka 模式注册表提供了一个用于管理模式的集中存储库,使跟踪、版本控制和管理更改变得更加容易。生产者和消费者可以通过简单的 API 注册、检索和管理模式,从而实现集中模式治理和管理。

  • 互操作性:Kafka Schema Registry 通过提供定义和管理数据模式的标准化方法来促进不同生产者和消费者之间的互操作性。使用不同编程语言编写或使用不同序列化框架的生产者和消费者可以使用通用的模式注册表来确保整个生态系统中的数据一致性和兼容性。

  • 向后和向前兼容性:Kafka Schema 注册表允许生产者注册向后和向前兼容的模式,从而能够在不中断现有生产者和消费者的情况下平滑升级和更改数据模式。向后兼容性确保较旧的使用者仍然可以处理使用较新模式生成的数据,而向前兼容性允许较新的使用者处理使用较旧模式生成的数据。

Strimzi Operator 尚未附带架构注册表,因此我们将使用 Confluence Helm 存储库中提供的架构注册表。

在这篇博文中,我们将执行以下操作

  1. 使用 Helm 图表设置 Kafka 架构注册表

  2. 创建并部署使用 Apache Avro 架构并发送事件的示例生成器

  3. 构建具有 Avro 依赖项的 KafkaConnect 容器

  4. 使用上述容器部署KafkaConnect

  5. 部署一个 Kafka 连接器,从 Kafka 模式注册表读取模式,使用来自生产者的主题事件,并将数据以 Parquet 格式存储到 MinIO 中

设置架构注册表

我们将使用以下命令克隆 Confluence Helm 存储库

In [ ]:
!git clone https://github.com/confluentinc/cp-helm-charts.git

In [ ]:
#move to schema registry folder
%cd cp-helm-charts/charts/cp-schema-registry

使用以下命令使用 Helm 图表安装 Schema Registry,我们需要提供我们部署的现有 Kafka 集群的引导服务器端点才能成功安装

In [ ]:
!helm install kafka-schema-registry --set kafka.bootstrapServers="PLAINTEXT://my-kafka-cluster-kafka-bootstrap:9092" . -n kafka

您可以通过检查日志来检查架构注册表是否已启动并正在运行,如下所示

!kubectl -n kafka logs -f --selector=app=cp-schema-registry -c cp-schema-registry-server # stop this shell once you are done

Apr 12, 2023 4:52:25 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
…
[2023-04-12 16:52:28,481] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)

创建 Avro 主题

接下来,我们将为 Kafka 主题创建一个 YAML 文件nyc-avro-topic并应用它。您可以在此处找到示例代码。

使用 Avro 架构的生产者

我们将创建一个简单的 Python 生产者,然后向 Kafka 模式注册表注册 Avro 模式并发送 Kafka 主题事件。这将基于我们在上一篇博客文章中已有的制作人。您可以在此处找到示例代码。

然后,添加需求和我们将在其上构建 Docker 映像(代码)的 Dockerfile 。

使用上述 docker 文件构建生产者的 docker 镜像并将其推送到 docker 注册表中,或者您可以使用 openlake openlake/kafka-demo-avro- Producer中提供的镜像。

让我们创建一个 YAML,将生产者作为作业部署在 Kubernetes 集群中(代码

部署avro-producer.yaml文件

In [9]:
!kubectl apply -f deployment/avro-producer.yaml

job.batch/avro-producer-job created

您可以使用以下命令检查日志

In [14]:
!kubectl logs -f job.batch/avro-producer-job -n kafka # stop this shell once you are done

Error from server (NotFound): jobs.batch "avro-producer-job" not found

构建 Kafka Connect 镜像

让我们构建一个具有 S3 和 Avro 依赖项的 Kafka Connect 映像(代码

使用上述 Dockerfile 为生产者构建 Docker 镜像并将其推送到 Docker 注册表中,或者您也可以使用MinIO Openlake 存储库中提供的镜像。

在部署之前KafkaConnect,我们首先需要创建存储主题(如果尚未存在),以便KafkaConnect按预期工作。

部署卡夫卡连接

使用上述镜像为 Kafka Connect 创建一个 YAML 文件,并将其部署在 Kubernetes 中。KafkaConnect 将有 1 个副本并利用我们在上一篇博客文章中创建的存储主题。您可以在此处找到示例代码。

注意:spec.template.connectContainer.env已定义凭据,以便 KafkaConnect 在我们的 Minio 集群中存储数据。其他详细信息,例如endpoint_urlbucket_name将成为 的一部分KafkaConnector.key.convertervalue.converter指向 AvroConverter ( io.confluent.connect.avro.AvroConverter)

!kubectl apply -f deployment/avro-connect.yaml

kafkaconnect.kafka.strimzi.io/avro-connect-cluster created

部署 Kafka Sink 连接器

现在我们已经启动并运行了 Kafka Connect,下一步是部署接收器连接器,该连接器将轮询nyc-avro-topic数据并将数据以openlake-tmpParquet 格式存储到 MinIO 存储桶中。我们来看看配置

connector.class- 指定接收器连接器将使用什么类型的连接器。在我们的例子中是io.confluent.connect.s3.S3SinkConnector

store.url- 要存储来自 KafkaConnect 的数据的 MinIO 端点 URL

storage.class- 指定要使用的存储类别。在我们的例子中,由于我们存储在 MinIO 中,io.confluent.connect.s3.storage.S3Storage因此将使用

format.class- 数据将存储到 MinIO 中的格式类型,因为我们想存储 Parquet,所以我们将使用io.confluent.connect.s3.format.parquet.ParquetFormat实现

value.converter- 由于我们想要将二进制数据转换为 Avro,我们将使用io.confluent.connect.avro.AvroConverter

parquet.codec- 指定我们想要对 Parquet 文件使用什么类型的压缩,在我们的例子中我们将使用snappy

schema.registry.url- 指定连接器可以从中拉取、验证架构并反序列化来自生产者的数据的端点

您可以在此处找到示例代码,然后可以应用该代码。

!kubectl apply -f deployment/avro-connector.yaml

kafkaconnector.kafka.strimzi.io/avro-connector created

如果一切顺利,我们很快就会看到通过执行以下命令将文件添加到 Minio openlake-tmp 存储桶中

!mc ls --summarize --recursive play/openlake-tmp/nyc-taxis-avro/nyc-avro-topic/

与我们在如何在 Kubernetes 中设置 Kafka 和将数据流式传输到 MinIO 中的先前基本设置相比,我们当前的设置明显更快、更稳健、存储效率更高。您可以尝试运行生产者和连接器以查看性能和内存利用率差异。

我们现在拥有一个端到端设置,可以使用 Avro 模式高效生成数据 Kafka 主题,并将其直接以 Parquet 格式使用到 MinIO 中。

实验:Iceberg

最近,Iceberg 连接器支持已由 Kafka 添加getindata,您可以在存储库getindata/kafka-connect-iceberg-sink中找到它。下面我们将探讨如何将nyc-avro-topic数据作为 Iceberg 表直接存储到 MinIO 中。根据我们的测试,我们认为这仍处于实验阶段,尚未准备好投入生产,但请尝试一下以了解未来。

Iceberg  Kafka连接

让我们创建一个具有 Iceberg 依赖项的 KafkaConnect。确保在部署之前编辑spec.config.build.output.imagespec.config.build.output.pushSecret指向您的 Docker 注册表。(代码

然后部署我们新的 KafkaConnect CRD

!kubectl apply -f deployment/iceberg-connect.yaml

kafkaconnect.kafka.strimzi.io/iceberg-connect-cluster created

部署 Iceberg Sink 连接器

现在我们已经部署了 Iceberg KafkaConnect,让我们部署 KafkaConnector,它将直接将 Iceberg 表存储到 MinIO 中。Iceberg 支持不同的目录类型,因此您需要选择满足您需求的目录。共有三个目录可用作连接器,您将在下面找到示例配置:

Hadoop Iceberg 接收器连接器

示例展示了如何使用Hadoop catalogMinIO 创建和维护 Iceberg 表。

Hive Iceberg 水槽连接器

示例展示了如何使用Hive catalogMinIO 创建和维护 Iceberg 表。

注意:iceberg.uriiceberg.catalog-impliceberg.table-default.write.data.pathiceberg.table-default.write.metadata.path是 Iceberg Hive 目录工作所必需的。

Nessie Iceberg Sink连接器

示例展示了如何使用Nessie catalogMinIO 创建和维护 Iceberg 表。

注意:iceberg.uriiceberg.reficeberg.catalog-impl是使 Iceberg Nessie 目录与 MinIO 配合使用所需的关键更改。

使用以下任意命令将 KafkaConnector 与您选择的 Iceberg Catalog 一起部署,默认情况下 Hadoop 目录已在下面启用

!kubectl apply -f deployment/iceberg-hadoop-connector.yaml
# !kubectl apply -f deployment/iceberg-hive-connector.yaml
# !kubectl apply -f deployment/iceberg-nessie-connector.yaml

Kafka 和 Iceberg 与 MinIO

这篇博文向您展示了如何构建生产级的端到端架构,以将数据作为 Iceberg 表直接从 Kafka 流式传输到 MinIO。如前所述,Kafka 的 Iceberg 连接器是实验性的,根据我们的初步实验,它尚未准备好投入生产;随着积极的开发正在进行,这种情况可能很快就会改变。如果您已经设置了 Spark,并且想要一个用于在 MinIO 中存储 Iceberg 表的生产就绪解决方案,您可以探索Spark Streaming

Kafka 和 MinIO 都是软件定义的,为流数据湖提供了便携式多云解决方案。它们可以在任何地方运行——本地、公共/私有云和边缘——为事件流架构提供基础,支持云原生分析和人工智能/机器学习应用程序,无论它们位于何处。现在,您可以随时随地自由地构建任何您想要的东西。

立即下载 MinIO并开始构建您的云原生数据湖。


上一篇 下一篇