深入挖掘冰山一角:表上的 ACID 事务
Apache Iceberg 数据湖存储格式在保存到 MinIO 的表上启用 ACID 事务。ACID 事务使多个用户和服务能够同时可靠地以原子方式添加和删除记录。同时,查询被隔离以保持对正在被更改的表的读取一致性。您可以将 MinIO 和 Iceberg 与作为元数据数据库的 PostgreSQL 结合使用,使用 ACID 事务支持进行写入、删除、更新、时间旅行和模式修改。
关于所有这一切的好消息是,您可以继续使用您已经了解和喜爱的 SQL 和 DML。例如,您可以用新信息更新一行或删除一行以将其删除。甚至时间旅行也是一个 SELECT 语句。
将 Iceberg 表格式与 MinIO 相结合,创建了一个强大、灵活且可扩展的 lakehouse 平台。Iceberg表规范声明了一种表格式,旨在管理存储在分布式系统中的“大型、缓慢变化的文件或对象集合”。Iceberg 规范的版本 1 定义了使用不可变文件格式 Parquet、Avro 和 ORC 的大型分析表的管理。规范的第 2 版为具有不可变文件的分析表添加了行级更新和删除。
Iceberg 表格式跟踪表中的单个数据文件而不是目录。数据文件就地创建,文件仅显式添加到表中。表状态在元数据中维护。Iceberg 元数据和清单列表只是存储在 MinIO 中的对象,MinIO 还维护自己的与对象一起存储的元数据。对表状态的每次更改都需要创建一个新的 Iceberg 元数据文件,用原子交换替换旧的元数据。原子性由 MinIO 启用。
Iceberg 表格式要求:
就地写入:文件/对象一旦写入就不会移动或更改
Seekable reads:数据文件格式需要seek支持
删除:表删除不再需要的文件/对象(或者,在 MinIO 的情况下,对象被标记为已删除但保留)
这些要求与 MinIO 等对象存储兼容。一旦写入,数据和元数据文件在被删除之前是不可变的。MinIO 继续保存过时版本的数据和元数据对象,确保数据永远不会被删除,并扩展 Iceberg 的时间旅行功能。
在这篇博文中,我们将深入研究 Iceberg 的 ACID 事务,以创建一个 Iceberg 表,更新和删除其中的记录,并改进其架构。这篇文章是Iceberg 和 MinIO Lakehouse 架构权威指南的后续文章,其中包括对 Lakehouse 架构的解释,对 Spark、Iceberg、PostgreSQL 和 MinIO 一起工作的深入讨论,以及一个教读者如何安装它们的教程、创建表、演进表模式,以及如何使用时间旅行和回滚。
ACID 事务与 Iceberg 和 MinIO 教程
我正在使用UCI 机器学习存储库中托管的在线零售数据集。这是一个交易数据集,其中包含 2010 年 1 月 12 日至 2011 年 9 月 12 日期间发生在英国且已注册的非商店在线零售业务的所有交易。该数据集包含约 550,000 条记录,具有八个属性:
InvoiceNo:发票编号。标称,为每笔交易唯一分配的 6 位整数。如果此代码以字母“c”开头,则表示取消。
StockCode:商品(商品)代码。标称,为每个不同的产品唯一分配的 5 位整数。
描述:产品(项目)名称。名义上的。
数量:每笔交易每个产品(项目)的数量。数字。
InvoiceDate:发票日期和时间。数字,每笔交易产生的日期和时间。
UnitPrice:单价。数字,单位产品价格(以英镑为单位)。
CustomerID:客户编号。标称,一个唯一分配给每个客户的 5 位整数。
国家:国名。名义上,每个客户所在国家/地区的名称。
该文件为 Microsoft Excel 格式,因此第一步是将其转换为 .CSV,以便将其作为数据框读入 Spark,然后保存到 Iceberg。转换文件后,我将其保存到存储原始数据的 MinIO 存储桶中。
我正在使用早期教程The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO中的 Iceberg 安装。如果您还没有阅读该博文,请参考它或设置您自己的 Iceberg 和 MinIO 环境。
我们将使用 Spark 和 Spark-SQL 与 Iceberg 一起工作。
读入数据并保存为 Iceberg 表
我们要完成的第一个任务是将我们的 .CSV 作为数据帧读入 Spark,然后将其保存到 Iceberg。
仅仅 3 行代码就完成了很多工作,所以让我们先简要介绍一下 Spark 如何处理数据。Spark是一个用于大规模数据处理的统一分析引擎,具有一组 Java、Scala、Python 和 R 中的高级 API。Spark 可以使用 HDFS 和 YARN 的遗留库,或者通过 S3 使用 MinIO 等对象存储运行应用程序接口。Spark-SQL 是一个用于结构化数据处理的 Spark 模块。它使用与 Spark 相同的执行引擎,Spark 和 Spark-SQL 的组合构成了一个强大的分析工具包。
数据框是组织成列的数据集(数据集合)。Dataframes 可以被认为是关系数据库中的表,具有一些针对 Spark 引擎的内置优化。可以从各种来源构建数据框,我们将从 .CSV 构建数据框,将我们的零售数据组织成列和行。
Spark 将读入 .CSV 并将其作为临时视图组织到数据框中。Spark 从 SELECT 语句创建临时视图。这些视图是会话范围的并且是只读的。然后我们将临时视图写成 Iceberg 表,让 Spark 自动为我们创建一个模式。
val df spark.read.csv("online-retail.csv")
df.createOrReplaceTempView("tempview");
spark.sql("CREATE or REPLACE TABLE retail USING iceberg AS SELECT * FROM tempview");它不是一个非常大的数据集,因此读取 .CSV 文件和写出 Iceberg Parquet 花费的时间很少。
SparkSQL 中的快速查询可验证已放入 Iceberg 表中的数据
spark-sql> SELECT * from local.retail limit 10; 22/08/24 18:08:56 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country 536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 12/1/2010 8:26 2.55 17850 United Kingdom 536365 71053 WHITE METAL LANTERN 6 12/1/2010 8:26 3.39 17850 United Kingdom 536365 84406B CREAM CUPID HEARTS COAT HANGER 8 12/1/2010 8:26 2.75 17850 United Kingdom 536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 12/1/2010 8:26 3.39 17850 United Kingdom 536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 12/1/2010 8:26 3.39 17850 United Kingdom 536365 22752 SET 7 BABUSHKA NESTING BOXES 2 12/1/2010 8:26 7.65 17850 United Kingdom 536365 21730 GLASS STAR FROSTED T-LIGHT HOLDER 6 12/1/2010 8:26 4.25 17850 United Kingdom 536366 22633 HAND WARMER UNION JACK 6 12/1/2010 8:28 1.85 17850 United Kingdom 536366 22632 HAND WARMER RED POLKA DOT 6 12/1/2010 8:28 1.85 17850 United Kingdom Time taken: 7.944 seconds, Fetched 10 row(s)
在MinIO中,我们也可以看到数据文件已经保存到icebergbucket中了
更新 Iceberg 表中的一条记录
我们将使用 SELECT 语句后跟 UPDATE 语句来更新表。更新和删除是在线零售数据集等 OLTP 数据库中的常见操作,而 Iceberg 表格式将此功能引入了数据湖。Iceberg 能够执行行级更新并保持数据一致性。
我们将更新retail表格以更改为特定发票购买的数量。很容易想象这种变化发生在现实世界中,例如,客户可以返回该站点并购买更多商品以包含在原始订单中。我们要为 ( ) 添加一个单位quantity( retail._c3) for invoiceno( retail._c0) 559340 for stockcode( retail._c1) 22413(金属标志接受或离开)。
一、查询invoice和item
SELECT retail.c0,retail.c1,retail.c3 FROM retail where retail._c0='559340' and retail._c1='22413';
然后使用 UPDATE 语句更新
UPDATE retail SET retail._c3='7' WHERE retail._c0='559340' and retail._c1='22413';
最后,验证更新是否发生
SELECT retail._c0, retail._c1, retail._c2, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';
您应该看到以下输出,其中quantity( retail._c3) 更改为7
559340 22413 METAL SIGN TAKE IT OR LEAVE IT 7 Time taken: 0.464 seconds, Fetched 1 row(s)
从 Iceberg 表中删除记录
ACID 属性使 Iceberg 数据湖中的删除操作成为可能。
想象一下,客户要求从我们的零售数据湖中删除他们的用户数据。为了遵守 GDPR,我们必须及时定位并删除这些数据。已收到删除与客户相关的所有记录的请求13269。
首先,让我们统计与该客户相关的记录数
SELECT count(*) FROM retail WHERE retail._c6=’13269’; 320 Time taken: 0.242 seconds, Fetched 1 row(s)
我们将从表中删除这 320 条记录
DELETE FROM retail WHERE retail._c6='13269';
验证记录是否已删除。以下查询应报告 0 条记录:
SELECT count(*) FROM retail WHERE retail._c6='13269';
发展 Iceberg 表的架构
当我们最初将 Spark 数据帧保存为 Iceberg 表时,我们让 Iceberg 自动创建表模式,从而导致一些不太直接的列名,_c0通过_c7. 我们将把这些更改为人类可读的,例如_c0将重命名为InvoiceNo.
冰山架构更新仅更改元数据。可以添加、删除、重命名、更新和重新排序列,而无需完全重写表,这是一个代价高昂的提议。
让我们看一下当前的架构
SHOW CREATE TABLE retail; CREATE TABLE iceberg.retail ( _c0 STRING, _c1 STRING, _c2 STRING, _c3 STRING, _c4 STRING, _c5 STRING, _c6 STRING, _c7 STRING) USING iceberg LOCATION 'S3://iceberg/retail' TBLPROPERTIES ( 'current-snapshot-id' = '6565073876818863127', 'format' = 'iceberg/parquet', 'format-version' = '1') Time taken: 0.082 seconds, Fetched 1 row(s)
让我们将列重命名为人类可读。当您执行每个命令时,请注意 Iceberg 执行元数据更改的速度有多快——我们说的是更改元数据需要几毫秒,而写入整个表需要大约一分钟。
ALTER TABLE retail RENAME COLUMN _c0 TO InvoiceNo; ALTER TABLE retail RENAME COLUMN _c1 TO StockCode; ALTER TABLE retail RENAME COLUMN _c2 TO Description; ALTER TABLE retail RENAME COLUMN _c3 TO Quantity; ALTER TABLE retail RENAME COLUMN _c4 TO InvoiceDate; ALTER TABLE retail RENAME COLUMN _c5 TO UnitPrice; ALTER TABLE retail RENAME COLUMN _c6 TO CustomerID; ALTER TABLE retail RENAME COLUMN _c7 TO Country;
让我们验证表架构是否已更改
SHOW CREATE TABLE retail; CREATE TABLE iceberg.retail ( InvoiceNo STRING, StockCode STRING, Description STRING, Quantity STRING, InvoiceDate STRING, UnitPrice STRING, CustomerID STRING, Country STRING) USING iceberg LOCATION 'S3://iceberg/retail' TBLPROPERTIES ( 'current-snapshot-id' = '6565073876818863127', 'format' = 'iceberg/parquet', 'format-version' = '1') Time taken: 0.034 seconds, Fetched 1 row(s)
用于多云数据湖的 MinIO 和 Iceberg
Iceberg 和 MinIO 是构建企业湖屋的强大技术。两者都是高性能、高度可扩展且可靠的开源组件,大量用户在各种硬件、软件和云实例上运行分析工作负载。
基于 Iceberg、Delta 和 HUDI 开放表格式构建的 Lakehouses 正在将数据湖分析推向一个新的水平。使用这些开放表格格式、MinIO 和您选择的分析或 ML 包可以构建的内容没有限制。这一切都是开放的,MinIO 是 S3-API 兼容层,将它们联系在一起——并将数据湖扩展到多云,从边缘到数据中心再到公共/私有云。
下载 MinIO并将世界上最快的对象存储用于您的数据湖。通过我们的 Slack 频道分享您的经验或提出问题。我们很想听听你在建造什么!