使用 MLRun 和 MinIO 进行模型训练和 MLOps
在我之前关于 MLRun 的文章中,我们设置了一台开发机器,其中包含试验 MLRun 所需的所有工具。具体来说,我们使用 docker-compose 文件为 MLRun UI、MLRun API 服务、Nuclio、MinIO 和 Jupyter 服务创建容器。一旦我们的集装箱启动,我们就进行了简单的烟雾测试,以确保一切正常。为此,我们创建了一个简单的无服务器函数,该函数记录其输入并连接到 MinIO 以获取存储桶列表。
下面是我们将构建的内容以及 docker-compose 文件中的内容的可视化。我扩展了 docker-compose 文件,您可以在 MLRun 文档中找到该文件,以便它同时包含 API 服务和 MinIO。我将从我上一篇文章结束的地方继续这篇文章。我们将创建另一个用于训练模型的无服务器函数。请记住,MLRun 旨在消除对样板代码的需求,因此我们还应该看到训练模型本身所需的代码减少。如果你考虑一下你编写的用于训练模型的代码 - 特别是在 PyTorch 中 - 无论用于训练模型的数据和模型本身如何,它基本上都是一样的。换句话说,它是样板代码。从理论上讲,它可以用一些添加的配置来替换。此外,部署代码本身应该很简单。我们已经在我的上一篇文章中看到了一些。然而,部署一个驱动模型训练的函数有点复杂,所以我们需要更多的 MLRun 功能来实现这一点。本文附带的代码(在下一节中介绍)包含实用程序,这些实用程序允许使用无法放入内存的数据集来训练模型。我不会在这里讨论这些技术 - 这是另一篇文章的主题。

