Apache Nifi 的 MinIO 事件通知

Apache Nifi 的 MinIO 事件通知


Apache Nifi是当今最流行的开源数据流引擎之一。Nifi支持几乎所有主要的企业数据系统,并允许用户创建有效,快速和可扩展的信息流系统。使用Nifi创建数据流系统非常简单,并且有一条明确的途径可以添加对尚未作为Nifi处理器提供的系统的支持。所有这些都推动了Nifi的大规模采用。

一些MinIO客户在其用例中利用了Nifi。这些客户利用MinIO获得高性能数据湖,通常会合成多个不同的数据集。Nifi允许那些客户将这些数据路由到相关的最终用户。

常见的模式之一是在Nifi中使用MinIO对象元数据创建自定义流。

特定的用例可能会有所不同,例如,某人可能想以不同的方式对待csvjson上传到MinIO的文件,其他人可能希望将其jpg,pngpdf文件分开,其他人可能只想将json文件转换为文件parquet并将其存储回MinIO等等。

在这篇文章中,我将解释如何设置Nifi来监听MinIO事件通知。然后,我们将看到如何json通过Nifi处理器解析MinIO事件然后,我们将从event中过滤用户定义的元数据头json最后,我们将根据event中是否存在标头来了解如何采取后续步骤json

先决条件

启动用于Webhook的Nifi处理器

我们将使用MinIO Webhook事件通知将Apache Nifi配置为事件目标。为此,请首先ListenHTTP在Nifi GUI中创建处理器。然后将其配置为侦听特定端口。请参阅下面的处理器详细信息:


ListenHTTP处理器属性


配置MinIO事件通知

创建处理器后,为刚创建的Webhook服务器配置MinIO事件通知。

mc mb myminio/source

mc admin config set myminio notify_webhook:nifi endpoint=http://localhost:8086/contentListener

mc admin service restart myminio

mc event add myminio/source arn:minio:sqs::nifi:webhook --event put

在这里,我们将MinIO配置为在存储桶中发生事件http://localhost:8086/contentListener向其发送通知putsource

NifiListenHTTP处理器正在等待http://localhost:8086/contentListener上一步中配置的事件

添加EvaluateJsonPath处理器

现在,MinIO和Nifi之间已经建立了通信,下一步就是使用EvaluateJsonPathNifi处理器。我们使用它来解析MinIO事件通知json有效负载,并确定它是否包含某个用户定义的元数据标头。

如果存在标题X-Amz-Meta-key1,则继续进行下一步,否则将流程放在此处。我们还获取对象和存储桶名称,以便可以将其传递到下一步。

这是数据流中的关键步骤。

在我的示例中,我查找header X-Amz-Meta-key1您可以在此处调整元数据字段以适合您的用例:


评估JsonPath处理器属性


最后步骤

如果EvaluateJsonPathNifi处理器找到了我们要查找的标头,则转到下一步。在此示例中,我选择从MinIO获取对象。我们使用FetchS3Object处理器来执行此操作。

当然,您可以根据实际用例在此处使用其他处理器。


FetchS3Object处理器属性


为了完成操作,如果通过对象访存,我们将文件保存在本地驱动器上,否则我们将记录一个错误。完整的流程如下所示:


完整的数据流


结论

客户不可避免地面临数据流挑战,而Apache Nifi已成为应对此类挑战的流行选择。在MinIO,我们越来越多地看到用Nifi作为数据流编排器来构建快速,可扩展和有效的管道的用例。

在本文中,我们看到了如何使用事件通知和内置的Nifi处理器构建基于MinIO和Nifi的数据流系统。我们看到了数据流如何检查事件通知有效负载并确定是否存在某些可用的标头。根据此过滤器事件的结果,我们添加了另一个步骤,以从MinIO获取对象并将其保存在本地。

自己尝试一下。如果您需要帮助,请查看我们的文档您也可以查看我们的公共Slack频道


上一篇 下一篇