深入挖掘冰山一角:表上的 ACID 事务

深入挖掘冰山一角:表上的 ACID 事务

Apache Iceberg 数据湖存储格式在保存到 MinIO 的表上启用 ACID 事务。ACID 事务使多个用户和服务能够同时可靠地以原子方式添加和删除记录。同时,查询被隔离以保持对正在被更改的表的读取一致性。您可以将 MinIO 和 Iceberg 与作为元数据数据库的 PostgreSQL 结合使用,使用 ACID 事务支持进行写入、删除、更新、时间旅行和模式修改。

关于所有这一切的好消息是,您可以继续使用您已经了解和喜爱的 SQL 和 DML。例如,您可以用新信息更新一行或删除一行以将其删除。甚至时间旅行也是一个 SELECT 语句。

将 Iceberg 表格式与 MinIO 相结合,创建了一个强大、灵活且可扩展的 lakehouse 平台。Iceberg表规范声明了一种表格式,旨在管理存储在分布式系统中的“大型、缓慢变化的文件或对象集合”。Iceberg 规范的版本 1 定义了使用不可变文件格式 Parquet、Avro 和 ORC 的大型分析表的管理。规范的第 2 版为具有不可变文件的分析表添加了行级更新和删除。

Iceberg 表格式跟踪表中的单个数据文件而不是目录。数据文件就地创建,文件仅显式添加到表中。表状态在元数据中维护。Iceberg 元数据和清单列表只是存储在 MinIO 中的对象,MinIO 还维护自己的与对象一起存储的元数据。对表状态的每次更改都需要创建一个新的 Iceberg 元数据文件,用原子交换替换旧的元数据。原子性由 MinIO 启用。

Iceberg 表格式要求:

  • 就地写入:文件/对象一旦写入就不会移动或更改

  • Seekable reads:数据文件格式需要seek支持

  • 删除:表删除不再需要的文件/对象(或者,在 MinIO 的情况下,对象被标记为已删除但保留)

这些要求与 MinIO 等对象存储兼容。一旦写入,数据和元数据文件在被删除之前是不可变的。MinIO 继续保存过时版本的数据和元数据对象,确保数据永远不会被删除,并扩展 Iceberg 的时间旅行功能。

在这篇博文中,我们将深入研究 Iceberg 的 ACID 事务,以创建一个 Iceberg 表,更新和删除其中的记录,并改进其架构。这篇文章是Iceberg 和 MinIO Lakehouse 架构权威指南的后续文章,其中包括对 Lakehouse 架构的解释,对 Spark、Iceberg、PostgreSQL 和 MinIO 一起工作的深入讨论,以及一个教读者如何安装它们的教程、创建表、演进表模式,以及如何使用时间旅行和回滚。

ACID 事务与 Iceberg 和 MinIO 教程

我正在使用UCI 机器学习存储库中托管的在线零售数据集。这是一个交易数据集,其中包含 2010 年 1 月 12 日至 2011 年 9 月 12 日期间发生在英国且已注册的非商店在线零售业务的所有交易。该数据集包含约 550,000 条记录,具有八个属性:

  • InvoiceNo:发票编号。标称,为每笔交易唯一分配的 6 位整数。如果此代码以字母“c”开头,则表示取消。

  • StockCode:商品(商品)代码。标称,为每个不同的产品唯一分配的 5 位整数。

  • 描述:产品(项目)名称。名义上的。

  • 数量:每笔交易每个产品(项目)的数量。数字。

  • InvoiceDate:发票日期和时间。数字,每笔交易产生的日期和时间。

  • UnitPrice:单价。数字,单位产品价格(以英镑为单位)。

  • CustomerID:客户编号。标称,一个唯一分配给每个客户的 5 位整数。

  • 国家:国名。名义上,每个客户所在国家/地区的名称。

该文件为 Microsoft Excel 格式,因此第一步是将其转换为 .CSV,以便将其作为数据框读入 Spark,然后保存到 Iceberg。转换文件后,我将其保存到存储原始数据的 MinIO 存储桶中。



我正在使用早期教程The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO中的 Iceberg 安装如果您还没有阅读该博文,请参考它或设置您自己的 Iceberg 和 MinIO 环境。

我们将使用 Spark 和 Spark-SQL 与 Iceberg 一起工作。

