使用 Iceberg 和 MinIO 的 Lakehouse 架构权威指南

使用 Iceberg 和 MinIO 的 Lakehouse 架构权威指南

Apache Iceberg 似乎席卷了数据世界。最初由 Ryan Blue 在 Netflix 孵化,它最终被传输到它目前所在的 Apache 软件基金会。它的核心是一种用于大规模分析数据集(想想数百 TB 到数百 PB)的开放表格式。

它是一种多引擎兼容格式。这意味着 Spark、Trino、Flink、Presto、Hive 和 Impala 都可以独立并同时在数据集上运行。它支持数据分析的通用语言 SQL,以及全模式演进、隐藏分区、时间旅行、回滚和数据压缩等关键特性。

这篇文章重点介绍 Iceberg 和 MinIO 如何相互补充,以及各种分析框架(Spark、Flink、Trino、Dremio 和 Snowflake)如何利用这两者。

背景

虽然 Apache Hive 在当时是向前迈出的重要一步,但随着分析应用程序变得越来越多、越来越多样化和越来越复杂,它最终开始出现裂痕。为了实现性能,数据需要保留在目录中,并且需要不断管理这些目录。这导致了目录数据库。这解决了数据在哪里的问题,但它引入了该表的状态问题——它现在在两个地方(目录数据库和文件系统)。

这限制了您可以做的事情和存在的灵活性 — 特别是在更改方面,这不能通过一次操作在两个地方都得到保证。

想象一下按日期划分的大量多年数据。年划分为月和周,如果周划分为天,天划分为小时等等——目录列表会爆炸式增长。Hive Metastore (HMS) 是一种事务性 RDBMS。文件系统 (HDFS) 是非事务性的。更改分区信息时,需要重新创建分区存储和文件系统。

这个问题是不可持续的,再多的补丁也无法解决固有问题。事实上,挑战只会随着数据的增长而加速。

现代开放式表格格式的目标

Data Lakehouse 架构的主要卖点之一是它支持多种分析引擎和框架。例如,您需要同时支持 ELT(提取、加载、转换)和 ETL(提取、转换、加载)。您需要支持商业智能、商业分析和 AI/ML 类型的工作负载。您需要以安全且可预测的方式成功地与同一组表进行交互。这意味着 Spark、Flink、Trino、Arrow 和 Dask 等多个引擎都需要以某种方式绑定到一个有凝聚力的架构中。

一个多引擎平台可以有效地存储数据,同时使每个引擎都能成功,这是分析界一直渴望的,也是 Iceberg 和 Data Lakehouse 架构所提供的。

这并不简单,而且有很多挑战;没有简单的方法来使用多个引擎来可靠地更新数据。但即使现在我们有两三种格式可以提供可靠的更新,仍然存在很多混乱,并且在这方面存在问题。


Artboard 6.png


现代要求看起来像这样:

  1. 中央表存储:独立于计算存储数据成为一个关键的架构决策。它之所以重要,是因为数据具有引力,它会将我们拉向数据所在的位置。因此,如果我们的数据完全在一个供应商或云提供商中,那么我们只与该供应商或云提供商绑定。当这些系统是封闭的或专门设计时,这在本质上是有问题的。开放软件成为现代架构的要求。

  2. 便携式计算:另一个现代需求是能够将您的计算引擎带到不同的供应商/云提供商或利用专门的计算引擎。虽然许多人关注重心(数据),但企业还需要逻辑、代码和 SQL 的可移植性。

  3. 访问控制:大多数企业都面临着跨引擎一致授权策略的巨大挑战。然而,这不仅仅是架构,因为跨多个引擎成功且可重复地执行这些策略已成为运营的当务之急。

  4. 维护结构:我们在过去几年中看到的人类工作的最大来源之一是在将数据结构转移到其他地方时丢失数据结构。一个完美的例子曾经是雪花。将数据移动到 Snowflake 的过程是一个手动过程,并且由于文件格式不同以及移动过程中格式的变化,引入第三方数据集也导致返工。

阿帕奇冰山救援

Apache Iceberg 是从头开始设计的,上面提到的大部分挑战和目标是实现开放表格式的基础。它解决了以下挑战:

  1. 灵活计算

    • 不要移动数据;多个引擎应该无缝工作

    • 支持批处理、流式处理和临时作业

    • 支持来自多种语言的代码,而不仅仅是 JVM 框架

  2. SQL 仓库行为

    • 具有 SQL 表的可靠事务,我们能够可靠地执行 CRUD 操作

    • 将关注点与真实表分开提供了隔离

