利用MinIO和TensorFlow进行超大规模机器学习
我们生活在一个由信息和人工智能定义的变革时代。每天都会生成并收集大量数据,以提供这些庞大的,最新的AI / ML算法。数据越多,结果越好。谷歌的TensorFlow
是已经成为主要行业标准的框架之一。用途广泛,可以使用Keras框架快速入门并编写简单的模型。如果您寻求更高级的方法,TensorFlow还允许您使用低级API构建自己的机器学习模型。不,你选择什么样的战略物质,TensorFlow将确保你的算法是否为你选择适合自己的算法任何基础设施的优化-无论是CPU的,GPU的或TPU的。
由于数据集变得太大而无法放入内存或本地磁盘,因此AI / ML管道现在需要从外部数据源加载数据。以ImageNet数据集为例,其14 Million Images估计存储大小为1.31TB。此数据集无法放入内存,也无法放在任何计算机本地存储驱动器上。如果您的管道在无状态环境(例如Kubernetes)(这已越来越普遍)中运行,这些挑战将变得更加复杂。
这个问题的新兴标准是在AI / ML管道的设计中采用高性能的对象存储。MinIO是该领域的领导者,并发布了许多基准测试,这些基准可以说明其吞吐能力。在本文中,我们将介绍如何在TensorFlow项目中利用MinIO。
四阶段超大规模数据管道
为了构建超大规模管道,我们将从MinIO读取管道的每个阶段。在此示例中,我们将构建机器学习管道的四个阶段。这种架构将按需从MinIO加载所需的数据。
首先,我们将预处理数据集并以TensorFlow可以快速消化的格式对其进行编码。这种格式是tf.TFRecord,它是数据的二进制编码类型。我们之所以采取这一步骤,是因为我们不想浪费培训时间来处理数据,因为我们计划直接在需要时从MinIO加载每批培训。如果在将数据输入模型训练之前对其进行了预处理,则可以节省大量时间。理想情况下,我们创建经过预处理的数据块,这些数据可以对大量的记录(至少100-200MB大小)进行分组。
为了加快数据加载和培训阶段,我们将利用出色的tf.data API。该API旨在在模型的训练/验证过程中有效地加载数据。当模型正在处理当前数据时,它将准备下一批数据。这种方法的优势在于,它可以确保有效利用昂贵的GPU或TPU,这些GPU或TPU由于加载数据缓慢而无法闲置。MinIO不会遇到此问题-它可以使用少量NVMe驱动器或硬盘驱动器使100Gbps网络饱和,以确保管道以硬件允许的最快速度处理数据。
在训练期间,我们要确保我们存储模型的训练检查点以及TensorBoard直方图。如果训练中断并且我们想继续训练或者我们想获取更多数据并希望继续使用新数据训练我们的模型,那么检查点很有用,TensorBoard直方图让我们看到了训练的进行情况。TensorFlow支持将这两个都直接写入MinIO。
快速附注。模型完成后,我们也将其保存到MinIO-允许我们使用TensorFlow Serving为其提供服务 -但这是另外一段时间。

