使用 Spark 管理冰山表

使用 Spark 管理冰山表

Apache Iceberg 是一种开放的表格式,它与多引擎兼容,专为容纳大规模分析数据集而构建。多引擎意味着 Spark、Trino、Presto、Hive 和 Impala 都可以同时独立地对相同数据进行操作。之前,我们发布了使用 Iceberg 和 MinIO 的 Lakehouse 架构权威指南作为该主题的介绍,包括设计目标和主要功能,以及深入研究 Iceberg:表上的 ACID 事务讨论了如何使用 Iceberg 格式的数据。  

在这篇博文中,我们将构建一个 Notebook,它使用 MinIO 作为 Spark 作业的对象存储来管理 Iceberg 表。如果您尚未在 Kubernetes 环境中设置 spark-operator,请参阅Spark、MinIO 和 Kubernetes

阿帕奇冰山

Apache Iceberg是一种开源表格格式,可以在 Amazon S3、Azure Blob Storage、Google Cloud Storage 和 MinIO 等云存储系统中高效存储大型、变化缓慢的数据集。Iceberg 最初由 Netflix 开发,解决了 Apache Parquet 和 Apache ORC 等现有表格格式的一些局限性。

Iceberg 旨在提供许多优于传统表格格式的优势,以及 Data Lake 的 goto 表格格式,以下是其中的一些优势:

  • 模式演变:数据湖的特点通常是其灵活性和存储各种数据格式的能力。但是,这种灵活性会使管理模式随时间发生的变化变得具有挑战性。Iceberg 提供了一种添加、删除或修改表列的方法,而无需完全重写数据,从而更容易随时间演变模式

  • 事务性写入:在数据湖中,确保数据准确和一致非常重要,尤其是当数据用于关键业务目的时。Iceberg 为写入操作提供了 ACID 事务的支持,确保数据始终处于一致的状态

  • 查询隔离:数据湖经常被许多用户或应用程序同时使用。Iceberg 允许多个查询同时运行而不会相互干扰,从而可以在不牺牲性能的情况下扩展数据湖的使用

  • 时间旅行:在数据湖中,能够查询在特定时间点出现的数据通常很有用。Iceberg 提供了一个时间旅行 API,使用户能够查询在特定版本或时间戳上存在的数据,从而更容易分析历史趋势或跟踪随时间的变化

  • 分区修剪:数据湖通常包含大量数据,这会使查询速度变慢并占用大量资源。Iceberg 支持按一列或多列对数据进行分区,这可以通过减少查询时需要读取的数据量来显着提高查询性能。

Iceberg 可以与各种处理引擎和框架一起使用,包括 Apache Spark、Dremio 和 Presto。它还与Apache Arrow集成,这是一种跨语言的内存数据格式,可以跨不同的处理引擎实现高效的数据序列化和反序列化。

冰山目录

Apache Iceberg 目录是一个元数据存储,其中包含有关表的信息,包括它们的架构、位置和分区方案。它负责管理表的生命周期,包括创建、更新和删除表,并提供用于查询元数据和访问数据的 API。

以下是 Apache Iceberg 支持的一些目录:

  • JDBC目录

  • 蜂巢目录

  • 尼斯湖水怪目录

  • Hadoop目录

  • 胶水目录

  • DynamoDB 目录

  • REST目录

为了使本次演练简单易行,我们将为 Iceberg 表使用 Hadoop Catalog。

MinIO 对象存储

MinIO 保证 Iceberg 表的持久性和这些表上的 Spark 操作的高性能。MinIO 使用加密保护 Iceberg 表,并根据基于策略的访问控制限制对它们的访问。MinIO使用 AES-256-GCM、ChaCha20-Poly1305 和 AES-CBC 等现代行业标准加密算法,使用 TLS 加密传输中的数据和使用粒度对象级加密的驱动器上的数据MinIO 集成了外部身份提供者,例如 ActiveDirectory/LDAP、Okta 和 IAM 的 Keycloak。然后,用户和组在尝试访问 Iceberg 表时受制于与 AWS IAM 兼容的 PBAC。

