使用 Kafka 和 MinIO 进行 Spark 结构化流处理
Apache Kafka 是领先的开源分布式事件流平台,用于构建数据管道、流分析、数据集成和应用程序。企业喜爱Kafka是因为它的高可用性、高吞吐量和可扩展性。他们还喜欢使用 Apache Spark 来处理数据和构建分析,因为它快速、分布式且具有容错能力。Spark 经过多年的发展,添加了 Spark SQL 等功能,Spark SQL 是一个使用关系查询进行结构化数据处理的模块。Spark Structured Streaming 构建于 Spark SQL API 之上,用于数据流处理。将 Kafka 与 Spark Structured Streaming 相结合,使开发人员能够像在静态数据上编写批量计算一样表达流计算。
Kafka 和 Spark Structured Streaming 一起使用来构建由流数据提供的数据湖/湖屋,并提供实时业务洞察。为数据湖提供数据只是等式的一部分——为了充分利用数据湖,底层对象存储必须具有高可用性、高性能、可扩展性和 API 驱动性。
MinIO 是流数据湖的绝佳方案。行业领先的 S3 API 兼容性让开发人员充满信心,他们可以毫无问题地使用自己的定制软件、云原生分析或 AI/ML。MinIO 充分利用底层硬件(请参阅为您的 MinIO 部署选择最佳硬件)来提供尽可能最高的性能 - 我们在 GET 上对其进行了基准测试,测试结果为 325 GiB/s (349 GB/s),在 PUT 上仅使用 32 个现成 NVMe SSD 节点,测试结果为 165 GiB/s (177 GB/s)。
Spark Streaming
Apache Spark Streaming 是一个功能强大且可扩展的流处理框架,是更大的 Apache Spark 生态系统的一部分。它提供了一个高级且富有表现力的 API,用于处理来自各种来源(例如 Kafka、Flume、Kinesis 或自定义来源)的数据流,并支持广泛的用例,包括实时分析、机器学习、欺诈检测等。Spark Streaming 遵循微批处理模型,其中传入数据被分为基于时间的小批次,并行处理,并将结果聚合以生成最终输出。Spark Streaming 提供了容错性、可靠性和一次性处理语义的有力保证,使其成为构建可扩展且健壮的流处理应用程序的流行选择。
Spark 结构化流
Spark Structured Streaming 是 Apache Spark 生态系统的新增成员,它基于“连续处理”的概念提供了更先进、更统一的流处理 API。Spark Structured Streaming 扩展了熟悉的 DataFrame 和 Dataset API(用于 Spark 中的批处理),也无缝支持流数据的处理。它为处理数据流提供了更高级别的抽象,允许开发人员编写类似于批处理代码的流处理代码,使其更加直观和用户友好。Spark 结构化流提供了先进的功能,例如对容错、事件时间处理和状态管理的内置支持,使其成为构建可扩展、可靠和复杂的流处理应用程序的强大且便捷的选择。
结构化流是满足现代流处理需求的方法,因为
它是一个真正的连续处理流模型,而不是 Spark Streaming 的微批量模型。
它通过利用 Spark SQL 拥有更丰富的 API 和一组流处理功能。
具有更强的容错性和一致性保证。通过检查点和恢复来防止数据丢失。
它支持基于事件时间的处理和跨时间数据的推理。
与较低级别的 DStreams API 相比,该 API 级别更高且更易于使用。
结构化流是 Spark 流处理的未来,并且正在持续投资和改进。因此,对于任何新的流应用程序,强烈建议从结构化流开始。
在这篇博文中,我们将探讨如何使用 Spark 结构化流处理从 Kafka 流式传输的事件。我们还将探索MinIO 检查点并展示它所提供的显着性能改进。我们还将探讨如何将 Kafka 中的事件数据直接作为 Iceberg Table 保存到 MinIO 中。
如果您还没有设置 Kafka,请查看这篇博文。如果您尚未使用 Kafka 主题nyc-avro-topic并生成事件avro-producer,请参阅使用 Kafka Schema Registry 和 MinIO 充分利用流式处理。
PySpark 结构化流应用程序
下面我们将编写一个简单的 PySpark 应用程序,它将持续nyc-avro-topic从 Kafka 流式传输该主题的事件并处理每条记录并将其作为Parquet文件保存到 MinIO 中。
注意:我们假设 Kafka、Kafka 架构注册表、Kafka 主题nyc-avro-topic和 Avro 生产者已启动并运行。您可以参阅利用 Kafka Schema Registry 和 MinIO 充分利用流媒体以获取详细说明。
您可以在此处找到示例代码。
在上面的代码中:
load_config拥有 Spark 与 MinIO 连接读写数据所需的所有 Hadoop 配置。
value_schema_dictSpark 将使用 Avro 架构来反序列化来自 Kafka 的数据。
配置 Spark Stream
让我们更深入地了解如何配置 Spark Stream:
stream_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092") \
.option("subscribe", "nyc-avro-topic") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.option("mode", "PERMISSIVE") \
.option("truncate", False) \
.option("newRows", 100000) \
.load()在上面的代码片段中,我们将 Spark Stream 格式指定为kafka,我们包含的一些关键选项是:
kafka.bootstrap.servers- 消费事件的 Kafka 服务器端点
subscribe- 指向 Spark 流将处理的主题的逗号分隔字符串列表。在我们的例子中,它是nyc-avro-topic。
startingOffsets- 指定 Spark Stream 从何时开始处理订阅主题的事件。在我们的例子中,它将是earliest,这是订阅主题的时间开始
预处理 Avro 数据流
| # special pre-processing to ignore first 6 bits for Confluent Avro data stream topics taxi_df = stream_df.selectExpr("substring(value, 6) as avro_value").select( from_avro("avro_value", json.dumps(value_schema_dict)).alias("data")).select("data.*") |
在我们的例子中,由于我们使用的是 Confluence Avro 数据流,因此在开始使用 Spark 中的数据之前,我们需要预处理字节流。前 6 位包含 Avro 模式元数据,消费者/连接器将使用 Kafka 中的 Confluence API 使用该元数据。在我们的例子中,我们可以跳过 Spark Streaming 的这些位
将 Parquet 数据写入 MinIO
taxi_df.writeStream \ .format( "parquet" ) \ .outputMode( "append" ) \ .option( "path" , "s3a://warehouse/k8/spark-stream/" ) \ .option( "checkpointLocation" , "s3a://warehouse/k8/checkpoint" ) \ .start() \ .awaitTermination() |
taxi_df我们在 Spark 中打开一个写入流,将预处理后的数据帧parquet写入path选项中指定的 MinIO 中,我们还给出了checkpointLocation一个 MinIO 存储桶,Spark 将在其中不断创建检查点。如果作业失败,Spark 将根据检查点从中断处继续。
| %%writefile sample-code/Dockerfile FROM openlake/spark-py:3.3.2 USER root WORKDIR /app RUN pip3 install pyspark==3.3.2 # Add avro dependency ADD https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar $SPARK_HOME/jars ADD https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.2/spark-avro_2.12-3.3.2.jar $SPARK_HOME/jars COPY src/*.py . |
构建您自己的映像或使用openlake/sparkjob-demo:3.3.2具有上述代码的 docker hub。
您可以在此处找到示例代码。
应用它
| !kubectl apply -f sample-code/spark-job/sparkjob-streaming.yaml |
注意:由于我们的 Kafka Consumer 大约运行 3 小时来传输所有112M行,因此当生产者和消费者同时启动时,Spark 结构化流也将花费接近相同的时间。
性能基准
我们单独测量了 Spark 结构化流消费者进行的 S3 API 调用数量,并记录了以下结果
API RX TX CALLS ERRORS s3.CopyObject 6.7 MiB 4.5 MiB 20220 0 s3.DeleteMultipleObjects 11 MiB 3.0 MiB 26938 0 s3.DeleteObject 9.9 MiB 0 B 39922 0 s3.GetObject 1.8 MiB 496 MiB 6736 0 s3.HeadObject 84 MiB 0 B 336680 0 s3.ListObjectsV2 60 MiB 1.6 GiB 241903 0 s3.PutObject 2.3 GiB 0 B 26975 0 Summary: Total: 699374 CALLS, 2.5 GiB RX, 2.1 GiB TX - in 11999.80s
从上表中可以看出,我们对 MinIO 端点进行了大约 700K 次调用。我们可以对消费者代码进行一个简单的更改来优化它。如果我们1 min向消费者代码添加延迟触发器,我们可以显着减少 API 调用总数,而不是不断轮询来自 Kafka 的新事件。这是优化后的代码。
| taxi_df.writeStream \ .format("parquet") \ .outputMode("append") \ .trigger(processingTime='1 minute') \ .option("path", "s3a://warehouse-v/k8/spark-stream/") \ .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \ .start() \ .awaitTermination() |
这里要注意的关键是它会在每次轮询 Kafka 事件之前.trigger(processingTime='1 minute')添加一个。1 min这是经过优化的代码的数字
API RX TX CALLS ERRORS s3.CopyObject 207 KiB 139 KiB 614 0 s3.DeleteMultipleObjects 335 KiB 92 KiB 812 0 s3.DeleteObject 235 KiB 0 B 921 0 s3.GetObject 54 KiB 469 KiB 199 0 s3.HeadObject 2.5 MiB 0 B 9867 0 s3.ListObjectsV2 1.7 MiB 12 MiB 6910 0 s3.PutObject 2.0 GiB 0 B 814 0 Summary: Total: 20137 CALLS, 2.0 GiB RX, 13 MiB TX - in 12126.59s
从上表可以看出,我们从~700KAPI 调用转变为~20kAPI 调用。只需添加简单的 1 行代码更改,我们就可以对 S3 API 调用的数量进行重大更改。
MinIO 检查点管理器
上述优化是一个巨大的改进。如果我们在版本化存储桶中运行相同的代码集,并且在所有行都存储到 MinIO 中之后,如果我们执行 a,mc ls --versions --recursive opl/warehouse-v/k8 --summarize您仍然会注意到带有v1和v2的对象v2是应该删除的对象的删除标记。随着消费者不断添加记录、删除标记对象,会积累并浪费存储空间,随着时间的推移,这可能会产生问题。
输入MinIO Checkpoint Manager,io.minio.spark.checkpoint.S3BasedCheckpointFileManager它利用 MinIO 严格一致的原子事务。MinIO 的检查点管理器充分利用了本机对象 API,并避免了基于 POSIX 的实现带来的不必要的负担。
通过将这 1 行添加到 Spark 配置中,您可以轻松地在代码中使用检查点管理器
| SparkConf().set("spark.sql.streaming.checkpointFileManagerClass", "io.minio.spark.checkpoint.S3BasedCheckpointFileManager") |
这是示例代码,您可以运行它来查看结果
构建您自己的映像或使用 Docker Hub 中的映像openlake/sparkjob-demo:3.3.2,其中包含上述代码。您可以在此处找到代码。
使用新的 MinIO 的检查点管理器部署优化的消费者,如下所示
!kubectl apply -f 示例代码/spark-job/sparkjob-streaming-optimized.yaml
性能基准
我们使用优化的检查点管理器测量了 Spark 结构化流消费者进行的 S3 API 调用数量,并记录了以下结果
API RX TX CALLS ERRORS s3.DeleteMultipleObjects 5.8 MiB 1.7 MiB 15801 0 s3.DeleteObject 12 MiB 0 B 46465 0 s3.GetObject 4.0 MiB 2.7 GiB 15802 0 s3.HeadObject 43 MiB 0 B 172825 0 s3.ListObjectsV1 7.8 MiB 7.8 GiB 31402 0 s3.ListObjectsV2 3.9 MiB 5.2 MiB 15782 0 s3.PutObject 4.7 GiB 0 B 63204 0 Summary: Total: 361281 CALLS, 4.8 GiB RX, 10 GiB TX - in 12160.25s
我们已经可以看到我们从~700KAPI 调用转变为~361KAPI 调用,除此之外,如果我们1 min在每次轮询之前添加延迟,我们将看到进一步的改进
API RX TX CALLS ERRORS s3.DeleteMultipleObjects 75 KiB 23 KiB 200 0 s3.DeleteObject 100 KiB 0 B 394 0 s3.GetObject 52 KiB 469 KiB 199 0 s3.HeadObject 508 KiB 0 B 1995 0 s3.ListBuckets 150 B 254 B 1 0 s3.ListObjectsV1 75 KiB 2.8 MiB 293 0 s3.ListObjectsV2 51 KiB 67 KiB 200 0 s3.PutObject 2.0 GiB 0 B 803 0 Summary: Total: 4085 CALLS, 2.0 GiB RX, 3.3 MiB TX - in 11945.35s
从~20K之前的 API 调用,我们现在开始进行~4KAPI 调用。我们会注意到的另一个重大改进versioned bucket是不v2 delete marker存在任何对象,而我们只有v1对象。
Spark 结构化流和数据湖
在这篇博文中,我们了解了如何使用 Spark 结构化流处理来自 Kafka 的事件,并且还深入研究了一些可以减少 S3 API 调用数量的优化。我们还看到了使用 MinIO 检查点管理器的好处,以及该实现如何避免所有 POSIX 包袱并利用对象存储的本机严格一致性。在下一篇博文中,我们将了解如何在 Spark 结构化流中实现端到端 Kafka 生产者和消费者,以及如何加快整个流程。
Kafka、Spark 和 MinIO 经常结合起来构建数据湖和分析。它们全部由软件定义,为流数据提供便携式多云主页,使您能够随时随地为分析和 AI/ML 应用程序提供数据。