建立管道
对于我们的超大规模管道,我们将使用可以轻松放入您的本地计算机的数据集,以便您继续学习。斯坦福的大型电影评论数据集很棒,因为它有大量样本(用于训练的25,000个用于测试的25,000个样本),因此我们将建立一个情绪分析模型,以告诉我们电影评论是positive还是negative。请记住,每个步骤都可以应用于任何更大的数据集。该数据集的优点是您可以在自己的计算机上尝试。让我们开始吧!
下载数据集并使用MinIO Client将其上传到MinIO
curl -O http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz mc mb myminio/datasets mc cp aclImdb_v1.tar.gz myminio/datasets/
首先,为管道声明一些配置,例如batch size,数据集的位置和固定位置,random seed以便我们可以一次又一次地运行此管道并获得相同的结果。
random_seed = 44 batch_size = 128 datasets_bucket = 'datasets' preprocessed_data_folder = 'preprocessed-data' tf_record_file_size = 500 # Set the random seed tf.random.set_seed(random_seed) # How to access MinIO minio_address = 'localhost:9000' minio_access_key = 'minioadmin' minio_secret_key = 'minioadmin'
我们将使用MinIO从MinIO下载数据集 minio-py
minioClient = Minio(minio_address, access_key=minio_access_key, secret_key=minio_secret_key, secure=False) try: minioClient.fget_object( datasets_bucket, 'aclImdb_v1.tar.gz', '/tmp/dataset.tar.gz') except ResponseError as err: print(err)
现在让我们将数据集解压缩到一个临时文件夹(/tmp/dataset)中以预处理我们的数据
extract_folder = f'/tmp/{datasets_bucket}/'
with tarfile.open("/tmp/dataset.tar.gz", "r:gz") as tar:
tar.extractall(path=extract_folder)预处理
由于数据集的结构,我们会从四个文件夹阅读,最初test并train持有25,000每个实例,然后,在每个那些我们有文件夹的12,500每个标签的pos积极评价和neg对负面评论。从这四个文件夹,我们将所有样本存入两个变量,train和test。如果我们要预处理一个无法容纳在本地计算机中的数据集,则可以简单地一次加载对象的各个部分并对其进行处理。
train = []
test = []
dirs_to_read = [
'aclImdb/train/pos',
'aclImdb/train/neg',
'aclImdb/test/pos',
'aclImdb/test/neg',
]
for dir_name in dirs_to_read:
parts = dir_name.split("/")
dataset = parts[1]
label = parts[2]
for filename in os.listdir(os.path.join(extract_folder,dir_name)):
with open(os.path.join(extract_folder,dir_name,filename),'r') as f:
content = f.read()
if dataset == "train":
train.append({
"text":content,
"label":label
})
elif dataset == "test":
test.append({
"text":content,
"label":label
})然后,我们将对数据集进行混洗,以免通过提供12,500个连续的正例,然后再提供12,500个连续的负例,在训练中不引入偏见。我们的模型很难推广。通过对数据进行混洗,模型将可以同时从正面和负面示例中查看和学习。
random.Random(random_seed).shuffle(train) random.Random(random_seed).shuffle(test)
由于我们正在处理文本,因此需要将文本转换为矢量表示形式,以准确地描述句子的含义。如果要处理图像,我们将调整图像的大小并将其转换为矢量表示,每个像素均为调整后图像的值。
但是,对于文本而言,我们面临更大的挑战,因为单词实际上没有数字表示形式。这是嵌入有用的地方。嵌入是某些文本的矢量表示,在这种情况下,我们将整个评论表示为512维的单个矢量。我们将利用现有的称为USE(通用语句编码器)的模型将句子编码为向量,而不是手动进行文本的预处理(加标记,建立词汇表和训练嵌入层),以便继续进行示例。这是深度学习的奇迹之一,它具有与您的模型一起重用不同模型的能力。在这里,我们使用TensorFlow Hub,我们将加载最新USE模型。
import tensorflow_hub as hub
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-large/5")由于创建25,000句子的嵌入并将其保留在内存中太多了,因此我们将数据集切成的块500。
要将数据存储到中,TFRecord我们需要将要素编码为tf.train.Feature。我们将数据标签存储为列表,将tf.int64电影评论存储为浮点列表,因为在使用编码句子之后,USE我们将最终嵌入512维数
def _embedded_sentence_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=value))
def _label_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
def encode_label(label):
if label == "pos":
return tf.constant([1,0])
elif label == "neg":
return tf.constant([0,1])
# This will take the label and the embedded sentence and encode it as a tf.TFRecord
def serialize_example(label, sentence_tensor):
feature = {
'sentence': _embedded_sentence_feature(sentence_tensor[0]),
'label': _label_feature(label),
}
example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
return example_proto
def process_examples(records,prefix=""):
starttime = timeit.default_timer()
total_training = len(records)
print(f"Total of {total_training} elements")
total_batches = math.floor(total_training / tf_record_file_size)
if total_training % tf_record_file_size != 0:
total_batches += 1
print(f"Total of {total_batches} files of {tf_record_file_size} records")
counter = 0
file_counter = 0
buffer = []
file_list = []
for i in range(len(records)):
counter += 1
sentence_embedding = embed([records[i]['text']])
label_encoded = encode_label(records[i]['label'])
record = serialize_example(label_encoded, sentence_embedding)
buffer.append(record)
if counter >= tf_record_file_size:
print(f"Records in buffer {len(buffer)}")
# save this buffer of examples as a file to MinIO
counter = 0
file_counter+=1
file_name = f"{prefix}_file{file_counter}.tfrecord"
with open(file_name,'w+') as f:
with tf.io.TFRecordWriter(f.name,options="GZIP") as writer:
for example in buffer:
writer.write(example.SerializeToString())
try:
minioClient.fput_object(datasets_bucket, f"{preprocessed_data_folder}/{file_name}", file_name)
except ResponseError as err:
print(err)
file_list.append(file_name)
os.remove(file_name)
buffer=[]
print(f"Done with chunk {file_counter}/{total_batches} - {timeit.default_timer() - starttime}")
if len(buffer) > 0:
file_counter+=1
file_name = f"file{file_counter}.tfrecord"
with open(file_name,'w+') as f:
with tf.io.TFRecordWriter(f.name) as writer:
for example in buffer:
writer.write(example.SerializeToString())
try:
minioClient.fput_object(datasets_bucket, f"{preprocessed_data_folder}/{file_name}", file_name)
except ResponseError as err:
print(err)
file_list.append(file_name)
os.remove(file_name)
buffer=[]
print("Total time preprocessing is :", timeit.default_timer() - starttime)
return file_list
process_examples(train,prefix="train")
process_examples(test,prefix="test")
print("Done Preprocessing data!")至此,我们已经完成了对数据的预处理。我们在.tfrecord存储桶中存储了一组文件。现在,我们将其提供给模型,使其可以同时使用和训练。
训练
我们将从MinIO获取文件列表(训练数据)。从技术上讲,预处理阶段和培训阶段可以完全分离,因此最好列出存储桶中的文件块。
# List all training tfrecord files
objects = minioClient.list_objects_v2(datasets_bucket, prefix=f"{preprocessed_data_folder}/train")
training_files_list = []
for obj in objects:
training_files_list.append(obj.object_name)
# List all testing tfrecord files
objects = minioClient.list_objects_v2(datasets_bucket, prefix=f"{preprocessed_data_folder}/test")
testing_files_list = []
for obj in objects:
testing_files_list.append(obj.object_name)为了TensorFlow连接到MinIO,我们将告诉它MinIO实例的位置和连接详细信息。
os.environ['AWS_ACCESS_KEY_ID'] = minio_access_key os.environ['AWS_SECRET_ACCESS_KEY'] = minio_secret_key os.environ['AWS_REGION'] = "us-east-1" os.environ['S3_ENDPOINT'] = minio_address os.environ['S3_USE_HTTPS'] = "0" os.environ['S3_VERIFY_SSL'] = "0"
现在,让我们创建一个tf.data.Dataset,在需要时从MinIO上的文件加载记录。为此,我们将获取我们拥有的文件列表,并以引用实际对象位置的方式对其进行格式化。我们还将对测试数据集执行此操作。
all_training_filenames = [f"s3://datasets/{f}" for f in training_files_list]
testing_filenames = [f"s3://datasets/{f}" for f in testing_files_list]以下步骤是可选的,但我建议您这样做。我将训练数据集分为两组,90%用于训练10%的数据和用于验证的数据,该模型不会从验证数据中学习,但是它将帮助模型更好地训练。
total_train_data_files = math.floor(len(all_training_filenames)*0.9) if total_train_data_files == len(all_training_filenames): total_train_data_files -= 1 training_files = all_training_filenames[0:total_train_data_files] validation_files = all_training_filenames[total_train_data_files:]
现在让我们创建tf.data数据集:
AUTO = tf.data.experimental.AUTOTUNE ignore_order = tf.data.Options() ignore_order.experimental_deterministic = False dataset = tf.data.TFRecordDataset(training_files,num_parallel_reads=AUTO,compression_type="GZIP") dataset = dataset.with_options(ignore_order) validation = tf.data.TFRecordDataset(validation_files,num_parallel_reads=AUTO,compression_type="GZIP") validation = validation.with_options(ignore_order) testing_dataset = tf.data.TFRecordDataset(testing_filenames,num_parallel_reads=AUTO,compression_type="GZIP") testing_dataset = testing_dataset.with_options(ignore_order)
为了解码我们的TFRecord编码文件,我们需要一个与我们的serialize_example功能完全相反的解码功能。由于来自的数据分别TFRecord具有(512,)和的形状(2,),因此我们也将对其进行重塑,因为这是我们的模型希望接收的格式。
def decode_fn(record_bytes):
schema = {
"label": tf.io.FixedLenFeature([2], dtype=tf.int64),
"sentence": tf.io.FixedLenFeature([512], dtype=tf.float32),
}
tf_example = tf.io.parse_single_example(record_bytes,schema)
new_shape = tf.reshape(tf_example['sentence'],[1,512])
label = tf.reshape(tf_example['label'],[1,2])
return new_shape,label让我们建立模型,一点也不花哨,我将使用最后带有softmax激活的几个密集层。我们正在试图预测输入是否positive还是negative那么我们将获得每个可能性的概率。
model = keras.Sequential() model.add( keras.layers.Dense( units=256, input_shape=(1,512 ), activation='relu' ) ) model.add( keras.layers.Dropout(rate=0.5) ) model.add( keras.layers.Dense( units=16, activation='relu' ) ) model.add( keras.layers.Dropout(rate=0.5) ) model.add(keras.layers.Dense(2, activation='softmax')) model.compile( loss='categorical_crossentropy', optimizer=keras.optimizers.Adam(0.001), metrics=['accuracy'] )