Apache Iceberg 将其记录保存在对象存储中——与 Apache Hive 不同。Iceberg 使 SQL 行为可以被多个引擎利用,它是为大型表设计的。在生产中,单个表可以包含数十 PB 的数据,这非常重要。即使是多 PB 的表也可以从单个节点读取,而不需要分布式 SQL 引擎来筛选表元数据。



pasted image 0 (54).png


来源: https: //iceberg.apache.org/spec/

Iceberg 有一个不成文的规定,在大数据堆栈中使用时是不可见的。这种哲学来自于 SQL 表空间,我们从来没有想过 SQL 表下面是什么。任何从业者都知道,在使用 Hadoop 和类似 Hive 的表时,情况根本就不是这样。

Iceberg 以两种方式保持简单。首先,避免在对表进行更改时出现不愉快的意外。例如,更改永远不应带回已删除和移除的数据。其次,Iceberg 减少了上下文切换,因为桌子底下的内容并不重要——重要的是要完成的工作。

了解冰山文件 IO

FileIO 是核心 Iceberg 库和底层存储之间的接口。FileIO 的创建是为了让 Iceberg 在分布式计算和存储被分解的世界中发挥作用。传统的 Hadoop 生态系统需要分层路径和分区结构,实际上,这些结构与对象存储世界中用于实现速度和规模的方法完全相反。

Hadoop 和 Hive 是高性能和可扩展的云原生对象存储的反模式。依赖 S3 API 与 MinIO 交互的数据湖应用程序可以轻松扩展到每秒处理数百万或数十亿个对象的数千个事务。您可以通过并行处理多个并发请求来提高读写性能。您可以通过向存储桶添加前缀——作为对象名称子集的一串字符,从第一个字符开始——然后编写并行操作,每个前缀打开一个连接来实现这一点。

此外,Hadoop 对文件系统目录的依赖并没有转化为对象存储——当路径不存在时,很难将数据集物理地组织到不同的目录中并通过路径寻址它们。Hadoop 依靠文件系统来定义数据集并为并发和冲突解决提供锁定机制。此外,在 Hadoop 生态系统中,处理重命名操作的作业必须是原子的。这是不可能使用 S3 API 的,因为重命名实际上是两个操作:复制和删除。不幸的是,结果是读写之间没有隔离,可能会导致冲突、冲突和不一致。

相比之下,Iceberg 被设计为使用对象存储完全从物理存储中抽象出来运行。如元数据中所定义,所有位置都是“明确的、不可变的和绝对的”。Iceberg 跟踪表的完整状态,没有引用目录的包袱。使用元数据查找表比使用 S3 API 列出整个层次结构要快得多。没有重命名——提交只是将新条目添加到元数据表中。

FileIO API 在规划和提交阶段执行元数据操作。任务使用 FileIO 来读取和写入底层数据文件,这些文件的位置在提交期间包含在表元数据中。引擎具体如何执行此操作取决于 FileIO 的实现。对于遗留环境,HadoopFileIO充当现有 Hadoop 文件系统实现和 Iceberg 中的 FileIO API 之间的适配器层。

我们将重点放在S3FileIO因为它是本机 S3 实现。当我们构建我们的云原生 Lakehouse 时,我们不需要随身携带 Hadoop cruft。根据Iceberg FileIO:Cloud Native Tables,原生 S3 实现的优势包括:

  • 契约行为: Hadoop 文件系统实现具有严格的契约行为,导致额外的请求(存在性检查、消除冲突的目录和路径),从而增加开销和复杂性。Iceberg 使用完全可寻址和唯一的路径,避免了额外的复杂性。

  • 优化上传: S3FileIO通过逐步上传数据来优化存储/内存,以最大限度地减少大型任务的磁盘消耗,并在打开多个文件进行输出时保持低内存消耗。

  • S3 客户端定制:客户端使用最新的主要 AWS SDK 版本 (v2),并允许用户完全自定义客户端以用于 S3(包括任何 S3 API 兼容端点)。

  • 序列化性能:任务处理HadoopFileIO需要 Hadoop 配置的序列化,这非常大,在退化的情况下会减慢处理速度并导致比处理的数据更多的开销。

  • 减少依赖性: Hadoop 文件系统实现引入了一个大的依赖树,简化的实现降低了整体打包的复杂性。

