
tf.distribuet 101: обучение керас на нескольких устройствах и машинах
14 июня 2025 г.Обзор контента
- Введение
- Настраивать
- Одно-хост-синхронная тренировка
- Использование обратных вызовов для обеспечения устойчивости к неисправности
- TF.Data Tips
- Несколько работников распределенной синхронной тренировки
- Пример: код, работающий в многоработной настройке
- Дальнейшее чтение
Введение
Как правило, есть два способа распределения вычислений на нескольких устройствах:
Параллелизм данных, где одна модель реплицируется на нескольких устройствах или нескольких машинах. Каждый из них обрабатывает разные партии данных, затем объединяет свои результаты. Существует много вариантов этой настройки, которые отличаются от того, как результаты слияния различных реплик модели, в том, остаются ли они синхронизировать на каждой партии или они более слабо связаны и т. Д.
Модель параллелизма, где разные части одной модели работают на разных устройствах, обрабатывают одну партию данных вместе. Это лучше всего работает с моделями, которые имеют естественно параллельную архитектуру, такую как модели, которые оснащены несколькими ветвями.
Это руководство фокусируется на параллелизме данных, в частностиСинхронные данные параллелизма, где различные копии модели остаются в синхронизации после каждой партии, которую они обрабатывают. Синхронность сохраняет поведение сходимости модели, идентичное тому, что вы увидите для обучения на одном устройстве.
В частности, это руководство учит вас, как использоватьtf.distribute
API для обучения моделей Keras на несколько графических процессоров, с минимальными изменениями вашего кода, в следующих двух настройках:
- На нескольких графических процессорах (обычно от 2 до 8), установленных на одной машине (один хост, многократное обучение). Это самая распространенная установка для исследователей и мелкомасштабных рабочих процессов в отрасли.
- На кластере многих машин каждый разнимает один или несколько графических процессоров (многоработника распределенного обучения). Это хорошая установка для крупномасштабных рабочих процессов отрасли, например, Обучение моделей классификации изображений с высоким разрешением на десятках миллионов изображений с использованием 20-100 графических процессоров.
Настраивать
import tensorflow as tf
import keras
Одно-хост, многолетняя синхронная тренировка
В этой настройке у вас есть одна машина с несколькими графическими процессорами (обычно от 2 до 8). Каждое устройство будет запускать копию вашей модели (называетсяреплика) Для простоты, в дальнейшем, мы предположим, что имеем дело с 8 графическими процессорами, без каких -либо потерь общности.
Как это работает
На каждом этапе обучения:
- Текущая партия данных (вызваннаяГлобальная партия) разделен на 8 различных подборов (называетсяместные партии) Например, если глобальная партия имеет 512 образцов, каждая из 8 локальных партий будет иметь 64 образца.
- Каждая из 8 реплик независимо обрабатывает локальную партию: они запускают вперед проход, затем обратный проход, выводя градиент веса потери модели на локальной партии.
- Обновления веса, происходящие из локальных градиентов, эффективно объединены по 8 репликах. Поскольку это делается в конце каждого шага, реплики всегда остаются в синхронизации.
На практике процесс синхронного обновления весов модели реплики обрабатывается на уровне каждой отдельной переменной веса. Это делается череззеркальная переменнаяобъект.
Как его использовать
Для проведения однонационального синхронного обучения с моделью с моделью кераса вы бы использовалиtf.distribute.MirroredStrategy
API. Вот как это работает:
- Создавать экземпляры а
MirroredStrategy
, опционально настройка, какие конкретные устройства вы хотите использовать (по умолчанию стратегия будет использовать все доступные графические процессоры). - Используйте объект стратегии, чтобы открыть область применения, и в рамках этой области создайте все необходимые вам объекты кераса, которые содержат переменные. Как правило, это означаетСоздание и составление моделиВнутри прицела распределения.
- Тренировать модель через
fit()
по-прежнему.
Важно отметить, что мы рекомендуем вам использоватьtf.data.Dataset
Объекты для загрузки данных в многолетнем или распределенном рабочем процессе.
Схематично, похоже, что это:
# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
# Open a strategy scope.
with strategy.scope():
# Everything that creates variables should be under the strategy scope.
# In general this is only model construction & `compile()`.
model = Model(...)
model.compile(...)
# Train the model on all available devices.
model.fit(train_dataset, validation_data=val_dataset, ...)
# Test the model on all available devices.
model.evaluate(test_dataset)
Вот простой сквозной пример:
def get_compiled_model():
# Make a simple 2-layer densely-connected neural network.
inputs = keras.Input(shape=(784,))
x = keras.layers.Dense(256, activation="relu")(inputs)
x = keras.layers.Dense(256, activation="relu")(x)
outputs = keras.layers.Dense(10)(x)
model = keras.Model(inputs, outputs)
model.compile(
optimizer=keras.optimizers.Adam(),
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
return model
def get_dataset():
batch_size = 32
num_val_samples = 10000
# Return the MNIST dataset in the form of a `tf.data.Dataset`.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# Preprocess the data (these are Numpy arrays)
x_train = x_train.reshape(-1, 784).astype("float32") / 255
x_test = x_test.reshape(-1, 784).astype("float32") / 255
y_train = y_train.astype("float32")
y_test = y_test.astype("float32")
# Reserve num_val_samples samples for validation
x_val = x_train[-num_val_samples:]
y_val = y_train[-num_val_samples:]
x_train = x_train[:-num_val_samples]
y_train = y_train[:-num_val_samples]
return (
tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size),
tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(batch_size),
tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size),
)
# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print("Number of devices: {}".format(strategy.num_replicas_in_sync))
# Open a strategy scope.
with strategy.scope():
# Everything that creates variables should be under the strategy scope.
# In general this is only model construction & `compile()`.
model = get_compiled_model()
# Train the model on all available devices.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, validation_data=val_dataset)
# Test the model on all available devices.
model.evaluate(test_dataset)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Number of devices: 4
2023-07-19 11:35:32.379801: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 50000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:0"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
Epoch 1/2
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
1556/1563 [============================>.] - ETA: 0s - loss: 0.2236 - sparse_categorical_accuracy: 0.9328INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2023-07-19 11:35:46.769935: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 10000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:2"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
1563/1563 [==============================] - 16s 7ms/step - loss: 0.2238 - sparse_categorical_accuracy: 0.9328 - val_loss: 0.1347 - val_sparse_categorical_accuracy: 0.9592
Epoch 2/2
1563/1563 [==============================] - 11s 7ms/step - loss: 0.0940 - sparse_categorical_accuracy: 0.9717 - val_loss: 0.0984 - val_sparse_categorical_accuracy: 0.9684
2023-07-19 11:35:59.993148: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 10000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:4"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
313/313 [==============================] - 2s 4ms/step - loss: 0.1057 - sparse_categorical_accuracy: 0.9676
[0.10571097582578659, 0.9675999879837036]
Использование обратных вызовов для обеспечения устойчивости к неисправности
При использовании распределенного обучения вы всегда должны убедиться, что у вас есть стратегия восстановления после сбоя (устойчивость к ошибкам). Самый простой способ справиться с этим - пройтиModelCheckpoint
обратный вызовfit()
, чтобы сохранить свою модель через регулярные промежутки времени (например, каждые 100 партий или каждую эпоху). Затем вы можете перезапустить обучение из своей сохраненной модели.
Вот простой пример:
import os
from tensorflow import keras
# Prepare a directory to store all the checkpoints.
checkpoint_dir = "./ckpt"
if not os.path.exists(checkpoint_dir):
os.makedirs(checkpoint_dir)
def make_or_restore_model():
# Either restore the latest model, or create a fresh one
# if there is no checkpoint available.
checkpoints = [checkpoint_dir + "/" + name for name in os.listdir(checkpoint_dir)]
if checkpoints:
latest_checkpoint = max(checkpoints, key=os.path.getctime)
print("Restoring from", latest_checkpoint)
return keras.models.load_model(latest_checkpoint)
print("Creating a new model")
return get_compiled_model()
def run_training(epochs=1):
# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
# Open a strategy scope and create/restore the model
with strategy.scope():
model = make_or_restore_model()
callbacks = [
# This callback saves a SavedModel every epoch
# We include the current epoch in the folder name.
keras.callbacks.ModelCheckpoint(
filepath=checkpoint_dir + "/ckpt-{epoch}", save_freq="epoch"
)
]
model.fit(
train_dataset,
epochs=epochs,
callbacks=callbacks,
validation_data=val_dataset,
verbose=2,
)
# Running the first time creates the model
run_training(epochs=1)
# Calling the same function again will resume from where we left off
run_training(epochs=1)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Creating a new model
2023-07-19 11:36:01.811216: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 50000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:0"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
2023-07-19 11:36:13.671835: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 10000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:2"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
1563/1563 - 14s - loss: 0.2268 - sparse_categorical_accuracy: 0.9322 - val_loss: 0.1148 - val_sparse_categorical_accuracy: 0.9656 - 14s/epoch - 9ms/step
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Restoring from ./ckpt/ckpt-1
2023-07-19 11:36:16.521031: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 50000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:0"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
2023-07-19 11:36:28.440092: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 10000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:2"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 784
}
}
shape {
}
}
}
}
attr {
key: "replicate_on_split"
value {
b: false
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
1563/1563 - 13s - loss: 0.0974 - sparse_categorical_accuracy: 0.9703 - val_loss: 0.0960 - val_sparse_categorical_accuracy: 0.9724 - 13s/epoch - 9ms/step
tf.data
Советы по производительности
При распределенном обучении эффективность, с которой вы загружаете данные, часто может стать критической. Вот несколько советов, чтобы убедиться, что вашtf.data
Трубопроводы работают как можно быстрее.
ПРИМЕЧАНИЕ О БОРЕЗАНИИ
При создании вашего набора данных убедитесь, что он сочетается с глобальным размером партии. Например, если каждый из ваших 8 графических процессоров способен запускать партию из 64 образцов, вы вызовыте использовать глобальный размер партии 512.
Вызовdataset.cache()
Если вы позвоните.cache()
В наборе данных его данные будут кэшированы после прохождения первой итерации по данным. Каждая последующая итерация будет использовать кэшированные данные. Кэш может быть в памяти (по умолчанию) или в локальном файле, который вы указываете.
Это может повысить производительность, когда:
- Ваши данные не ожидаются от итерации на итерацию
- Вы читаете данные из удаленной распределенной файловой системы
- Вы читаете данные с локального диска, но ваши данные будут соответствовать памяти, а ваш рабочий процесс значительно связан с IO (например, чтение и декодирование файлов изображений).
Вызовdataset.prefetch(buffer_size)
Вы почти всегда должны звонить.prefetch(buffer_size)
После создания набора данных. Это означает, что ваш трубопровод данных будет работать асинхронно из вашей модели, при этом новые образцы предварительно обработаны и хранятся в буфере, в то время как текущие пакетные образцы используются для обучения модели. Следующая партия будет предварительно оформлена в памяти GPU к тому времени, когда текущая партия закончилась.
Несколько работников распределенной синхронной тренировки
Как это работает
В этой настройке у вас есть несколько машин (называетсяработники), каждый с одним или несколькими графическими процессорами на них. Подобно тому, что происходит для тренировок с одним хостом, каждый доступный графический процессор будет запускать одну модельную копию, а значение переменных каждой копии сохраняется синхронизированным после каждой партии.
Важно отметить, что текущая реализация предполагает, что все работники имеют одинаковое количество графических процессоров (гомогенный кластер).
Как его использовать
- Установите кластер (мы предоставляем указатели ниже).
- Настройка подходящего
TF_CONFIG
переменная среды на каждом работнике. Это говорит работнику, какова его роль и как общаться со своими сверстниками. - На каждом работнике запустите код конструкции и компиляции модели в рамках
MultiWorkerMirroredStrategy
Объект, аналогично мы проделали для обучения на одном хосте. - Запустите код оценки на назначенной машине оценщика.
Настройка кластера
Во -первых, установите кластер (коллектив машин). Каждая машина индивидуально должна быть настроена, чтобы иметь возможность запустить вашу модель (как правило, каждая машина будет запускать одно и то же изображение Docker) и иметь возможность получить доступ к вашему источнику данных (например, GCS).
Управление кластером выходит за рамки этого руководства.Вот документЧтобы помочь вам начать. Вы также можете взглянуть наKubeflowПолем
НастройкаTF_CONFIG
переменная среды
В то время как код, работающий на каждом работнике, почти такой же, как и код, используемый в рабочем процессе с одним хостом (за исключением другогоtf.distribute
объект стратегии), одно существенное различие между рабочим процессом с одним хостом и рабочим процессом с несколькими работниками заключается в том, что вам необходимо установитьTF_CONFIG
Переменная среда на каждой машине, работающей в вашем кластере.
АTF_CONFIG
Переменная среда - это строка JSON, которая указывает:
- Конфигурация кластера, в то время как список адресов и портов машин, которые составляют кластер
- «Задача» работника, которая является той ролью, которую эта конкретная машина должна играть в кластере.
Одним из примеров TF_CONFIG является:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
В многоработником синхронном обучении настройки достоверных ролей (типы задач) для машин являются «работник» и «оценщик».
Например, если у вас есть 8 машин с 4 графическими процессорами, у вас может быть 7 работников и один оценщик.
- Рабочие тренируют модель, каждая из которых обрабатывает подборы глобальной партии.
- Один из работников (работник 0) будет служить «главным», конкретным видом работника, который отвечает за сохранение журналов и контрольно -пропускных пунктов для последующего повторного использования (как правило, в облачном хранилище).
- Оценщик запускает непрерывный цикл, который загружает последнюю контрольную точку, сохраненную главным работником, проводит оценку на нем (асинхронно от других работников) и записывает журналы оценки (например, журналы Tensorboard).
Запуск кода на каждом работнике
Вы будете запускать код обучения на каждом работнике (включая начальника) и код оценки на оценщике.
Код обучения в основном такой же, как и то, что вы бы использовали в настройке с одним хостом, за исключением использованияMultiWorkerMirroredStrategy
вместоMirroredStrategy
Полем
Каждый работник будет запускать один и тот же код (за исключением разницы, объясненной в примечании ниже), включая одни и те же обратные вызовы.
Примечание:Обратные вызовы, которые сохраняют контрольные точки или журналы модели, должны сохранять в другом каталоге для каждого работника. Стандартная практика - все работники должны сэкономить на местном диске (что обычно временно),кроме работника 0, что сохранило бы контрольные точки журнала Tensorboard в облачное место для хранения для последующего доступа и повторного использования.
Оценщик просто использовал быMirroredStrategy
(Поскольку он работает на одной машине и не нуждается в связи с другими машинами) и вызовmodel.evaluate()
Полем Он будет загружать последнюю контрольную точку, сохранившуюся главным работником в местоположение облачного хранения, и сохранит журналы оценки в то же место, что и главные журналы.
Пример: код, работающий в многоработной настройке
На начальнике (работник 0):
# Set TF_CONFIG
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
# Open a strategy scope and create/restore the model.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
model = make_or_restore_model()
callbacks = [
# This callback saves a SavedModel every 100 batches
keras.callbacks.ModelCheckpoint(filepath='path/to/cloud/location/ckpt',
save_freq=100),
keras.callbacks.TensorBoard('path/to/cloud/location/tb/')
]
model.fit(train_dataset,
callbacks=callbacks,
...)
На других работниках:
# Set TF_CONFIG
worker_index = 1 # For instance
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': worker_index}
})
# Open a strategy scope and create/restore the model.
# You can restore from the checkpoint saved by the chief.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
model = make_or_restore_model()
callbacks = [
keras.callbacks.ModelCheckpoint(filepath='local/path/ckpt', save_freq=100),
keras.callbacks.TensorBoard('local/path/tb/')
]
model.fit(train_dataset,
callbacks=callbacks,
...)
На оценке:
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
model = make_or_restore_model() # Restore from the checkpoint saved by the chief.
results = model.evaluate(val_dataset)
# Then, log the results on a shared location, write TensorBoard logs, etc
Первоначально опубликовано на
Оригинал