让我们为训练阶段准备数据集,让它们一次重复一点并批量128处理
mapped_ds = dataset.map(decode_fn) mapped_ds = mapped_ds.repeat(5) mapped_ds = mapped_ds.batch(128) mapped_validation = validation.map(decode_fn) mapped_validation = mapped_validation.repeat(5) mapped_validation = mapped_validation.batch(128) testing_mapped_ds = testing_dataset.map(decode_fn) testing_mapped_ds = testing_mapped_ds.repeat(5) testing_mapped_ds = testing_mapped_ds.batch(128)
进行训练时,我们希望存储模型的检查点,以防训练被中断并且我们想从上次中断的地方继续。为此,我们将使用keras回调tf.keras.callbacks.ModelCheckpoint使TensorFlowMinIO在每个纪元后将检查点保存到。
checkpoint_path = f"s3://{datasets_bucket}/checkpoints/cp.ckpt"
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
save_weights_only=True,
verbose=1) save_weights_only=True,
verbose=1)我们还希望保存TensorBoard 直方图,因此我们将添加一个回调以将其存储在logs/imdb/前缀下的存储桶中。我们用model_note当前时间和当前时间来标识这次跑步,因此我们可以区分出不同的训练实例。
model_note="256-input"
logdir = f"s3://{datasets_bucket}/logs/imdb/{model_note}-" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)最后,我们将训练模型:
history = model.fit( mapped_ds, epochs=10, callbacks=[cp_callback, tensorboard_callback], validation_data=mapped_validation )
如果运行,mc admin trace myminio则可以TensorFlow直接从MinIO读取数据,但仅需要读取以下部分:

