使用 MinIO 和 Kubeflow v2.0 构建 ML 训练管道2
介绍
在上一篇文章中,我介绍了[]() 使用 MinIO 和 Kubeflow v2.0 构建 ML 数据管道 。 我创建的数据管道将美国人口普查数据下载到 MinIO 的专用实例。 这与 Kubeflow Pipelines (KFP) 内部使用的 MinIO 实例不同。 我们本可以尝试使用 KFP 的 MinIO 实例 - 然而,这并不是 ML 模型训练管道的最佳设计。 随着 GPU 变得越来越快 - GPU 有可能完成其计算而必须等待新数据。 正在等待的 GPU 是未充分利用的 GPU。 您将需要一个完全由您控制的高速存储解决方案。 下面是我们的 MinIO 部署图,说明了每个实例的用途。

值得注意的是,我的数据管道代码可以轻松修改以与任何数据源交互 - 另一个外部 API、SFTP 位置或队列。 如果您有一个缓慢且不可靠的外部数据源,并且需要高速弹性存储解决方案中的数据副本,那么可以使用我的代码作为起点,将数据导入 MinIO。
在这篇文章中,我将采取下一步并训练模型。 我假设您熟悉用于创建组件和管道的 Kubeflow 装饰器。 我还假设您熟悉在组件之间传递数据。 如果这些结构对您来说是新的,请查看我的上一篇文章 构建数据管道。
如果我们要训练一个模型,那么我们需要一些东西来预测。 换句话说,我们需要一个具有特征和标签的数据集。
数据集
Kaggle 是寻找数据集的好地方。 您不仅会发现各种各样的数据集,而且许多数据集都是个人和团队争夺奖品的挑战的主题。 由于过去挑战的结果被保留,我们可以将我们的结果与获胜者进行比较。
我们将使用的数据集来自 GoDaddy - 微企业密度预测 挑战赛。 我喜欢这个数据集有几个原因。 首先,本次竞赛的目标是预测美国各县每月的微型企业密度。 政策制定者将利用获胜的模型来了解微型企业,从而制定新的政策和计划,以提高这些最小企业的成功和影响力。 其次,数据按美国县进行细分 - 因此我们可以使用美国人口普查数据来增强此数据(记住我在上一篇文章中构建的管道)以提高准确性。 这是比赛中允许的 - 因此,当我们将结果与排行榜进行比较时,我们并没有给自己带来不公平的优势。 最后,虽然我们的模型将预测微型企业密度,但它可以轻松地进行调整,为包含地理信息的数据集提供其他预测。
以下是为此挑战提供的数据集示例。 “microbusiness_密度”列是标签(我们的模型将尝试预测的内容),所有其他列都是潜在特征。

训练模型实际上是一个管道,它通过一系列任务移动数据,从而形成经过训练的模型。 这就是为什么 MinIO 和 KFP 是训练模型的强大工具。 MinIO 用于高性能、可靠地访问您的原始数据,KFP 用于可重复的管道并记录结果。
在编写代码之前,让我们构建管道的逻辑设计。
逻辑管道设计
以下是我们管道的逻辑设计。 这是不言自明的,如果您曾经训练过模型,那么您以前已经见过这些任务。

每个块都显示了我们管道中的一个高级任务。 在构建数据管道时,我们能够专门使用轻量级 Python 组件。 轻量级 Python 组件 是根据 Hermetic 函数创建的 - 换句话说 ,不调用其他函数并且需要最少导入的函数。 这些函数被内置到一个映像中,该映像被部署到将运行的容器中。 KFP 负责构建、部署和跨容器编组数据。 这是在 KFP 中设置代码的最简单方法。 但是,如果您有很多依赖项,或者希望将整个模块(或多个模块)打包到单个映像中,那么您可以使用容器化Python组件。 我们将在此管道中使用容器化的 Python 组件。 训练 Pytorch 模型需要创建一个 DataSet 类、一个模型类,以及一些其他辅助函数,用于验证、测试和将数据移动到 GPU(如果可用)。
此外,请注意我们的任务之间传递的工件:上图中的数据集、模型和 HTML。 我假设您熟悉数据集并在代码中设置它们。 (这就是我们通过管道的第一部分传递 DataFrame 的方式。)在这篇文章中,我将介绍一些其他工件。
让我们从创建容器化 Python 组件开始。
容器化 Python 组件
容器化 Python 组件允许我们指定 KFP 使用的镜像中内置的内容。 让我们看一个简单的示例,演示如何创建容器化 Python 组件及其优点。 您需要做的第一件事是在项目中创建一个子文件夹。 我创建了一个名为“src”的子文件夹。下面的屏幕截图显示了我的“src”文件夹以及创建和训练模型所需的所有模块。 您放置在此子文件夹中的所有模块都将成为 KFP 用于所有管道组件的图像的一部分。