读入数据并保存为 Iceberg 表

我们要完成的第一个任务是将我们的 .CSV 作为数据帧读入 Spark,然后将其保存到 Iceberg。

仅仅 3 行代码就完成了很多工作,所以让我们先简要介绍一下 Spark 如何处理数据。Spark是一个用于大规模数据处理的统一分析引擎,具有一组 Java、Scala、Python 和 R 中的高级 API。Spark 可以使用 HDFS 和 YARN 的遗留库,或者通过 S3 使用 MinIO 等对象存储运行应用程序接口。Spark-SQL 是一个用于结构化数据处理的 Spark 模块。它使用与 Spark 相同的执行引擎,Spark 和 Spark-SQL 的组合构成了一个强大的分析工具包。

数据框是组织成列的数据集(数据集合)。Dataframes 可以被认为是关系数据库中的表,具有一些针对 Spark 引擎的内置优化。可以从各种来源构建数据框,我们将从 .CSV 构建数据框,将我们的零售数据组织成列和行。

Spark 将读入 .CSV 并将其作为临时视图组织到数据框中。Spark 从 SELECT 语句创建临时视图。这些视图是会话范围的并且是只读的。然后我们将临时视图写成 Iceberg 表,让 Spark 自动为我们创建一个模式。    

val df spark.read.csv("online-retail.csv")
df.createOrReplaceTempView("tempview");
spark.sql("CREATE or REPLACE TABLE retail USING iceberg AS SELECT * FROM tempview");

它不是一个非常大的数据集,因此读取 .CSV 文件和写出 Iceberg Parquet 花费的时间很少。  

SparkSQL 中的快速查询可验证已放入 Iceberg 表中的数据

spark-sql> SELECT * from local.retail limit 10;
22/08/24 18:08:56 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
InvoiceNo       StockCode       Description     Quantity        InvoiceDate     UnitPrice       CustomerID      Country
536365  85123A  WHITE HANGING HEART T-LIGHT HOLDER      6       12/1/2010 8:26  2.55    17850   United Kingdom
536365  71053   WHITE METAL LANTERN     6       12/1/2010 8:26  3.39    17850   United Kingdom
536365  84406B  CREAM CUPID HEARTS COAT HANGER  8       12/1/2010 8:26  2.75    17850   United Kingdom
536365  84029G  KNITTED UNION FLAG HOT WATER BOTTLE     6       12/1/2010 8:26  3.39    17850   United Kingdom
536365  84029E  RED WOOLLY HOTTIE WHITE HEART.  6       12/1/2010 8:26  3.39    17850   United Kingdom
536365  22752   SET 7 BABUSHKA NESTING BOXES    2       12/1/2010 8:26  7.65    17850   United Kingdom
536365  21730   GLASS STAR FROSTED T-LIGHT HOLDER       6       12/1/2010 8:26  4.25    17850   United Kingdom
536366  22633   HAND WARMER UNION JACK  6       12/1/2010 8:28  1.85    17850   United Kingdom
536366  22632   HAND WARMER RED POLKA DOT       6       12/1/2010 8:28  1.85    17850   United Kingdom
Time taken: 7.944 seconds, Fetched 10 row(s)

在MinIO中,我们也可以看到数据文件已经保存到icebergbucket中了



更新 Iceberg 表中的一条记录

我们将使用 SELECT 语句后跟 UPDATE 语句来更新表。更新和删除是在线零售数据集等 OLTP 数据库中的常见操作,而 Iceberg 表格式将此功能引入了数据湖。Iceberg 能够执行行级更新并保持数据一致性。

我们将更新retail表格以更改为特定发票购买的数量。很容易想象这种变化发生在现实世界中,例如,客户可以返回该站点并购买更多商品以包含在原始订单中。我们要为 ( ) 添加一个单位quantityretail._c3) for invoicenoretail._c0) 559340 for stockcoderetail._c1) 22413(金属标志接受或离开)。

一、查询invoice和item

SELECT retail.c0,retail.c1,retail.c3 FROM retail where retail._c0='559340' and retail._c1='22413';

然后使用 UPDATE 语句更新

UPDATE retail SET retail._c3='7' WHERE retail._c0='559340' and retail._c1='22413';

最后,验证更新是否发生

SELECT retail._c0, retail._c1, retail._c2, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';