Iceberg 通过iceberg-aws模块提供与不同 AWS 服务的集成,从0.11.0以后的所有版本都与 Spark 和 Flink 运行时捆绑在一起。Iceberg 允许用户通过 向 S3 写入数据S3FileIO使用 时S3FileIO,目录被配置为使用目录属性使用 S3 API io-implS3FileIO采用最新的 S3 功能来优化安全性(S3 访问控制列表、所有三种 S3 服务器端加密模式)和性能(渐进式分段上传),因此推荐用于对象存储用例。

Iceberg 和 MinIO 教程

目前,Spark 是与 Iceberg 一起使用的功能最丰富的计算引擎,因此本教程重点介绍使用 Spark 和 Spark-SQL 来了解 Iceberg 的概念和功能。在 Ubuntu 20.04 上,我们将安装和配置 Java、作为目录或元数据指针的 PostgreSQL、Spark 和 MinIO——同时仔细下载和配置 Java 依赖项。然后我们将运行 Spark-SQL 来创建、填充、查询和修改表。我们还将介绍您可以使用 Iceberg 完成的一些很棒的事情,例如模式演变、使用隐藏分区、时间旅行和回滚。在每一步之后,我们都会在 MinIO 中包含 Iceberg 桶的屏幕截图,以便您了解幕后发生的事情。

先决条件

下载并启动MinIO 服务器。记录 IP 地址、TCP 端口、访问密钥和秘密密钥。
下载并安装MinIO 客户端。

使用 MinIO Client 为 Iceberg 设置别名并创建一个 bucket

mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key>  <your-MinIO-secret-key>
mc mb minio/iceberg
Bucket created successfully `myminio/iceberg`.

您将需要下载并配置 Spark 以使用所需的 Java 存档 (JAR),以便启用各种功能,例如 Hadoop、AWS S3 和 JDBC。您还需要在 PATH 和 CLASSPATH 中拥有每个必需的 JAR 和配置文件的正确版本。不幸的是,调用不同版本的 JAR 很容易忘记您正在运行的是哪个 JAR,因此会遇到停止显示的不兼容性。

如果您还没有安装 Java Runtime,请安装它。对于 Ubuntu 20.04,命令是

sudo apt install curl mlocate default-jdk -y

下载并配置 PostgreSQL 作为系统服务运行

sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get -y install postgresql
sudo systemctl start postgresql.service

我们将创建一个icebergcat超级用户角色,设置密码并创建一个数据库icebergcat

sudo -u postgres createuser --interactive
ALTER ROLE icebergcat PASSWORD 'minio';
sudo -u postgres createdb icebergcat

登录数据库以验证其是否正常工作,系统将提示您输入密码:

psql -U icebergcat -d icebergcat -W -h 127.0.0.1

下载、提取和移动 Apache Spark

$ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
$ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz
$ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark

通过添加以下内容来设置 Spark 环境~/.bashrc,然后重新启动 shell 以应用更改

export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
bash -l

需要以下 .jar 文件。下载并复制 .jar 文件到 Spark 机器上的任何所需位置,例如/opt/spark/jars.

需要aws-java-sdk-bundle/1.11.901.jar (或更高版本)来支持 S3 协议。

$ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar

iceberg-spark-runtime-3.2_2.12.jar是必需的。

$ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar

启动星火

启动 Spark 独立主服务器

$ start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out

打开浏览器并转到 http: //


Untitled (44).png


Spark 在 spark://:7077 处于活动状态

启动一个 Spark 工作进程

$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077
starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out

Spark-SQL 和 Iceberg

在启动 Spark-SQL 之前初始化环境。