接下来,考虑下面的函数,它位于 model_utilities.py 模块中。 该函数包含用于训练模型的纪元循环。 它调用其他函数 - `get_optimizer()`、`train_epoch()` 和 `validate_loss()`。 它还实例化该模块中的类 - 特别是 `CountyDataset` 和 `MBDModel`。
def train\_model(df\_train\_X, df\_train\_y, df\_valid\_X, df\_valid\_y,
epochs=5, lr=0.01, wd=0.0) -> Tuple[nn.Module, List]:
config = du.get\_config()
categorical\_cols = config['categorical\_cols']
continuous\_cols = config['continuous\_cols']
embedding\_sizes = [(3135, 50), (51, 26)]
model = RegressionModel(embedding\_sizes, len(continuous\_cols))
# Create the datasets
train\_dataset = CountyDataset(df\_train\_X, df\_train\_y, categorical\_cols)
valid\_dataset = CountyDataset(df\_valid\_X, df\_valid\_y, categorical\_cols)
# Create the loaders
batch\_size = 10
train\_loader = DataLoader(train\_dataset, batch\_size=batch\_size, shuffle=True)
valid\_loader = DataLoader(valid\_dataset, batch\_size=batch\_size, shuffle=True)
optim = get\_optimizer(model, lr = lr, wd = wd)
results = []
for i in range(epochs):
loss = train\_epoch(model, optim, train\_loader)
print('Training loss: ', loss)
val\_loss, val\_accuracy = validate\_model(model, valid\_loader)
results.append((loss, val\_loss, val\_accuracy))
return model, results
如果我们要使用轻量级 Python 组件,那么所有这些函数调用都将发生在另一个容器中,并且数据需要跨容器边界进行编组。 这并不理想。 我们希望模型训练成为在其自己的容器中启动的任务,但我们希望上面列出的辅助函数在同一个容器中运行,以便这些调用是本地且快速的。 这就是容器化 Python 组件允许我们做的事情,我们可以使用下面的函数,它封装了上面的函数。
@dsl.component(base\_image='python:3.10.0',
target\_image='docker/model-training-project/model-training:v1',
packages\_to\_install=['numpy==1.24.3', 'pandas==1.3.5', 'torch==1.13.1'])
def train\_model(train\_X: Input[Dataset], train\_y: Input[Dataset],
valid\_X: Input[Dataset], valid\_y: Input[Dataset],
model: Output[Model], training\_results: Output[Markdown]) -> None:
df\_train\_X = pd.read\_csv(train\_X.path)
df\_train\_y = pd.read\_csv(train\_y.path)
df\_valid\_X = pd.read\_csv(valid\_X.path)
df\_valid\_y = pd.read\_csv(valid\_y.path)
mbd\_model, results = mu.train\_model(df\_train\_X, df\_train\_y, df\_valid\_X, df\_valid\_y)
torch.save(mbd\_model.state\_dict(), model.path)
with open(training\_results.path, 'w') as f:
for result in results:
epoch\_result = f'Training Loss: {result[0]}, Validation Loss: {result[1]},
Validation accuracy: {result[2]}.<br>'
f.write(epoch\_result)
您不需要在辅助模块中使用任何 KFP 装饰器。 在这里使用的示例中,我的辅助模块是 `data_utilities.py` 和 `model_utilities.py`。 这些模块是普通的旧 Python。 其中没有 KFP 特有的内容。 像我在这里使用辅助函数的方式一样使用它们是一个很好的功能。 我正在我的辅助函数中完成预处理数据和训练模型所需的所有繁重工作,这些函数可以使用普通的旧 Python 编写。 所有需要用 KFP 装饰器装饰的函数都在 model_training_pipeline.py 模块中,它们只不过是 KFP 和我的辅助模块之间的垫片。 因此,所有 KFP 特定代码都封装在一个小模块中。
请注意,我们仍然使用“dsl.component”装饰器。 使用容器化 Python 组件时,必须在组件装饰器上设置一些附加参数。 `base_image` 将在 KFP 为我们的映像创建的 docker 文件中的 FROM 命令中使用。 这是可选的,如果未指定,则默认为 Python 3.7。 目标图像参数是一个 URI,告诉 KFP 将图像放置在何处。 此 URI 需要指向图像注册表。 我在 Docker Desktop 中运行此演示,因此我使用的 URI 是 Docker Desktop 中的本地注册表。 如果您位于公共云中或者您的组织有自己的内部注册表,请相应地进行更改。 如果我需要在 GCP 的 Google Cloud Artifact Registry 中使用我的图像(请注意 `gcr.io` URI),下面是这个 URI 的示例。
grc.io/model-training-project/model-training:v1
最后,您仍然需要使用 `packages_to_install` 参数,以便 KFP 可以将任何需要的第三方库安装到您的映像中。 但是,您不再需要在函数中放置导入。 这些导入现在可以在模块级别进行。 其余组件的代码如下所示。 为了简洁起见,我没有显示帮助程序模块,因为它们包含大量代码。 但是,您可以在此处获取它们。 它们是普通的旧 Python 和 Pytorch,如果您想在发送到 KFP 之前进行实验,可以在 KFP 之外的脚本或笔记本中运行。
@dsl.component(base\_image='python:3.10.0',
target\_image='docker/model-training-project/model-training:v1',
packages\_to\_install=['pandas==1.3.5', 'minio==7.1.14'])
def get\_raw\_data(bucket: str, object\_name: str, table\_df: Output[Dataset]):
'''
Return an object as a Pandas DataFrame.
'''
df = du.get\_object(bucket, object\_name)
df.to\_csv(table\_df.path, index=False)
@dsl.component(base\_image='python:3.10.0',
target\_image='docker/model-training-project/model-training:v1',
packages\_to\_install=['numpy==1.24.3', 'pandas==1.3.5', 'scikit-learn==1.0.2'])
def preprocess(in\_df: Input[Dataset], out\_df: Output[Dataset]) -> None:
'''
Preprocess the dataframe.
'''
df = pd.read\_csv(in\_df.path)
df = du.preprocess(df)
df.to\_csv(out\_df.path, index=False)
@dsl.component(base\_image='python:3.10.0',
target\_image='docker/model-training-project/model-training:v1',
packages\_to\_install=['numpy==1.24.3', 'pandas==1.3.5', 'scikit-learn==1.0.2'])
def feature\_engineering(pre: Input[Dataset],
train\_X: Output[Dataset], train\_y: Output[Dataset],
valid\_X: Output[Dataset], valid\_y: Output[Dataset],
test\_X: Output[Dataset], test\_y: Output[Dataset],
validation\_size: int=1, test\_size: int=1) -> None:
'''
Feature engineering.
'''
logger = logging.getLogger('kfp\_logger')
logger.setLevel(logging.INFO)
df = pd.read\_csv(pre.path)
df\_train\_X, df\_train\_y, df\_valid\_X, df\_valid\_y, df\_test\_X, df\_test\_y =
du.feature\_engineering(df, validation\_size, test\_size)
df\_train\_X.to\_csv(train\_X.path, index=False)
df\_train\_y.to\_csv(train\_y.path, index=False)
df\_valid\_X.to\_csv(valid\_X.path, index=False)
df\_valid\_y.to\_csv(valid\_y.path, index=False)
df\_test\_X.to\_csv(test\_X.path, index=False)
df\_test\_y.to\_csv(test\_y.path, index=False)
logger.info('Feature engineering complete.')
@dsl.component(base\_image='python:3.10.0',
target\_image='docker/model-training-project/model-training:v1',
packages\_to\_install=['numpy==1.24.3', 'pandas==1.3.5', 'torch==1.13.1'])
def test\_model(test\_X: Input[Dataset], test\_y: Input[Dataset], model: Input[Model],
test\_results: Output[Markdown]) -> None:
df\_test\_X = pd.read\_csv(test\_X.path)
df\_test\_y = pd.read\_csv(test\_y.path)
mbd\_model = mu.create\_model()
mbd\_model.load\_state\_dict(torch.load(model.path))
smape = mu.test\_model(df\_test\_X, df\_test\_y, mbd\_model)
with open(test\_results.path, 'w') as f:
f.write('Symetric Mean Absolute Percent Error (SMAPE): \*\*' + str(smape) + '\*\*')
一旦所有需要的模块都位于公共文件夹中并且代表管道组件的函数被正确修饰,您就可以为 KFP 构建映像了。 KFP 命令行实用程序使这一切变得简单。
kfp component build src/ --component-filepattern model\_training\_pipeline.py --push-image
该命令告诉 KFP 包含所有模块的文件夹的位置以及包含组件定义的模块。 此命令成功完成后,您会注意到多个文件已添加到您的子文件夹中,如下所示。