Iceberg 表在写入 MinIO 存储桶后受到保护:

  • 擦除编码将数据文件拆分为数据和奇偶校验块并对其进行编码,以便即使部分编码数据不可用,也可以恢复主要数据。水平可扩展的分布式存储系统跨多个驱动器和节点保存编码数据。如果驱动器或节点发生故障或数据损坏,则可以从保存在其他驱动器和节点上的奇偶校验和数据块重建原始数据。

  • Bit Rot Protection在后台捕获并修复损坏的对象,以消除这种对数据持久性的隐藏威胁

  • Bucket 和 Object Immutability使用对象锁定、保留和其他治理机制保护保存到 MinIO 的数据不被删除或修改。写入 MinIO 的对象永远不会被覆盖。

  • 桶和对象版本控制进一步保护对象。MinIO 存储每个对象的每个版本,即使它被删除,除非一个版本被特别删除。MinIO 的数据生命周期管理  允许管理员在层之间移动存储桶,例如将 NVMe 用于性能密集型工作负载,并为版本设置到期日期,以便将它们从系统中清除以提高存储效率。

将演示数据导入 MinIO

我们将使用 MinIO 上可用的 NYC Taxi 数据集。你可以从这里下载数据集,它有大约 112M 行,大小大约为 10GB。您可以使用您选择的任何其他数据集,并使用以下命令将其上传到 MinIO:

!mc mb play/openlake
!mc mb play/openlake/spark
!mc mb play/openlake/spark/sample-data
!mc cp nyc-taxi-data.csv play/openlake/spark/sample-data/nyc-taxi-data.csv

管理 Iceberg 表的示例 PySpark 应用程序

这是基于Iceberg Notebook 入门

%%writefile sample-code/src/main-iceberg.py
import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")


# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://openlake/warehouse/")
    .set("spark.sql.catalog.demo.s3.endpoint", "https://play.min.io:50000")
    .set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (2 now) since previous operation will create a new snapshot

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
                    FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")


构建 Docker 镜像

我们现在将构建包含上述 Python 应用程序的 Docker 映像。您可以使用以下 Dockerfile 构建映像:


%%writefile sample-code/Dockerfile
FROM openlake/spark-py:3.3.2

USER root

WORKDIR /app

RUN pip3 install pyspark==3.3.2