export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_S3_ENDPOINT=10.0.0.10:9000
export AWS_REGION=us-east-1
export MINIO_REGION=us-east-1
export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2"
export AWS_SDK_VERSION=2.17.230
export AWS_MAVEN_GROUP=software.amazon.awssdk
export AWS_PACKAGES=(
"bundle"
"url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
done

运行以下命令以使用 PostgreSQL 启动带有 Iceberg 的 Spark-SQL,以获取 MinIO 所需的元数据和对 S3 API 的支持。或者,您可以使用本地spark-defaults.conf文件设置配置

$ spark-sql --packages $DEPENDENCIES \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
--conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \
--conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \
--conf spark.sql.catalog.my_catalog.jdbc.password=minio \
--conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.defaultCatalog=my_catalog \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/home/iceicedata/spark-events \
--conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \
--conf spark.sql.catalogImplementation=in-memory

关于此配置的一些重要说明

  • my_catalog我们声明一个使用 JDBC 连接到内部 IP 地址上的 PostgreSQL 的目录,并将该icebergcat表用于元数据。

  • 然后,我们将仓库位置设置为我们之前创建的 MinIO 存储桶,并配置 Iceberg 以用于S3FileIO访问它。

创建表

接下来,我们将创建一个简单的表。

CREATE TABLE my_catalog.my_table (
id bigint,
data string,
category string)
USING iceberg
LOCATION 's3://iceberg'
PARTITIONED BY (category);

这是 Iceberg 通过 S3FileIO 提供的巨大性能改进。对于我们这些在使用 S3 的传统 Hive 存储布局时由于基于对象前缀限制请求而遭受性能低下的人来说,这是一种极大的解脱。众所周知,在 AWS S3 上创建分区的 Athena/Hive 表可能需要 30-60 分钟。Iceberg 默认使用 Hive 存储布局,但可以切换为使用ObjectStoreLocationProvider使用ObjectStoreLocationProvider,为每个存储的文件生成确定性哈希,哈希直接附加在write.data.path这可确保写入 S3 兼容对象存储的文件均匀分布在 S3 存储桶中的多个前缀中,从而为与 S3 相关的 IO 操作带来最小的限制和最大的吞吐量。使用时ObjectStoreLocationProvider,在你的 Iceberg 表之间共享和短路write.data.path将提高性能。在 Iceberg 中做了很多工作来提高 Hive 的性能和可靠性

CREATE TABLE my_catalog.my_table (
    id bigint,
    data string,
    category string)
USING iceberg
OPTIONS (
    'write.object-storage.enabled'=true, 
    'write.data.path'='s3://iceberg')
PARTITIONED BY (category);

查看 MinIO 控制台,我们看到在我们的iceberg存储桶下创建了一个路径my_table


Untitled (45).png


存储桶包含metadata路径


Untitled (46).png


此时,表中没有数据,只有描述表的元数据。还有一个指向此元数据的指针存储在 PostgreSQL 的 Iceberg 目录表中。my_catalogSpark-SQL(查询引擎)按表名 ( )搜索 Iceberg 目录 ( my_table),并检索当前元数据文件的 URI。


Untitled (47).png


让我们看一下第一个元数据文件,其中存储了有关表的架构、分区和快照的信息。定义所有快照后,会current-snapshot-id告诉查询引擎要使用哪个快照,然后查询引擎会在数组中搜索该值snapshots,获取该快照的值manifest-list并按顺序打开该列表中的清单文件。请注意,我们的示例只有一个快照,因为表刚刚创建,没有清单,因为我们还没有插入数据。

{
  "format-version" : 1,
  "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363",
  "location" : "s3://iceberg/my_table",
  "last-updated-ms" : 1658795119167,
  "last-column-id" : 3,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "data",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "category",
      "required" : false,
      "type" : "string"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "data",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "category",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "partition-spec" : [ {
    "name" : "category",
    "transform" : "identity",
    "source-id" : 3,
    "field-id" : 1000
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "category",
      "transform" : "identity",
      "source-id" : 3,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "option.write.data.path" : "s3://iceberg/my_table",
    "owner" : "msarrel",
    "option.write.object-storage.enabled" : "true",
    "write.data.path" : "s3://iceberg/my_table",
    "write.object-storage.enabled" : "true"
  },

  "current-snapshot-id" : -1,
  "snapshots" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}

接下来,让我们插入一些模拟数据并观察 Iceberg 存储在 MinIO 中的文件。在桶内iceberg,有 nowmy_table/metadatamy_table/data前缀。

INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video");


Untitled (48).png


元数据前缀包含原始元数据文件、清单列表和清单文件。清单列表是——您猜对了——清单文件的列表。清单列表包含有关每个快照中包含的每个清单文件的信息:清单文件的位置、添加快照的结果、有关分区的信息以及相关数据文件的分区列的下限和上限。在查询期间,查询引擎从清单列表中读取清单文件位置的值并打开适当的清单文件。清单列表采用 AVRO 格式。

清单文件跟踪数据文件并包括有关每个文件的详细信息和预先计算的统计信息。首先要跟踪的是文件格式和位置。清单文件是 Iceberg 通过文件系统位置消除 Hive 样式跟踪数据的方式。清单文件通过包含分区成员资格、记录计数以及每列的下限和上限等详细信息来提高读取数据文件的效率和性能。统计信息是在写入操作期间写入的,并且比 Hive 统计信息更及时、准确和最新。


Untitled (49).png


提交 SELECT 查询时,查询引擎会从元数据库中获取清单列表的位置。file-path然后查询引擎读取每个对象的条目data-file,然后打开数据文件以执行查询。

下面显示的是前缀的内容data,按分区组织。


Untitled (50).png


在分区内,每个表行有一个数据文件。


Untitled (51).png


让我们运行一个示例查询

spark-sql> SELECT count(1) as count, data
FROM my_catalog.my_table
GROUP BY data;
1       a
1       b
1       c
Time taken: 9.715 seconds, Fetched 3 row(s)
spark-sql>

现在我们了解了 Iceberg 表的不同组件以及查询引擎如何与它们一起工作,让我们深入了解 Iceberg 的最佳功能以及如何在您的数据湖中利用它们。

表演变

添加、删除、重命名和更新等架构演变更改是元数据更改,这意味着无需更改/重写数据文件即可执行更新。Iceberg 还保证这些模式演进变化是独立的,没有副作用。Iceberg 使用唯一 ID 来跟踪表中的每一列,如果添加新列,它永远不会错误地利用现有 ID。

Iceberg 表分区可以在现有表中更新,因为查询不直接引用分区值。写入新数据时,它会在新布局中使用新规格,之前写入的具有不同规格的数据保持不变。当您编写新查询时,这会导致拆分计划。为了提高性能,Iceberg 使用隐藏分区,因此用户无需为特定分区布局编写查询即可提高速度。用户专注于为他们需要的数据编写查询,并让 Iceberg 修剪不包含匹配数据的文件。

另一个非常有用的演变是 Iceberg 排序顺序也可以在现有表中更新,就像分区规范一样。不同的引擎可以选择在未排序的顺序上以最新的排序顺序写入数据,当排序非常昂贵时,以前的排序顺序写入的旧数据保持不变。

spark-sql> ALTER TABLE my_catalog.my_table
> RENAME my_catalog.my_table_2;

执行此操作的前几次,您会被它的速度所震撼。这是因为您不是在重写表,您只是在对元数据进行操作。在这种情况下,我们只是进行了更改table_name,而 Iceberg 在大约十分之一秒内为我们完成了此操作。


Untitled (52).png


其他模式更改同样轻松。

spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity;
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity;
spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;

分区

正如我们之前提到的,其他 Hive 格式支持分区,但是 Iceberg 支持隐藏分区,可以处理为表中的行生成分区值的繁琐且容易出错的任务。用户专注于为解决业务问题的查询添加过滤器,而不用担心表是如何分区的。Iceberg 负责自动避免读取不必要的分区。

Iceberg 为您处理复杂的分区和更改表的分区方案,大大简化了最终用户的流程。您可以定义分区或让 Iceberg 为您处理。Iceberg 喜欢根据时间戳进行分区,例如事件时间。分区由清单中的快照跟踪。查询不再依赖于表的物理布局。由于物理表和逻辑表之间的这种分离,随着更多数据的添加,Iceberg 表可以随着时间的推移演变分区。例如,重新分区 Hive 表需要创建一个新表并将旧数据读入其中。您还必须更改已编写的每个查询中的 PARTITION 值——这并不有趣。

spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category;
ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;

我们现在对同一个表有两个分区方案。在 Hive 中不可能发生的事情在 Iceberg 中透明地发生了。从现在开始,查询计划被拆分,使用旧分区方案查询旧数据,使用新分区方案查询新数据。Iceberg 会为您解决这个问题——查询表的人不需要知道数据是使用两个分区方案存储的。Iceberg 通过幕后 WHERE 子句和分区过滤器的组合来实现这一点,分区过滤器会删除没有匹配项的数据文件。

时间旅行和回滚

每次写入 Iceberg 表都会创建新的快照。快照就像版本一样,可以像我们使用 MinIO 版本控制功能一样用于时间旅行和回滚。管理快照的方式是通过设置expireSnapshot来维护系统。时间旅行支持使用完全相同的表快照的可重现查询,或者让用户轻松检查更改。版本回滚允许用户通过将表重置为良好状态来快速更正问题。

随着表的更改,Iceberg 将每个版本作为快照进行跟踪,然后在查询表时提供时间旅行到任何快照的能力。如果您想运行历史查询或重现以前查询的结果(可能用于报告),这将非常有用。时间旅行在测试新代码更改时也很有用,因为您可以通过查询已知结果来测试新代码。

查看已为表保存的快照

spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
2022-07-25 17:26:47.53  527713811620162549      NULL    append  s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro  {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"}
Time taken: 7.236 seconds, Fetched 1 row(s)

一些例子:

-- time travel to October 26, 1986 at 01:21:00
spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00';
-- time travel to snapshot with id 10963874102873
spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

您可以使用快照进行增量读取,但您必须使用 Spark,而不是 Spark-SQL。例如

scala> spark.read()
.format(“iceberg”)
.option(“start-snapshot-id”, “10963874102873”)
.option(“end-snapshot-id”, “10963874102994”)
.load(“s3://iceberg/my_table”)

您还可以将表回滚到某个时间点或特定快照,如这两个示例

spark-sql> CALL my_catalog.system.rollback_to_timestamp(‘my_table’, TIMESTAMP ‘2022-07-25 12:15:00.000’); 
spark-sql> CALL my_catalog.system.rollback_to_snapshot(‘my_table’, 527713811620162549);

富有表现力的SQL

Iceberg 支持行级删除、合并和更新等所有富有表现力的 SQL 命令,最大的亮点是 Iceberg 支持 Eager 和 lazy 策略。我们可以对所有需要删除的东西进行编码(例如,GDPR 或 CCPA),但不会立即重写所有这些数据文件,我们可以根据需要懒惰地收集垃圾,这确实有助于提高 Iceberg 支持的巨大表的效率。

例如,您可以删除表中与特定谓词匹配的所有记录。以下将从视频类别中删除所有行。

spark-sql> DELETE FROM my_catalog.my_table WHERE category = ‘video’;

或者,您可以使用 CREATE TABLE AS SELECT 或 REPLACE TABLE AS SELECT 来完成此操作

spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = ‘music’;

您可以很容易地合并两个表

spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;

数据工程

Iceberg 是开放式分析表标准的基础,它使用 SQL 行为和与其他 Hive 表格式不同的真实表抽象,并应用数据仓库基础知识在我们知道问题之前解决问题。通过声明式数据工程,我们可以配置表格,而不必担心更改每个引擎以适应数据的需求。这将解锁自动优化和建议。通过安全提交,数据服务成为可能,这有助于避免人类照看数据工作负载。

为了检查表的历史、快照和其他元数据,Iceberg 支持查询元数据。元数据表通过在查询中的原始表名后添加元数据表名(例如,history)来标识。

显示表的数据文件

spark-sql> SELECT * FROM my_catalog.my_table.files;

显示清单

spark-sql> SELECT * FROM my_catalog.my_table.manifests;

显示表历史

spark-sql> SELECT * FROM my_catalog.my_table.history;

显示快照

spark-sql> SELECT * FROM my_catalog.my_table.snapshots;

您还可以加入快照和表历史记录以查看编写每个快照的应用程序

spark-sql> select
              h.made_current_at,
              s.operation,
              h.snapshot_id,
              h.is_current_ancestor,
              s.summary['spark.app.id']
           from my_catalog.my_table.history h
           join my_catalog.my_table.snapshots s
              on h.snapshot_id = s.snapshot_id
           order by made_current_at;

现在您已经了解了基础知识,将一些数据加载到 Iceberg 中,然后从Spark and Iceberg QuickstartIceberg Documentation中了解更多信息。

集成

Apache Iceberg 与各种查询和执行引擎集成,Apache Iceberg 表可以由这些连接器创建和管理。支持 Iceberg 的引擎有SparkFlinkHivePrestoTrinoDremioSnowflake

用 Iceberg 和 MinIO 构建数据湖很酷

Apache Iceberg 作为数据湖的表格格式而备受关注。不断壮大的开源社区以及越来越多的来自多个云提供商和应用程序框架的集成意味着是时候认真对待 Iceberg,开始试验、学习和计划将其集成到现有的数据湖架构中了。将 Iceberg 与 MinIO 配对用于多云数据湖和分析。




上一篇 下一篇