使用 R 的 MinIO 和 Apache Arrow

使用 R 的 MinIO 和 Apache Arrow

这里的箭,那里的箭,到处都是箭。似乎目前你不能在没有点击关于 Apache Arrow 的文章或博客文章的情况下摆动一只死猫。大多数似乎都是面向开发人员的,并且基于 Python 和 Spark 风格的开发平台。今天我要写一篇关于使用 R 编程语言的 Apache Arrow 和 MinIO 的文章。

如果您不熟悉R(来自维基百科):

“R 是一种用于统计计算和图形的编程语言,由 R 核心团队和 R 统计计算基金会提供支持。R 由统计学家 Ross Ihaka 和 Robert Gentleman 创建,被数据挖掘者和统计学家用于数据分析和开发统计软件。用户创建了包来增强 R 语言的功能。”

当我处理数据、数据分析和机器学习时,我发现 R 对我来说是最直观的。

MinIO是高性能软件定义的 S3 兼容对象存储,使其成为 Amazon S3 的强大而灵活的替代品。

最后,如果您不熟悉Apache Arrow(来自网站):

“Apache Arrow 为平面和分层数据定义了一种独立于语言的列式内存格式,组织起来可以在 CPU 和 GPU 等现代硬件上进行高效的分析操作。”

Arrow、R 和 MinIO 的组合形成了一个非常强大的平台,用于对大型数据集进行数据分析。我将使用一个500 万行的 csv 文件,其中包含从“E for Excel”随机生成的销售数据

请注意此数据是虚构的。E for Excel 突出说明:“免责声明——数据集是通过 VBA 中的随机逻辑生成的。这些不是真实的销售数据,不应该用于测试以外的任何其他目的。”

我选择这个数据集是因为它的结构相当简单,而且足够大,可以捕获分析处理的一些时间指标。

R 语言有一个名为RStudio的配套 IDE ,我将使用它进行此开发。如果你想跟随,请安装 R 和 RStudio,并访问 MinIO 集群。如果您还没有运行 MinIO 和mcMinIO 客户端,请下载并安装它们。

我首先将销售数据下载到我机器上的本地目录,然后将其复制到 MinIO 上的存储桶中。可以使用 MinIO 命令行工具将文件复制到 MinIO mc,或者从 MinIO 控制台上传文件。我创建了一个名为的存储桶large-csv,并5m Sales Records.csv从控制台将文件上传到它。这是我们要做的工作的出发点。


pasted image 0 (94).png


网上有许多资源可用于从 R 中使用 MinIO,以及从 R 中使用 Apache Arrow。我发现以下两个链接对将这些东西放在一起很有用:

现在开始编码。正如我提到的,我在 R studio 中使用 R 工作。R studio 是使用 R 的常用 IDE,如下所示:


pasted image 0 (95).png


我将分段粘贴本文中的代码。

这是一个开始。它是设置环境并将所需库加载到 R 中的代码。请注意使用环境来传递凭据和端点。有多种方法可以使这些值对图书馆可用,包括与大多数 IAM 系统的集成。出于演示的目的,这似乎是最简单的:

# set the credentials and endpoint this r instances uses to access my minio cluster
Sys.setenv("AWS_ACCESS_KEY_ID" = "minioadmin", # enter your credentials
"AWS_SECRET_ACCESS_KEY" = "minioadmin", # enter your credentials
"AWS_S3_ENDPOINT" = "HP-Z230:9000")  # change it to your specific minio IP and port to override default aws s3
# load the library
library(minio.s3)
library(arrow)
library(dplyr)
#library(aws.s3) This works as well

接下来,我包含了从 MinIO 中的存储桶中读取大型 csv 文件所需的代码。我在下面所做的要求整个 csv 适合此客户端的内存。如果您的 csv 文件太大而无法放入内存,则有几种方法可以在循环中以较小的块处理文件。下面的代码获取桶的句柄并读取对象。

# load the large csv file
b <- get_bucket(bucket = 'large-csv', use_https = F)
ob <- aws.s3::s3read_using(FUN = read.csv, object = "5m Sales Records.csv", bucket = b, opts = list(use_https = FALSE, region = ""))