COPY src/*.py .


您可以构建自己的 Docker 映像或使用 Docker Hub 上提供的预构建映像 openlake/sparkjob-demo:3.3.2。

部署 Spark Iceberg 应用程序

我们将构建 Spark 作业 YAML 来定义规范,然后将其部署到 Kubernetes 集群中

%%writefile sample-code/spark-job/sparkjob-iceberg.yml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-iceberg
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "openlake/sparkjob-demo:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main-iceberg.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
        env:
            -   name: AWS_REGION
                value: us-east-1
            -   name: AWS_ACCESS_KEY_ID
                value: openlakeuser
            -   name: AWS_SECRET_ACCESS_KEY
                value: openlakeuser
    executor:
        cores: 1
        instances: 3
        memory: "2048m"
        labels:
            version: 3.3.2
        env:
            -   name: INPUT_PATH
                value: "s3a://openlake/spark/sample-data/taxi-data.csv"
            -   name: AWS_REGION
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_REGION
            -   name: AWS_ACCESS_KEY_ID
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_ACCESS_KEY_ID
            -   name: AWS_SECRET_ACCESS_KEY
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_SECRET_ACCESS_KEY
            -   name: ENDPOINT
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: ENDPOINT


您可以使用以下命令部署上述 sparkjob-iceberg.yml

!kubectl apply -f sample- code /spark-job/sparkjob-iceberg.yml

部署应用后,您可以使用以下命令查看应用的状态:

!kubectl get sparkapplications -n spark-operator


您还可以使用以下命令检查应用程序的日志(因为我们在 Spark 应用程序中禁用了 INFO 日志,在我们的应用程序日志开始显示之前您可能看不到太多活动):

!kubectl logs -f spark-iceberg-driver -n spark-operator # 完成后停止这个 shell

应用程序完成后,您可以使用以下命令删除应用程序:

!kubectl delete sparkapplications spark-iceberg -n spark- operator

代码演练

现在我们已经看到了端到端的代码,让我们详细看一下代码片段

设置 Iceberg 属性

# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://warehouse/")
    .set("spark.sql.catalog.demo.s3.endpoint", "https://play.min.io:50000")
    .set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

创建 Iceberg 表

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

在上面的代码片段中,我们从 Minio https://play.min.io:50000 端点读取了 taxi-data.csv 文件,并将其保存为 Iceberg 表 nyc.taxis_large。Iceberg 表保存后,我们使用 Spark SQL 查询 nyc.taxis_large 以获取当前的记录总数。

模式演变

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")


上面的代码通过重命名、更改列类型、添加新列 fare_per_distance 并根据 fare 和 distance 列的值填充新列来演示模式演变。

从表中删除数据

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")


在上面的代码片段中,当新字段 fare_per_distance 为 null 或大于 4.0 以及距离字段大于 2.0 时,我们从中删除记录。操作完成后,我们查询快照表以查看创建了 2 个新快照。我们还获得了总记录数,这明显少于我们开始时的数量(397014 对 112234626)。

对表进行分区

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

这段代码使用 VendorID 列创建了一个新分区。此分区将适用于向前插入的新行,旧数据不会受到影响。我们还可以在创建 Iceberg 表时添加分区,如下所示

CREATE TABLE IF NOT EXISTS nyc.taxis_large (VendorID BIGINT, tpep_pickup_datetime STRING, tpep_dropoff_datetime STRING, passenger_count DOUBLE, trip_distance DOUBLE, RatecodeID DOUBLE, store_and_fwd_flag STRING, PULocationID BIGINT, DOLocationID BIGINT, payment_type BIGINT, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE) PARTITIONED BY VendorID USING iceberg;

元数据表

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
                    FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

Iceberg 有像快照、文件、历史这样的元数据表,我们可以查询它们以了解幕后发生的事情。例如,通过查询快照表,我们可以看到创建新快照时执行的操作。文件表为我们提供了有关存储在 Minio 中的数据文件的信息,例如每个文件的记录数、文件格式等。在历史表中,我们获得了有关快照何时成为当前快照以及父级是谁等的所有信息。

带快照的时间旅行

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")


可以使用捕获在给定时间进行的交易的快照在 Iceberg 中进行时间旅行。在上面的代码中,我们查询历史表以获取曾经创建的第一个快照,并对那个 snapshot_id 执行 roll_back_to_snapshot 系统调用。如果我们在执行回滚后查询表,我们可以清楚地看到 fare_per_distance 字段为 null,记录计数回到 112234626。最后,历史表有一条新记录,其中包含我们使用的 snapshot_id。

这是我们可以使用 Apache Iceberg 做什么的高级概述。还有对表审计和维护的支持,我们稍后会探讨。Apache Iceberg 还在快照之上添加了对标签和分支的支持,这具有巨大的潜力。一旦功能齐全,我们将探索它。

Spark 和 Iceberg 在 MinIO 上的运行情况很酷


Iceberg、Spark 和 MinIO 是用于构建可扩展的高性能数据湖的强大技术组合。Iceberg 的开放表格式和对多种引擎的支持使其成为企业数据湖的绝佳选择。将 MinIO 用于 Iceberg 存储为多云数据湖和分析奠定了坚实的基础。MinIO 包括主动-主动复制以在不同位置(内部部署、公共/私有云和边缘)之间同步数据,从而实现企业所需的功能,例如地理负载平衡和快速热-热故障转移。

今天在 MinIO 上试用 Iceberg。如果您有任何疑问或想分享技巧,请通过我们的Slack 频道联系我们或在 sales@minio.org.cn 上给我们留言。



上一篇 下一篇