检查这些文件将使您更深入地了解 KFP 的工作原理。 基本上,KFP 使用组件装饰器中放置的所有信息来创建这些文件。 然后,这些文件用于创建映像并将其推送到您的注册表。 如果您使用 Docker Desktop 并且使用了相同的 `target_image` 参数,那么您将在映像列表中看到以下内容。

我们现在准备构建一个管道,用于协调上面显示的指定为管道组件的所有功能。
管道
将所有内容整合在一起的管道函数如下所示。 这非常简单。 一个组件的输出是下一个组件的输入。 请注意,所有调用的函数都映射到我们概念管道中的任务。
@dsl.pipeline(name='model-training-pipeline',
description='Pipeline that will train a Nueral Network.')
def training\_pipeline(bucket: str, object\_name: str) -> Markdown:
raw\_dataset = get\_raw\_data(bucket=bucket, object\_name=object\_name)
processed\_dataset = preprocess(in\_df=raw\_dataset.outputs['table\_df'])
final\_datasets = feature\_engineering(pre=processed\_dataset.outputs['out\_df'])
training\_results = train\_model(train\_X=final\_datasets.outputs['train\_X'],
train\_y=final\_datasets.outputs['train\_y'],
valid\_X=final\_datasets.outputs['valid\_X'],
valid\_y=final\_datasets.outputs['valid\_y'])
testing\_results = test\_model(test\_X=final\_datasets.outputs['test\_X'],
test\_y=final\_datasets.outputs['test\_y'],
model=training\_results.outputs['model'])
return testing\_results.outputs['test\_results']
要将此管道提交给 KFP,请运行以下代码。
def start\_training\_pipeline\_run():
client = Client()
run = client.create\_run\_from\_pipeline\_func(training\_pipeline,
experiment\_name='Containerized Python Components',
enable\_caching=False,
arguments={
'bucket': 'microbusiness-density',
'object\_name': 'train.csv'
})
url = f'{kfp\_endpoint}/#/runs/details/{run.run\_id}'
print(url)
在 KFP 中运行此管道会在 KFP 的“运行”选项卡中生成以下可视化效果。