关于代码下载
这篇文章中显示的所有代码(以及更多代码)都在这里。为简洁起见,我的代码下载中的许多实用程序函数没有显示在这篇文章的代码列表中。但是,许多是用于将 PyTorch 与 MinIO 集成的通用实用程序。在本节中,我将列举一些重要的实用程序,我希望您发现它们对使用 MinIO 存储数据集的所有 ML 项目有用。
MinIO 实用程序 (data_utilities.py):
- get_bucket_list - 返回 MinIO 中的存储桶列表。
- get_image_from_minio - 从 MinIO 返回图像。使用 PIL 将对象转换回其原始格式。
- get_object_from_minio - 从 MinIO 返回一个对象 - 按原样。
- get_object_list - 从存储桶中获取对象列表。
- image_to_byte_stream - 将 PIL 映像转换为字节流,以便将其发送到 MinIO。
- load_mnist_to_minio - 此函数将 MNIST 数据集加载到 MinIO 存储桶中。每个图像都作为单独的对象发送。此方法对于群集中的性能测试非常有用,因为它会创建许多小映像。
- put_image_to_minio - 将图像字节流作为对象放入 MinIO
PyTorch 实用程序 (torch_utilities.py):
- create_minio_data_loaders - 创建一个 PyTorch 数据加载器,其中包含 MinIO 存储桶中的 MNIST 对象列表。对于使用无法放入内存的数据集模拟模型训练测试非常有用。
- create_memory_loaders - 创建一个 PyTorch 数据加载器,其中 MNIST 图像被加载到内存中。
- MNISTModel - 本文中使用的模型。
- ConvNet - 这是读者可以尝试的另一个模型 - 它为 MNIST 数据集创建卷积神经网络 (CNN)。
将现有代码迁移到 MLRun
让我们看看如何迁移现有代码以在 MLRun 中作为无服务器函数运行。如果您有多个模型的大量训练代码,但没有时间更改代码以利用 MLRun 的所有功能,这可能是您采用 MLRun 的一种方法。这是迭代迁移到 MLRun 的可行方法 - 让所有 ML 代码都由 MLRun 管理 - 然后添加代码以利用其他功能。
考虑下面的代码,它在 MNIST 数据集上训练模型。(为简洁起见,省略了导入和日志记录代码。代码下载包含所有导入和日志记录。这是一个可以从任何终端应用程序运行的脚本。“train_model”函数检索数据,创建模型,然后将控制权移交给“epoch_loop”函数进行训练。
def train_model(loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None):
# Load the data and log loading metrics.
if loader_type == 'memory':
train_loader, test_loader, _ = tu.create_memory_data_loaders(training_parameters['batch_size'])
elif loader_type == 'minio-by-batch':
train_loader, test_loader, _ = tu.create_minio_data_loaders(bucket_name,
training_parameters['batch_size'])
else:
raise Exception('Unknown loader type. Must be "memory" or "minio-by-batch"')
# Create the model.
model = tu.MNISTModel(training_parameters['input_size'],
training_parameters['hidden_sizes'],
training_parameters['output_size'])
# Train the model.
start_time = time()
epoch_loop(model, train_loader, training_parameters)
training_time_sec = (time()-start_time)
# Test the model and log the accuracy as a metric.
testing_metrics = tu.test_model_local(model, test_loader, training_parameters['device'])
def epoch_loop(model: nn.Module, loader: DataLoader, training_parameters: Dict[str, Any]) -> Dict[str, Any]:
# Create the loss and optimizer functions.
loss_func = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'],
momentum=training_parameters['momentum'])
# Epoch loop
for epoch in range(training_parameters['epochs']):
total_loss = 0
for images, labels in loader:
# Move tensors to the specified device.
images = images.to(training_parameters['device'])
labels = labels.to(training_parameters['device'])
# Flatten MNIST images into a 784 long vector.
images = images.view(images.shape[0], -1)
# Training pass
optimizer.zero_grad()
output = model(images)
loss = loss_func(output, labels)
# Backward pass
loss.backward()
# And optimizes its weights here
optimizer.step()
total_loss += loss.item()
print("Epoch {} - Training loss: {}".format(epoch+1, total_loss/len(loader)))
if __name__ == "__main__":
# training configuration
training_parameters = {
'batch_size': 32,
'device': torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'),
'dropout_input': 0.2,
'dropout_hidden': 0.5,
'epochs': 2,
'input_size': 784,
'hidden_sizes': [1024, 1024, 1024, 1024],
'lr': 0.025,
'momentum': 0.5,
'output_size': 10,
'smoke_test_size': -1
}
train_model(loader_type='memory', bucket_name='mnist',
training_parameters=training_parameters)
如果我们想在 MLRun 中将此函数作为无服务器函数运行,我们只需要用 “mlrun.handler()” 装饰器装饰 “train_model()” 函数,如下所示。
@mlrun.handler()
def train_model(loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None) - > None:
.....
接下来,我们需要设置 MLRun 环境,创建一个项目,向 MLRun 注册训练函数,然后运行该函数。我们将从 docker-compose 部署中运行的 Jupyter Notebook 来驱动它。执行此操作所需的单元格如下所示。(可以在代码下载的 mnist_training_setup.ipynb 笔记本中找到此代码。这个简单演示中使用的超参数是硬编码的;但是,如果要构建用于生产的模型,则应使用超参数搜索来查找最佳值。
import os
import mlrun
# Set the environment:
mlrun.set_environment(env_file='mlrun.env')
# Create the project:
project_name='mnist-training'
project_dir = os.path.abspath('./')
project = mlrun.get_or_create_project(project_name, project_dir, user_project=False)
# Create the training function.
trainer = project.set_function(
"mnist_training_with_mlrun.py", name="trainer", kind="job",
image="mlrun/mlrun",
requirements=["minio", "torch", "torchvision"],
handler="train_model_with_mlrun"
)
# Hyperparameters
training_parameters = {
'batch_size': 32,
'device': 'cpu',
'dropout_input': 0.2,
'dropout_hidden': 0.5,
'epochs': 2,
'input_size': 784,
'hidden_sizes': [1024, 1024, 1024, 1024],
'lr': 0.025,
'momentum': 0.5,
'output_size': 10,
'smoke_test_size': -1
}
# Run the function.
trainer_run = project.run_function(
"trainer",
inputs={"bucket_name": "mnist", "loader_type": "memory"},
params={"training_parameters": training_parameters},
local=True
)
这就是在 MLRun 中运行现有代码所需的全部工作。但是,如果您仔细查看 epoch_loop() 函数,您会注意到它是样板代码。几乎每个 PyTorch 项目都有类似的功能,无论用于训练模型的模型或数据如何。让我们看看如何使用 MLRun 来删除这个函数。
优化训练代码
我们可以使用 MLRun 的 mlrun_torch.train() 函数删除对上面所示的 epoch_loop 函数的调用。此函数的导入和修订后的 train_model() 函数如下所示。“accuracy()”函数也被传递给 mlrun_torch.train()。
import mlrun.frameworks.pytorch as mlrun_torch
@mlrun.handler()
def train_model_with_mlrun(context: mlrun.MLClientCtx, loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None):
logger = du.create_logger()
logger.info(loader_type)
logger.info(bucket_name)
logger.info(training_parameters)
# Load the data and log loading metrics.
if loader_type == 'memory':
train_loader, test_loader, _ = tu.create_memory_data_loaders(training_parameters['batch_size'])
elif loader_type == 'minio-by-batch':
train_loader, test_loader, _ = tu.create_minio_data_loaders(bucket_name, training_parameters['batch_size'])
else:
raise Exception('Unknown loader type. Must be "memory" or "minio-by-batch"')
# Train the model and log training metrics.
logger.info('Creating the model.')
model = tu.MNISTModel(training_parameters['input_size'], training_parameters['hidden_sizes'], training_parameters['output_size'])
loss_func = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], momentum=training_parameters['momentum'])
# Train the model.
logger.info('Training the model.')
mlrun_torch.train(
model=model,
training_set=train_loader,
loss_function=loss_func,
optimizer=optimizer,
validation_set=test_loader,
metric_functions=[accuracy],
epochs=training_parameters['epochs'],
custom_objects_map={"torch_utilities.py": "MNISTModel"},
custom_objects_directory=os.getcwd(),
context=context,
)
def accuracy(y_pred, y_true):
ps = torch.exp(y_pred)
pred_label = ps.argmax(1)
return (sum(pred_label == y_true) / y_true.size()[0]).item()
在迁移代码以使用此函数时,需要记住一些事项。
- 首先,如果你在 “epoch_loop()” 函数中声明了 loss 函数和优化器,那么你需要移动这些声明,因为它们必须传递给 “mlrun_torch.train()”。
- 其次,如果在纪元循环中执行任何转换,则需要将它们移动到数据处理逻辑中,或者更好的是,移动到数据管道中。如果您正在执行图像增强和特征工程,则尤其如此。
- 最后,要针对训练期间未看到的数据集测试模型,只需提供 accuracy() 函数,如上所示。
虽然此函数减少了您必须编写的代码,但它最大的好处是指标跟踪和工件管理。让我们快速浏览一下 MLRun UI,看看从我们第一次完全托管运行(我们训练模型)中保存了哪些内容。
查看运行
MLRun UI 的主页显示所有项目的列表。对于每个项目,您可以查看失败和正在运行的作业数。

深入到您的项目中,您将看到有关项目的更多详细信息。

单击特定作业将带您进入最近的运行。您可以在其中查看代码记录的参数、输入、工件、结果和任何输出。

摘要和后续步骤
在这篇文章中,我继续了我关于设置 MLRun 的上一篇文章。我展示了如何使用 MLRun 以最小的更改来托管现有的模型代码。但是,利用 MLRun 跟踪功能的最佳方法是让 MLRun 管理模型的训练。
虽然可以将现有代码移动到 MLRun,但这种技术并不能充分利用 MLRun 的自动跟踪功能。更好的方法是使用 MLRun 的“mlrun_torch.train()”函数。这允许 MLRun 全面管理训练 - 将记录工件、输入参数和指标。
如果你已经对 MLRun 走到了这一步,接下来可以考虑使用分布式训练和大型语言模型。