现在我们有了模型,我们想将其保存到MinIO:
model.save(f"s3://{datasets_bucket}/imdb_sentiment_analysis")让我们测试一下模型,看看它如何执行:
testing = model.evaluate(testing_mapped_ds)
这样返回的准确度为85.63%,不是最先进的,但对于这样一个简单的示例也不错。
让我们TensorBoard开始探索我们的模型,这些模型直接从MinIO加载数据
AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin AWS_REGION=us-east-1 S3_ENDPOINT=localhost:9000 S3_USE_HTTPS=0 S3_VERIFY_SSL=0 tensorboard --logdir s3://datasets/logs
然后转到http://localhost:6006浏览器

我们可以使用我们的模型,看看它是否有效
samples = [
"This movie sucks",
"This was extremely good, I loved it.",
"great acting",
"terrible acting",
"pure kahoot",
"This is not a good movie",
]
sample_embedded = embed(samples)
res = model.predict(sample_embedded)
for s in range(len(samples)):
if res[s][0] > res[s][1]:
print(f"{samples[s]} - positive")
else:
print(f"{samples[s]} - negative")这将返回以下输出
This movie sucks - negative This was extremely good, I loved it. - positive great acting - positive terrible acting - negative pure kahoot - positive This is not a good movie - negative
结论
如所演示的,您可以构建完全依赖MinIO的大规模AI / ML管道。这既取决于MinIO的性能特征,又取决于其无缝扩展至PB和Exabytes数据的能力。通过分离存储和计算,可以构建一个不依赖本地资源的框架-允许您在Kubernetes内的容器上运行它们。这增加了相当大的灵活性。
您会看到TensorFlow如何能够按需加载数据,而根本不需要自定义,这很简单。而且,可以通过以分布式方式运行TensorFlow将该方法快速扩展到培训。当MinIO成为该数据的唯一来源时,这可以确保在训练节点之间的网络上几乎没有数据可以洗牌。
这篇文章的代码可以在Github上找到:https : //github.com/dvaldivia/hyper-scale-tensorflow-with-minio