Spark、MinIO 和 Kubernetes
Apache Spark 是一种用于大数据处理和分析的开源分布式计算系统。它旨在快速、高效且易于使用地处理大规模数据。Spark 为大规模数据处理提供统一的分析引擎,支持多种语言,包括 Java、Scala、Python 和 R。
使用 Spark 的好处很多。首先,它提供了高水平的并行性,这意味着它可以跨集群中的多个节点快速高效地处理大量数据。其次,Spark 提供了一组丰富的数据处理 API,包括对 SQL 查询、机器学习、图形处理和流处理的支持。第三,Spark 具有灵活和可扩展的架构,允许开发人员轻松地与各种数据源和其他工具集成。
在运行 Spark 作业时,使用合适的存储系统来存储输入和输出数据至关重要。像 MinIO 这样的对象存储系统是针对 PB 级数据运行 Spark 作业的唯一方法,因为它们是高度可扩展且持久的存储解决方案。MinIO 是一个开源对象存储系统,可以轻松部署在本地或您选择的云中。凭借行业领先的 S3 兼容性,MinIO 可与支持 S3 API 的各种工具一起使用,包括 Spark。
与传统的 Hadoop 分布式文件系统 (HDFS) 或其他基于文件的存储系统相比,将 MinIO 与 Spark 结合使用具有多项优势。MinIO 具有高度可扩展性,可以轻松处理 PB 级的大量数据。MinIO 的读取速度超过2.6Tbps,写入速度超过 1.32Tbps,提供了支持大型 Spark 数据集所需的大规模性能。MinIO 是一种灵活且经济高效的存储解决方案,可以轻松与其他工具和系统集成。写入 MinIO 的数据是不可变的和版本化的,并且高度耐用,纠删码数据的多个副本存储在多个节点上以实现冗余和容错。完善功能、主动-主动复制和批量复制可用于进一步的冗余和容错,或者只是将数据移动到最适合使用的地方。
为什么在 Kubernetes 上使用 Spark?
与独立部署相比,在 Kubernetes 上部署 Apache Spark 具有多项优势。以下是一些原因:
资源管理: Kubernetes 提供了强大的资源管理功能,可以帮助优化资源利用率并最大程度地减少浪费。通过在 Kubernetes 上部署 Spark,您可以利用 Kubernetes 的资源分配和调度功能,根据需要动态地为 Spark 作业分配资源。
可扩展性: Kubernetes 可以根据工作负载自动扩展分配给 Spark 的资源。这意味着 Spark 可以根据需要处理的数据量进行扩展或缩减,而无需人工干预。
容错性: Kubernetes 提供了内置的容错机制来保证 Spark 集群的可靠性。如果集群中的一个节点发生故障,Kubernetes 会自动将 Spark 任务重新调度到另一个节点,确保工作负载不受影响。
简化部署: Kubernetes 提供了一种简化的部署模型,您可以在其中使用单个 YAML 文件部署 Spark。该文件指定了 Spark 集群所需的资源,其余的由 Kubernetes 自动处理。
与其他 Kubernetes 服务集成:通过在 Kubernetes 上部署 Spark,您可以利用其他 Kubernetes 服务(例如监控和日志记录)来更好地了解 Spark 集群的性能和健康状况。
在 Kubernetes 上设置 Spark
我们将使用 Spark Operator 在 Kubernetes 上设置 Spark。Spark Operator 是一个 Kubernetes 控制器,允许您在 Kubernetes 上管理 Spark 应用程序。它提供了一个名为 SparkApplication 的自定义资源定义 (CRD),允许您在 Kubernetes 上定义和运行 Spark 应用程序。Spark Operator 还提供了一个 Web UI,可以让您轻松监控和管理 Spark 应用程序。Spark Operator 建立在 Kubernetes Operator SDK 之上,这是一个用于构建 Kubernetes Operator 的框架。Spark Operator是开源的,可在 GitHub 上获取。它也可以作为 Helm 图表使用,这使得在 Kubernetes 上部署变得容易。在本教程中,我们将使用 Helm chart 在 Kubernetes 集群上部署 Spark Operator。
Spark Operator 提供各种功能来简化 Kubernetes 环境中 Spark 应用程序的管理。其中包括使用自定义资源的声明性应用程序规范和管理、符合条件的 SparkApplications 的自动提交、对计划应用程序的本机 cron 支持,以及通过 mutating admission webhook 自定义超出本机功能的 Spark pod。
此外,该工具支持更新后的 SparkAppliations 的自动重新提交和重启,以及使用线性退避重试失败的提交。它还提供将本地 Hadoop 配置挂载为 Kubernetes ConfigMap 并通过sparkctl自动将本地应用程序依赖项暂存到 MinIO 的功能。最后,该工具支持收集应用程序级指标和驱动程序/执行程序指标并将其导出到 Prometheus。
先决条件
要学习本教程,您需要:
一个 Kubernetes 集群。您可以使用Minikube在您的机器上设置本地 Kubernetes 集群。
Helm,Kubernetes 的包管理器。您可以按照本指南在您的机器上安装 Helm。
在裸机或 Kubernetes 上运行的 MinIO 服务器。您可以按照本指南在裸机上安装 MinIO 或本指南在 Kubernetes 上安装 MinIO,或者您可以使用MinIO Play 服务器进行测试。
用于访问 MinIO 服务器的 MinIO 客户端 (mc)。您可以按照本指南在您的机器上安装 mc。
安装 Spark 运算符
要安装 Spark Operator,您需要将 Spark Operator 的 Helm 存储库添加到本地 Helm 客户端。您可以通过运行以下命令来执行此操作:
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
添加存储库后,您可以使用以下命令安装 Spark Operator(安装时可能需要等待一分钟):
helm install my-release spark-operator/spark-operator \ --namespace spark-operator \ --set webhook.enable=true \ --set image.repository=openlake/spark-operator \ --set image.tag=3.3.1 \ --create-namespace
您将看到以下输出:
LAST DEPLOYED: Mon Feb 27 19:48:33 2023 NAMESPACE: spark-operator STATUS: deployed REVISION: 1 TEST SUITE: None
此命令在命名空间中安装 Spark Operator spark-operator,并启用 mutating admission webhook。需要 webhook 才能将本地 Hadoop 配置挂载为 Kubernetes ConfigMap,并配置驱动程序和执行程序可以使用的 env 变量。镜像仓库和标签设置为包含最新版本 Spark Operator 的镜像。您还可以通过省略--set image.repository和--set image.tag标志来使用默认图像存储库和标记,在撰写本文时,最新的 Spark Operator 版本使用 3.1.1 版本的 Spark 而openlake/spark-operator使用最新的 3.3.1 版本的 Spark。--create-namespace如果您已经有一个名为 的命名空间,则可以跳过该标志spark-operator。这还将监视所有命名空间中的所有 Spark 应用程序。
可以在此处找到配置选项的详细列表。
验证 Spark Operator 安装
要验证 Spark Operator 是否安装成功,您可以运行以下命令:
kubectl get pods -n spark-operator
您将看到类似于以下输出的结果:
NAME READY STATUS RESTARTS AGE my-release-spark-operator-f56c4d8c4-pr857 1/1 Running 0 14m
现在我们已经安装了 Spark operator,我们可以在 Kubernetes 上部署 Spark 应用程序或 Scheduled Spark 应用程序。
部署 Spark 应用程序
让我们尝试部署 Spark 运算符附带的示例简单 Spark 应用程序之一。您可以在此处找到示例应用程序列表,我们对计算 Pi 很感兴趣,因此我们将修改spark Pi 应用程序以使用 Spark 3.3.1 并在 Kubernetes 上运行它。
apiVersion: "sparkoperator.k8s.io/v1beta2"kind: SparkApplicationmetadata: name: pyspark-pi namespace: spark-operatorspec: type: Python pythonVersion: "3" mode: cluster image: "openlake/spark-py:3.3.1" imagePullPolicy: Always mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py sparkVersion: "3.3.1" restartPolicy: type: OnFailure onFailureRetries: 3 onFailureRetryInterval: 10 onSubmissionFailureRetries: 5 onSubmissionFailureRetryInterval: 20 driver: cores: 1 coreLimit: "1200m" memory: "512m" labels: version: 3.1.1 serviceAccount: my-release-spark executor: cores: 1 instances: 1 memory: "512m" labels: version: 3.3.1
上面的应用程序将使用 Kubernetes 上的 Spark 计算 Pi 的值。您可以将上述应用程序另存为spark-pi.yaml并使用以下命令部署它:
kubectl apply -f spark-pi.yaml
要验证作业是否正在运行,您可以运行以下命令:
kubectl -n spark-operator get pods
你应该看到这样的东西:
NAME READY STATUS RESTARTS AGE my-release-spark-operator-59bccf4d94-fdrc9 1/1 Running 0 24d my-release-spark-operator-webhook-init-jspnn 0/1 Completed 0 68d pyspark-pi-driver 1/1 Running 0 23s pythonpi-b6a3e48693762e5d-exec-1 1/1 Running 0 7s
您可以使用以下命令检查应用程序的状态:
kubectl get sparkapplications -n spark-operator
您将看到以下输出:
NAME STATUS ATTEMPTS START FINISH AGE pyspark-pi COMPLETED 1 2023-02-27T15:20:29Z 2023-02-27T15:20:59Z 10m
您还可以使用以下命令检查应用程序的日志:
kubectl logs pyspark-pi-driver -n spark-operator
您将看到以下输出:
23/02/27 15:20:55 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 2.597098 s Pi is roughly 3.137960 23/02/27 15:20:55 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-d73653869375fa87-driver-svc.spark-operator.svc:4040 23/02/27 15:20:55 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
现在我们已经有了按预期工作的简单 Spark 应用程序,我们可以尝试使用 Spark 从 MinIO 读取和写入数据。
使用 Spark 从 MinIO 读取和写入数据
一旦我们拥有正确的依赖项和配置,使用 Spark 从 MinIO 读取数据和向 MinIO 写入数据就非常简单。在这篇文章中,我们不会讨论依赖项,为了简单起见,我们使用 openlake/spark-py:3.3.1 映像,其中包含使用 Spark 从 MinIO 读取和写入数据所需的所有依赖项。
将演示数据导入 MinIO
我们将使用 MinIO 上可用的 NYC Taxi 数据集。你可以从这里下载数据集,它有大约 112M 行,大小大约为 10GB。对于此练习,任何现有或新的具有足够可用空间的MinIO 部署。您可以使用您选择的任何其他数据集并使用以下命令将其上传到 MinIO,首先我们将创建我们的应用程序将引用的存储桶:
mc mb <Your-MinIO-Endpoint>/openlake mc mb <Your-MinIO-Endpoint>/openlake/spark mc mb <Your-MinIO-Endpoint>/openlake/spark/sample-data mc cp nyc-taxi-data.csv <Your-MinIO-Endpoint>/openlake/spark/sample-data/nyc-taxi-data.csv
示例 Python 应用程序
现在让我们使用 Spark 从 MinIO 读取和写入数据。我们将使用以下示例 python 应用程序来执行此操作。
import logging
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinioSparkJob")
spark = SparkSession.builder.getOrCreate()
def load_config(spark_context: SparkContext):
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "<Your-MinIO-AccessKey>"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
os.getenv("AWS_SECRET_ACCESS_KEY", "<Your-MinIO-SecretKey>"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "<Your-MinIO-Endpoint>"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
load_config(spark.sparkContext)
# Define schema for NYC Taxi Data
schema = StructType([
StructField('VendorID', LongType(), True),
StructField('tpep_pickup_datetime', StringType(), True),
StructField('tpep_dropoff_datetime', StringType(), True),
StructField('passenger_count', DoubleType(), True),
StructField('trip_distance', DoubleType(), True),
StructField('RatecodeID', DoubleType(), True),
StructField('store_and_fwd_flag', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('payment_type', LongType(), True),
StructField('fare_amount', DoubleType(), True),
StructField('extra', DoubleType(), True),
StructField('mta_tax', DoubleType(), True),
StructField('tip_amount', DoubleType(), True),
StructField('tolls_amount', DoubleType(), True),
StructField('improvement_surcharge', DoubleType(), True),
StructField('total_amount', DoubleType(), True)])
# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)
total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")上面的应用程序从 MinIO 读取 NYC Taxi 数据集并过滤乘客数大于 6 的行。然后将过滤后的数据写入 MinIO。您可以将上述代码另存为main.py.
构建 Docker 镜像
我们现在将构建包含上述 python 应用程序的 docker 镜像。您可以创建一个包含以下内容的 Dockerfile 来构建镜像:
FROM openlake/spark-py:3.3.1 USER root WORKDIR /app RUN pip3 install pyspark==3.3.1 COPY src/*.py .
您可以构建自己的 docker 镜像或使用openlake/sparkjob-demo:3.3.1Docker Hub 上提供的预构建镜像。如果您需要复习构建 docker 镜像,请参阅docker build。
部署 MinIO Spark 应用程序
要使用 Spark 从 MinIO 读取和写入数据,您需要创建一个包含 MinIO 访问密钥和秘密密钥的秘密。您可以使用以下命令创建密钥:
kubectl create secret generic minio-secret \ --from-literal=AWS_ACCESS_KEY_ID=<Your-MinIO-AccessKey> \ --from-literal=AWS_SECRET_ACCESS_KEY=<Your-MinIO-SecretKey> \ --from-literal=ENDPOINT=<Your-MinIO-Endpoint> \ --from-literal=AWS_REGION=us-east-1 \ --namespace spark-operator
您将看到以下输出:
secret/minio-secret created
现在我们已经创建了秘密,我们可以部署从 MinIO 读取和写入数据的 Spark 应用程序。您可以将以下应用程序另存为sparkjob-minio.yaml:
apiVersion: "sparkoperator.k8s.io/v1beta2"kind: SparkApplicationmetadata: name: spark-minio namespace: spark-operatorspec: type: Python pythonVersion: "3" mode: cluster image: "openlake/sparkjob-demo:3.3.1" imagePullPolicy: Always mainApplicationFile: local:///app/main.py sparkVersion: "3.3.1" restartPolicy: type: OnFailure onFailureRetries: 3 onFailureRetryInterval: 10 onSubmissionFailureRetries: 5 onSubmissionFailureRetryInterval: 20 driver: cores: 1 memory: "1024m" labels: version: 3.3.1 serviceAccount: my-release-spark env: - name: AWS_REGION value: us-east-1 - name: AWS_ACCESS_KEY_ID value: - name: AWS_SECRET_ACCESS_KEY value: executor: cores: 1 instances: 3 memory: "1024m" labels: version: 3.3.1 env: - name: INPUT_PATH value: "s3a://openlake/spark/sample-data/taxi-data.csv" - name: OUTPUT_PATH value: "s3a://openlake/spark/output/taxi-data-output" - name: AWS_REGION valueFrom: secretKeyRef: name: minio-secret key: AWS_REGION - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: minio-secret key: AWS_ACCESS_KEY_ID - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: minio-secret key: AWS_SECRET_ACCESS_KEY - name: ENDPOINT valueFrom: secretKeyRef: name: minio-secret key: ENDPOINT
上述 Python Spark 应用程序 YAML 文件包含以下配置:
spec.type:应用程序的类型。在本例中,它是一个 Python 应用程序。spec.pythonVersion:应用程序中使用的 Python 版本。spec.mode:应用程序的模式。在本例中,它是一个集群模式应用程序。spec.image:包含应用程序的 docker 镜像。spec.imagePullPolicy:docker镜像的镜像拉取策略。spec.mainApplicationFile:主应用程序文件的路径。spec.sparkVersion:应用程序中使用的 Spark 版本。spec.restartPolicy:应用程序的重启策略。在这种情况下,应用程序将在失败时重新启动。应用程序将重启 3 次,每次重启之间间隔 10 秒。如果申请提交失败,将重启5次,每次重启间隔20秒。spec.driver:应用程序的驱动程序配置。在这种情况下,我们使用 my-release-spark 服务帐户。驱动程序环境变量设置为从 MinIO 读取和写入数据。spec.executor:应用程序的执行器配置。在这种情况下,我们使用 3 个执行器,每个执行器具有 1 个内核和 1GB 内存。执行器环境变量设置为从 MinIO 读取和写入数据。
您可以使用以下命令部署应用程序:
kubectl apply -f sparkjob-minio.yaml
部署应用后,您可以使用以下命令查看应用的状态:
kubectl get sparkapplications -n spark-operator
您将看到以下输出:
NAME STATUS ATTEMPTS START FINISH AGE spark-minio RUNNING 1 2023-02-27T18:47:33Z <no value> 4m4s
应用程序完成后,您可以在 MinIO 中查看输出数据。您可以使用以下命令列出输出目录中的文件:
mc ls minio/openlake/spark/output/taxi-data-output
您还可以使用以下命令检查应用程序的日志:
kubectl logs -f spark-minio-driver -n spark-operator
您将看到以下输出:
23/02/27 19:06:11 INFO FileFormatWriter: Finished processing stats for write job 91dee4ed-3f0f-4b5c-8260-bf99c0b662ba. 2023-02-27 19:06:11,578 - MinioSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626 2023-02-27 19:06:11,578 - MinioSparkJob - INFO - Total Rows for Passenger Count > 6: 1066 2023-02-27 19:06:11,578 - py4j.clientserver - INFO - Closing down clientserver connection 23/02/27 19:06:11 INFO SparkUI: Stopped Spark web UI at http://spark-minio-b8d5c4869440db05-driver-svc.spark-operator.svc:4040 23/02/27 19:06:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
您还可以选择使用 Spark UI 在应用程序运行时对其进行监控。您可以使用以下命令端口转发 Spark UI 以供外部访问:
kubectl port-forward svc/spark-minio-ui-svc 4040:4040 -n spark-operator
在您的浏览器中,您可以使用以下 URL 访问 Spark UI:
http://localhost:4040
您将看到以下 Spark UI:

应用程序完成后,您可以使用以下命令删除应用程序:
kubectl delete sparkapplications spark-minio -n spark-operator
部署计划的 Spark 应用程序与部署普通的 Spark 应用程序几乎相同。唯一不同的是需要spec.schedule在 Spark Application YAML 文件中添加该字段,类型为ScheduledSparkApplication. 您可以将以下应用程序另存为sparkjob-minio-scheduled.yaml:
apiVersion: "sparkoperator.k8s.io/v1beta2"kind: ScheduledSparkApplicationmetadata: name: spark-scheduled-minio namespace: spark-operatorspec: schedule: "@every 1h" # Run the application every hour concurrencyPolicy: Allow template: type: Python pythonVersion: "3" mode: cluster image: "openlake/sparkjob-demo:3.3.1" imagePullPolicy: Always mainApplicationFile: local:///app/main.py sparkVersion: "3.3.1" restartPolicy: type: OnFailure onFailureRetries: 3 onFailureRetryInterval: 10 onSubmissionFailureRetries: 5 onSubmissionFailureRetryInterval: 20 driver: cores: 1 memory: "1024m" labels: version: 3.3.1 serviceAccount: my-release-spark env: - name: AWS_REGION value: us-east-1 - name: AWS_ACCESS_KEY_ID value: - name: AWS_SECRET_ACCESS_KEY value: executor: cores: 1 instances: 3 memory: "1024m" labels: version: 3.3.1 env: - name: INPUT_PATH value: "s3a://openlake/spark/sample-data/taxi-data.csv" - name: OUTPUT_PATH value: "s3a://openlake/spark/output/taxi-data-output" - name: AWS_REGION valueFrom: secretKeyRef: name: minio-secret key: AWS_REGION - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: minio-secret key: AWS_ACCESS_KEY_ID - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: minio-secret key: AWS_SECRET_ACCESS_KEY - name: ENDPOINT valueFrom: secretKeyRef: name: minio-secret key: ENDPOINT
您可以像普通 Spark 应用程序一样部署应用程序并查看其结果。上面的 Spark 应用程序将每小时运行一次,并将输出写入同一个存储桶。
本教程的所有源代码都可以在以下 GitHub 存储库中找到:openlake/spark
面向未来的 Spark-itect
Apache Spark 和 MinIO 是用于数据湖和分析的强大工具。在 Kubernetes 上运行 Spark 可以为 Spark 作业提供更好的资源管理、容错和可扩展性等优势。添加高性能和高度可扩展的 MinIO,您将拥有一个支持所有 Spark 工作负载的组合,无论您需要在您选择的 Kubernetes 平台上运行它们——公共/私有云、数据中心、边缘。
下载 MinIO并试用 Spark Operator。如果您有任何问题,请在我们的 Slack 频道上询问我们。