使用工件报告结果
工件用于在组件之间传递大型或复杂的数据。 在底层,KFP 将此数据保存到其 MinIO 实例中,以使其可供管道中的任何组件使用。 附带的好处是,KFP 可以进行类型检查,并在将一个组件的输出传递到另一个组件的输入时让您知道数据类型是否不正确。 KFP 还提供用于报告结果的工件。 让我们进一步探讨这些报告工件。
训练模型时,最佳实践是根据验证集跟踪损失函数的结果和模型的准确性。 Metrics、HTML 和 Markdown 工件都有一个可视化组件。 这些工件的内容可以在 KFP 的 UI 中轻松观察到。 这篇文章使用 Markdown 对象来报告训练和测试期间的模型性能。 请参阅“train_model\”组件中的最后几行代码,该组件创建了一个名为“training_results”的 Markdown 对象。
创建此工件后,它包含的数据将自动显示在“training_results\”内的“Visualization”选项卡中。 下面是该可视化的屏幕截图。

在针对测试集测试模型后执行相同的操作也是最佳实践。
对模型的评论
我想指出本文中使用的模型中的一个巧妙的功能 - 即使它与当前的主题稍微正交。 下面是代码 - 感兴趣的层被突出显示。 该层是嵌入层。 它用于了解县和州与微企业密度的关系。 此处使用的嵌入在内存向量数据库中很小。 在神经网络的各层中,这些向量用于分类特征(或非连续特征),以允许神经网络学习分类或离散数据的有意义的表示。 理解这一层是理解矢量数据库的良好开端。 矢量数据库可以根据矢量表示有效检索相似项目,从而实现查找相似图像、推荐相似产品或搜索相似文档等任务。 我们 MinIO 正在研究向量,并将在未来几个月内有更多内容。
class RegressionModel(nn.Module):
def \_\_init\_\_(self, embedding\_sizes, n\_cont):
super(RegressionModel, self).\_\_init\_\_()
# Set up the embeddings.
self.embeddings = nn.ModuleList([nn.Embedding(categories, size) for categories,size
in embedding\_sizes])
n\_emb = sum(e.embedding\_dim for e in self.embeddings) #length of all embeddings
self.n\_emb, self.n\_cont = n\_emb, n\_cont
# Set up the rest of the network.
self.linear1 = nn.Linear(self.n\_emb + self.n\_cont, 20, bias=True)
self.linear2 = nn.Linear(20, 20, bias=True)
self.linear3 = nn.Linear(20, 1, bias=True)
def forward(self, x\_cat, x\_cont):
out = [e(x\_cat[:,i]) for i,e in enumerate(self.embeddings)]
out = torch.cat(out, 1)
out = torch.cat([out, x\_cont], 1)
out = F.relu(self.linear1(out))
out = F.relu(self.linear2(out))
out = self.linear3(out)
return out
概括
这篇文章接续了我上一篇关于 KFP 数据管道的文章的内容。 在这篇文章中,我展示了如何使用 KFP 来训练模型。 我介绍了容器化 Python 组件,它更适合训练模型。 训练模型时,您需要创建类并使用辅助函数,这些函数最好在进行训练的同一容器中运行。
下一步
安装MinIO、KFP,并下载代码示例,以便您可以执行自己的实验。 尝试不同的模型架构,通过特征工创建新特征,并查看您的结果与 Kaggle 的排行榜。
如果您有疑问或想分享您的结果,请发送邮件至 sales@minio.org.cn 或加入我们的 Slack 通用频道 上的讨论。 分钟.io)。