您应该看到以下输出,其中quantityretail._c3) 更改为7

559340  22413   METAL SIGN TAKE IT OR LEAVE IT  7
Time taken: 0.464 seconds, Fetched 1 row(s)

从 Iceberg 表中删除记录

ACID 属性使 Iceberg 数据湖中的删除操作成为可能。  

想象一下,客户要求从我们的零售数据湖中删除他们的用户数据。为了遵守 GDPR,我们必须及时定位并删除这些数据。已收到删除与客户相关的所有记录的请求13269

首先,让我们统计与该客户相关的记录数

SELECT count(*) FROM retail WHERE retail._c6=’13269’; 
320
Time taken: 0.242 seconds, Fetched 1 row(s)

我们将从表中删除这 320 条记录

DELETE FROM retail WHERE retail._c6='13269';

验证记录是否已删除。以下查询应报告 0 条记录:

SELECT count(*) FROM retail WHERE retail._c6='13269';

发展 Iceberg 表的架构

当我们最初将 Spark 数据帧保存为 Iceberg 表时,我们让 Iceberg 自动创建表模式,从而导致一些不太直接的列名,_c0通过_c7我们将把这些更改为人类可读的,例如_c0将重命名为InvoiceNo.

冰山架构更新仅更改元数据。可以添加、删除、重命名、更新和重新排序列,而无需完全重写表,这是一个代价高昂的提议。

让我们看一下当前的架构

SHOW CREATE TABLE retail;
CREATE TABLE iceberg.retail (
_c0 STRING,
_c1 STRING,
_c2 STRING,
_c3 STRING,
_c4 STRING,
_c5 STRING,
_c6 STRING,
_c7 STRING)
USING iceberg
LOCATION 'S3://iceberg/retail'
TBLPROPERTIES (
'current-snapshot-id' = '6565073876818863127',
'format' = 'iceberg/parquet',
'format-version' = '1')
Time taken: 0.082 seconds, Fetched 1 row(s)

让我们将列重命名为人类可读。当您执行每个命令时,请注意 Iceberg 执行元数据更改的速度有多快——我们说的是更改元数据需要几毫秒,而写入整个表需要大约一分钟。

ALTER TABLE retail RENAME COLUMN _c0 TO InvoiceNo;
ALTER TABLE retail RENAME COLUMN _c1 TO StockCode;
ALTER TABLE retail RENAME COLUMN _c2 TO Description;
ALTER TABLE retail RENAME COLUMN _c3 TO Quantity;
ALTER TABLE retail RENAME COLUMN _c4 TO InvoiceDate;
ALTER TABLE retail RENAME COLUMN _c5 TO UnitPrice;
ALTER TABLE retail RENAME COLUMN _c6 TO CustomerID;
ALTER TABLE retail RENAME COLUMN _c7 TO Country;

让我们验证表架构是否已更改

SHOW CREATE TABLE retail;
CREATE TABLE iceberg.retail (
InvoiceNo STRING,
StockCode STRING,
Description STRING,
Quantity STRING,
InvoiceDate STRING,
UnitPrice STRING,
CustomerID STRING,
Country STRING)
USING iceberg
LOCATION 'S3://iceberg/retail'
TBLPROPERTIES (
'current-snapshot-id' = '6565073876818863127',
'format' = 'iceberg/parquet',
'format-version' = '1')
Time taken: 0.034 seconds, Fetched 1 row(s)

用于多云数据湖的 MinIO 和 Iceberg

Iceberg 和 MinIO 是构建企业湖屋的强大技术。两者都是高性能、高度可扩展且可靠的开源组件,大量用户在各种硬件、软件和云实例上运行分析工作负载。  

基于 Iceberg、Delta 和 HUDI 开放表格式构建的 Lakehouses 正在将数据湖分析推向一个新的水平。使用这些开放表格格式、MinIO 和您选择的分析或 ML 包可以构建的内容没有限制。这一切都是开放的,MinIO 是 S3-API 兼容层,将它们联系在一起——并将数据湖扩展到多云,从边缘到数据中心再到公共/私有云。

下载 MinIO并将世界上最快的对象存储用于您的数据湖。通过我们的 Slack 频道分享您的经验或提出问题我们很想听听你在建造什么!


上一篇 下一篇