一旦我们在一个对象中有了 csv 的表格数据,我们将把它以 Arrow 格式写入另一个桶。Arrow 库喜欢 URL,所以我创建了几个函数来简化 URL 的构建:

# these functions like URLs
# get minio config, with expected defaults
minio_key <- Sys.getenv("MINIO_ACCESS_KEY", "minioadmin")
minio_secret <- Sys.getenv("MINIO_SECRET_KEY", "minioadmin")
minio_host <- Sys.getenv("MINIO_HOST", "hp-z230")
minio_port <- Sys.getenv("MINIO_PORT", "9000")
minio_arrow_bucket <- Sys.getenv("MINIO_ARROW_BUCKET", "arrow-bucket")
# helper function for minio URIs
minio_path <- function(...) paste(minio_arrow_bucket, ..., sep = "/")
minio_uri <- function(...) {
template <- "s3://%s:%s@%s?scheme=http&endpoint_override=%s%s%s"
sprintf(template, minio_key, minio_secret, minio_path(...), minio_host, ":", minio_port)
}

Arrow 格式的优点是可以针对随后将执行的预期查询优化数据。双绞线group_by用于设置分区方案。分区就像在数据库上设置有用的索引一样,需要了解如何查询或使用数据。分区会带来成本和收益。需要做出权衡,如果了解最常查询数据的方式,那么权衡可能是有益的。对于此示例,我将假设针对给定国家/地区的销售数据进行了分析。另一方面,如果数据最常按不同国家/地区的项目类型进行分析,则此分区方案可能无济于事。尝试按每一列进行分区(以防万一)会破坏分区的好处。以下是对指南的解释,来自一篇关于将 Arrow 与 R 结合使用的文章(https://arrow.apache.org/docs/r/articles/dataset.html):

分区性能注意事项
分区数据集有两个影响性能的方面:它增加了文件的数量,并围绕文件创建了一个目录结构。这两者都有好处和成本。根据数据集的配置和大小,成本可能会超过收益。
由于分区将数据集拆分为多个文件,因此可以并行读取和写入分区数据集。但是,每个额外的文件都会在处理文件系统交互时增加一点开销。它还增加了整体数据集的大小,因为每个文件都有一些共享的元数据。例如,每个镶木地板文件都包含模式和组级统计信息。分区数是文件数的底数。如果按日期对包含一年数据的数据集进行分区,则至少会有 365 个文件。如果按具有 1,000 个唯一值的另一个维度进一步分区,您将拥有多达 365,000 个文件。这种分区通常会生成主要由元数据组成的小文件。
分区数据集创建嵌套文件夹结构,这些结构允许我们修剪在扫描中加载的文件。然而,这增加了在数据集中发现文件的开销,因为我们需要递归地“列出目录”来查找数据文件。
分区太细可能会导致问题:按日期对数据集分区一年的数据将需要 365 列表调用才能找到所有文件;添加另一个基数为 1,000 的列将进行 365,365 次调用。
最佳分区布局将取决于您的数据、访问模式以及将读取数据的系统。大多数系统,包括 Arrow,应该适用于各种文件大小和分区布局,但您应该避免一些极端情况。这些指南有助于避免一些已知的最坏情况:
避免使用小于 20MB 和大于 2GB 的文件。
避免使用超过 10,000 个不同分区的分区布局。
对于在文件中具有组概念的文件格式,例如 Parquet,适用类似的准则。行组可以在读取时提供并行性并允许基于统计数据跳过数据,但是非常小的组可能导致元数据成为文件大小的重要部分。在大多数情况下,Arrow 的文件编写器为组大小调整提供了合理的默认值。”

鉴于我假设数据将在一个国家/地区内进行分析,这是将 Arrow 格式数据集写入 MinIO 中不同存储桶的代码。

# partition by the "Country" column - we typically analyze by country
ob %>%
group_by(Country) %>%
write_dataset(minio_uri("sales-data"), format = "arrow" )

如果我们查看结果,我们会看到创建了多个文件来支持分区。


pasted image 0 (96).png


如果我们查看箭头桶的内容,我们会看到数据已按照预期按国家/地区进行分区:


pasted image 0 (97).png


这就是读取 .csv 文件并将数据集以 Arrow 格式存储在 MinIO 上的所有代码。接下来我们将打开这个数据集并使用一些基本的数据选择和聚合来查询数据。

我们经历这些循环的原因是,通过在查询中最常使用的列上对数据集进行分区,我们可以减少每次查询从 MinIO 读取的数据,从而加快查询速度。

此外,Arrow 在 R 和 Dplyr 中的集成通过在执行之前首先分级处理来提高性能,以避免在每个步骤之间创建中间存储。这可以节省时间并减少内存使用,从而有可能避免处理过程中内存不足的情况。这种方法还尽可能地降低(或完全避免)处理。根据对数据集进行分区的属性选择数据子集会显着减少必须处理的数据。如果数据按“国家/地区”分区,并且我们选择了给定的国家/地区,则忽略所有其他文件及其包含的数据。

此外,Arrow 文件可以包含元数据,如果查询将处理限制在该文件中的数据范围内,则该元数据允许仅检索文件的一个子集。

mutate() / transmute()select() / rename() / relocate()filter()group_by()arrange()的查询步骤会记录它们的操作,但在您运行之前不会对数据进行评估收集()推迟这些步骤允许查询在不创建中间数据集的情况下查明数据的一小部分。

为了使用或查询数据集,我们首先必须打开它。在下面的评论中,我们展示了内存数据结构“ds”包含的内容。“Ds”实际上是对元数据的引用——还没有数据被加载到内存中。这种处理元数据并延迟或下推数据检索的方法允许处理比内存所能容纳的数据集大得多的数据集。

##################
# Querying the sales order data
# open the dataset
ds <- open_dataset(minio_uri("sales-data"), format = "arrow" )
# here is what it contains:
# > ds
# FileSystemDataset with 186 Feather files
# Region: string
# Item.Type: string
# Sales.Channel: string
# Order.Priority: string
# Order.Date: string
# Order.ID: int32
# Ship.Date: string
# Units.Sold: int32
# Unit.Price: double
# Unit.Cost: double
# Total.Revenue: double
# Total.Cost: double
# Total.Profit: double
# Country: string

接下来,我们能够对该数据集执行有用的查询。鉴于我加载的数据是销售数据,我将查询在给定国家/地区销售的不同商品类型的中位数利润,作为查询处理的示例。我还将计算在我的旧的、缓慢的 Mac 笔记本电脑上执行此查询所需的时间,它通过我的旧的、缓慢的千兆位网络访问我的旧的、缓慢的 MinIO 集群 :-)

# what is the median margin on the differing item types sold in Cyprus?
# what's the highest? how long does it take to query 5m rows?
system.time(ds %>%
filter(Country == "Cyprus") %>%
select(Unit.Price, Unit.Cost, Item.Type) %>%
mutate(margin = 100 * ((Unit.Price - Unit.Cost)/Unit.Cost)) %>%
group_by(Item.Type) %>%
collect() %>%
summarise(
median_margin = median(margin),
n = n()
) %>% arrange(desc(median_margin)) %>%
print()
)

以下是查询结果和时间。不错。

Item.Type      median_margin     n
<chr>                   <dbl> <int>
1 Clothes                 205.   2263
2 Cereal                   75.6  2268
3 Vegetables               69.4  2234
4 Cosmetics                66.0  2235
5 Baby Food                60.1  2271
6 Snacks                   56.6  2266
7 Beverages                49.3  2261
8 Personal Care            44.2  2265
9 Fruits                   34.8  2234
10 Household                33.0  2227
11 Office Supplies          24.0  2253
12 Meat                     15.7  2233
user  system elapsed
0.057   0.037   0.303

数据集没有“关闭”调用,因此我们完成了此处理。

概括

总而言之,在 R 中使用 Arrow 格式的数据集允许人们以一种有利于处理的方式基于列智能地划分数据。它还允许根据需要加载数据子集,因此能够处理比内存所能容纳的大得多的数据集。这些函数非常直观,正如您所见,不需要大量代码就可以对数据集执行相当复杂的查询。享受!


上一篇 下一篇