tf.distribuet 101: обучение керас на нескольких устройствах и машинах

tf.distribuet 101: обучение керас на нескольких устройствах и машинах

14 июня 2025 г.

Обзор контента

  • Введение
  • Настраивать
  • Одно-хост-синхронная тренировка
  • Использование обратных вызовов для обеспечения устойчивости к неисправности
  • TF.Data Tips
  • Несколько работников распределенной синхронной тренировки
  • Пример: код, работающий в многоработной настройке
  • Дальнейшее чтение

Введение

Как правило, есть два способа распределения вычислений на нескольких устройствах:

Параллелизм данных, где одна модель реплицируется на нескольких устройствах или нескольких машинах. Каждый из них обрабатывает разные партии данных, затем объединяет свои результаты. Существует много вариантов этой настройки, которые отличаются от того, как результаты слияния различных реплик модели, в том, остаются ли они синхронизировать на каждой партии или они более слабо связаны и т. Д.

Модель параллелизма, где разные части одной модели работают на разных устройствах, обрабатывают одну партию данных вместе. Это лучше всего работает с моделями, которые имеют естественно параллельную архитектуру, такую ​​как модели, которые оснащены несколькими ветвями.

Это руководство фокусируется на параллелизме данных, в частностиСинхронные данные параллелизма, где различные копии модели остаются в синхронизации после каждой партии, которую они обрабатывают. Синхронность сохраняет поведение сходимости модели, идентичное тому, что вы увидите для обучения на одном устройстве.

В частности, это руководство учит вас, как использоватьtf.distributeAPI для обучения моделей Keras на несколько графических процессоров, с минимальными изменениями вашего кода, в следующих двух настройках:

  • На нескольких графических процессорах (обычно от 2 до 8), установленных на одной машине (один хост, многократное обучение). Это самая распространенная установка для исследователей и мелкомасштабных рабочих процессов в отрасли.
  • На кластере многих машин каждый разнимает один или несколько графических процессоров (многоработника распределенного обучения). Это хорошая установка для крупномасштабных рабочих процессов отрасли, например, Обучение моделей классификации изображений с высоким разрешением на десятках миллионов изображений с использованием 20-100 графических процессоров.

Настраивать


import tensorflow as tf
import keras

Одно-хост, многолетняя синхронная тренировка

В этой настройке у вас есть одна машина с несколькими графическими процессорами (обычно от 2 до 8). Каждое устройство будет запускать копию вашей модели (называетсяреплика) Для простоты, в дальнейшем, мы предположим, что имеем дело с 8 графическими процессорами, без каких -либо потерь общности.

Как это работает

На каждом этапе обучения:

  • Текущая партия данных (вызваннаяГлобальная партия) разделен на 8 различных подборов (называетсяместные партии) Например, если глобальная партия имеет 512 образцов, каждая из 8 локальных партий будет иметь 64 образца.
  • Каждая из 8 реплик независимо обрабатывает локальную партию: они запускают вперед проход, затем обратный проход, выводя градиент веса потери модели на локальной партии.
  • Обновления веса, происходящие из локальных градиентов, эффективно объединены по 8 репликах. Поскольку это делается в конце каждого шага, реплики всегда остаются в синхронизации.

На практике процесс синхронного обновления весов модели реплики обрабатывается на уровне каждой отдельной переменной веса. Это делается череззеркальная переменнаяобъект.

Как его использовать

Для проведения однонационального синхронного обучения с моделью с моделью кераса вы бы использовалиtf.distribute.MirroredStrategyAPI. Вот как это работает:

  • Создавать экземпляры аMirroredStrategy, опционально настройка, какие конкретные устройства вы хотите использовать (по умолчанию стратегия будет использовать все доступные графические процессоры).
  • Используйте объект стратегии, чтобы открыть область применения, и в рамках этой области создайте все необходимые вам объекты кераса, которые содержат переменные. Как правило, это означаетСоздание и составление моделиВнутри прицела распределения.
  • Тренировать модель черезfit()по-прежнему.

Важно отметить, что мы рекомендуем вам использоватьtf.data.DatasetОбъекты для загрузки данных в многолетнем или распределенном рабочем процессе.

Схематично, похоже, что это:


# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

# Open a strategy scope.
with strategy.scope():
  # Everything that creates variables should be under the strategy scope.
  # In general this is only model construction & `compile()`.
  model = Model(...)
  model.compile(...)

# Train the model on all available devices.
model.fit(train_dataset, validation_data=val_dataset, ...)

# Test the model on all available devices.
model.evaluate(test_dataset)

Вот простой сквозной пример:


def get_compiled_model():
    # Make a simple 2-layer densely-connected neural network.
    inputs = keras.Input(shape=(784,))
    x = keras.layers.Dense(256, activation="relu")(inputs)
    x = keras.layers.Dense(256, activation="relu")(x)
    outputs = keras.layers.Dense(10)(x)
    model = keras.Model(inputs, outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model


def get_dataset():
    batch_size = 32
    num_val_samples = 10000

    # Return the MNIST dataset in the form of a `tf.data.Dataset`.
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Preprocess the data (these are Numpy arrays)
    x_train = x_train.reshape(-1, 784).astype("float32") / 255
    x_test = x_test.reshape(-1, 784).astype("float32") / 255
    y_train = y_train.astype("float32")
    y_test = y_test.astype("float32")

    # Reserve num_val_samples samples for validation
    x_val = x_train[-num_val_samples:]
    y_val = y_train[-num_val_samples:]
    x_train = x_train[:-num_val_samples]
    y_train = y_train[:-num_val_samples]
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size),
    )


# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print("Number of devices: {}".format(strategy.num_replicas_in_sync))

# Open a strategy scope.
with strategy.scope():
    # Everything that creates variables should be under the strategy scope.
    # In general this is only model construction & `compile()`.
    model = get_compiled_model()

# Train the model on all available devices.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, validation_data=val_dataset)

# Test the model on all available devices.
model.evaluate(test_dataset)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Number of devices: 4
2023-07-19 11:35:32.379801: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 50000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Epoch 1/2
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
1556/1563 [============================>.] - ETA: 0s - loss: 0.2236 - sparse_categorical_accuracy: 0.9328INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2023-07-19 11:35:46.769935: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:2"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
1563/1563 [==============================] - 16s 7ms/step - loss: 0.2238 - sparse_categorical_accuracy: 0.9328 - val_loss: 0.1347 - val_sparse_categorical_accuracy: 0.9592
Epoch 2/2
1563/1563 [==============================] - 11s 7ms/step - loss: 0.0940 - sparse_categorical_accuracy: 0.9717 - val_loss: 0.0984 - val_sparse_categorical_accuracy: 0.9684
2023-07-19 11:35:59.993148: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
313/313 [==============================] - 2s 4ms/step - loss: 0.1057 - sparse_categorical_accuracy: 0.9676
[0.10571097582578659, 0.9675999879837036]

Использование обратных вызовов для обеспечения устойчивости к неисправности

При использовании распределенного обучения вы всегда должны убедиться, что у вас есть стратегия восстановления после сбоя (устойчивость к ошибкам). Самый простой способ справиться с этим - пройтиModelCheckpointобратный вызовfit(), чтобы сохранить свою модель через регулярные промежутки времени (например, каждые 100 партий или каждую эпоху). Затем вы можете перезапустить обучение из своей сохраненной модели.

Вот простой пример:


import os
from tensorflow import keras

# Prepare a directory to store all the checkpoints.
checkpoint_dir = "./ckpt"
if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)


def make_or_restore_model():
    # Either restore the latest model, or create a fresh one
    # if there is no checkpoint available.
    checkpoints = [checkpoint_dir + "/" + name for name in os.listdir(checkpoint_dir)]
    if checkpoints:
        latest_checkpoint = max(checkpoints, key=os.path.getctime)
        print("Restoring from", latest_checkpoint)
        return keras.models.load_model(latest_checkpoint)
    print("Creating a new model")
    return get_compiled_model()


def run_training(epochs=1):
    # Create a MirroredStrategy.
    strategy = tf.distribute.MirroredStrategy()

    # Open a strategy scope and create/restore the model
    with strategy.scope():
        model = make_or_restore_model()

    callbacks = [
        # This callback saves a SavedModel every epoch
        # We include the current epoch in the folder name.
        keras.callbacks.ModelCheckpoint(
            filepath=checkpoint_dir + "/ckpt-{epoch}", save_freq="epoch"
        )
    ]
    model.fit(
        train_dataset,
        epochs=epochs,
        callbacks=callbacks,
        validation_data=val_dataset,
        verbose=2,
    )


# Running the first time creates the model
run_training(epochs=1)

# Calling the same function again will resume from where we left off
run_training(epochs=1)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Creating a new model
2023-07-19 11:36:01.811216: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 50000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
2023-07-19 11:36:13.671835: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:2"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
1563/1563 - 14s - loss: 0.2268 - sparse_categorical_accuracy: 0.9322 - val_loss: 0.1148 - val_sparse_categorical_accuracy: 0.9656 - 14s/epoch - 9ms/step
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Restoring from ./ckpt/ckpt-1
2023-07-19 11:36:16.521031: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 50000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
2023-07-19 11:36:28.440092: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:2"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
1563/1563 - 13s - loss: 0.0974 - sparse_categorical_accuracy: 0.9703 - val_loss: 0.0960 - val_sparse_categorical_accuracy: 0.9724 - 13s/epoch - 9ms/step

tf.dataСоветы по производительности

При распределенном обучении эффективность, с которой вы загружаете данные, часто может стать критической. Вот несколько советов, чтобы убедиться, что вашtf.dataТрубопроводы работают как можно быстрее.

ПРИМЕЧАНИЕ О БОРЕЗАНИИ

При создании вашего набора данных убедитесь, что он сочетается с глобальным размером партии. Например, если каждый из ваших 8 графических процессоров способен запускать партию из 64 образцов, вы вызовыте использовать глобальный размер партии 512.

Вызовdataset.cache()

Если вы позвоните.cache()В наборе данных его данные будут кэшированы после прохождения первой итерации по данным. Каждая последующая итерация будет использовать кэшированные данные. Кэш может быть в памяти (по умолчанию) или в локальном файле, который вы указываете.

Это может повысить производительность, когда:

  • Ваши данные не ожидаются от итерации на итерацию
  • Вы читаете данные из удаленной распределенной файловой системы
  • Вы читаете данные с локального диска, но ваши данные будут соответствовать памяти, а ваш рабочий процесс значительно связан с IO (например, чтение и декодирование файлов изображений).

Вызовdataset.prefetch(buffer_size)

Вы почти всегда должны звонить.prefetch(buffer_size)После создания набора данных. Это означает, что ваш трубопровод данных будет работать асинхронно из вашей модели, при этом новые образцы предварительно обработаны и хранятся в буфере, в то время как текущие пакетные образцы используются для обучения модели. Следующая партия будет предварительно оформлена в памяти GPU к тому времени, когда текущая партия закончилась.

Несколько работников распределенной синхронной тренировки

Как это работает

В этой настройке у вас есть несколько машин (называетсяработники), каждый с одним или несколькими графическими процессорами на них. Подобно тому, что происходит для тренировок с одним хостом, каждый доступный графический процессор будет запускать одну модельную копию, а значение переменных каждой копии сохраняется синхронизированным после каждой партии.

Важно отметить, что текущая реализация предполагает, что все работники имеют одинаковое количество графических процессоров (гомогенный кластер).

Как его использовать

  1. Установите кластер (мы предоставляем указатели ниже).
  2. Настройка подходящегоTF_CONFIGпеременная среды на каждом работнике. Это говорит работнику, какова его роль и как общаться со своими сверстниками.
  3. На каждом работнике запустите код конструкции и компиляции модели в рамкахMultiWorkerMirroredStrategyОбъект, аналогично мы проделали для обучения на одном хосте.
  4. Запустите код оценки на назначенной машине оценщика.

Настройка кластера

Во -первых, установите кластер (коллектив машин). Каждая машина индивидуально должна быть настроена, чтобы иметь возможность запустить вашу модель (как правило, каждая машина будет запускать одно и то же изображение Docker) и иметь возможность получить доступ к вашему источнику данных (например, GCS).

Управление кластером выходит за рамки этого руководства.Вот документЧтобы помочь вам начать. Вы также можете взглянуть наKubeflowПолем

НастройкаTF_CONFIGпеременная среды

В то время как код, работающий на каждом работнике, почти такой же, как и код, используемый в рабочем процессе с одним хостом (за исключением другогоtf.distributeобъект стратегии), одно существенное различие между рабочим процессом с одним хостом и рабочим процессом с несколькими работниками заключается в том, что вам необходимо установитьTF_CONFIGПеременная среда на каждой машине, работающей в вашем кластере.

АTF_CONFIGПеременная среда - это строка JSON, которая указывает:

  • Конфигурация кластера, в то время как список адресов и портов машин, которые составляют кластер
  • «Задача» работника, которая является той ролью, которую эта конкретная машина должна играть в кластере.

Одним из примеров TF_CONFIG является:


os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

В многоработником синхронном обучении настройки достоверных ролей (типы задач) для машин являются «работник» и «оценщик».

Например, если у вас есть 8 машин с 4 графическими процессорами, у вас может быть 7 работников и один оценщик.

  • Рабочие тренируют модель, каждая из которых обрабатывает подборы глобальной партии.
  • Один из работников (работник 0) будет служить «главным», конкретным видом работника, который отвечает за сохранение журналов и контрольно -пропускных пунктов для последующего повторного использования (как правило, в облачном хранилище).
  • Оценщик запускает непрерывный цикл, который загружает последнюю контрольную точку, сохраненную главным работником, проводит оценку на нем (асинхронно от других работников) и записывает журналы оценки (например, журналы Tensorboard).

Запуск кода на каждом работнике

Вы будете запускать код обучения на каждом работнике (включая начальника) и код оценки на оценщике.

Код обучения в основном такой же, как и то, что вы бы использовали в настройке с одним хостом, за исключением использованияMultiWorkerMirroredStrategyвместоMirroredStrategyПолем

Каждый работник будет запускать один и тот же код (за исключением разницы, объясненной в примечании ниже), включая одни и те же обратные вызовы.

Примечание:Обратные вызовы, которые сохраняют контрольные точки или журналы модели, должны сохранять в другом каталоге для каждого работника. Стандартная практика - все работники должны сэкономить на местном диске (что обычно временно),кроме работника 0, что сохранило бы контрольные точки журнала Tensorboard в облачное место для хранения для последующего доступа и повторного использования.

Оценщик просто использовал быMirroredStrategy(Поскольку он работает на одной машине и не нуждается в связи с другими машинами) и вызовmodel.evaluate()Полем Он будет загружать последнюю контрольную точку, сохранившуюся главным работником в местоположение облачного хранения, и сохранит журналы оценки в то же место, что и главные журналы.

Пример: код, работающий в многоработной настройке

На начальнике (работник 0):


# Set TF_CONFIG
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})


# Open a strategy scope and create/restore the model.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
  model = make_or_restore_model()

callbacks = [
    # This callback saves a SavedModel every 100 batches
    keras.callbacks.ModelCheckpoint(filepath='path/to/cloud/location/ckpt',
                                    save_freq=100),
    keras.callbacks.TensorBoard('path/to/cloud/location/tb/')
]
model.fit(train_dataset,
          callbacks=callbacks,
          ...)

На других работниках:


# Set TF_CONFIG
worker_index = 1  # For instance
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': worker_index}
})


# Open a strategy scope and create/restore the model.
# You can restore from the checkpoint saved by the chief.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
  model = make_or_restore_model()

callbacks = [
    keras.callbacks.ModelCheckpoint(filepath='local/path/ckpt', save_freq=100),
    keras.callbacks.TensorBoard('local/path/tb/')
]
model.fit(train_dataset,
          callbacks=callbacks,
          ...)

На оценке:


strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
  model = make_or_restore_model()  # Restore from the checkpoint saved by the chief.

results = model.evaluate(val_dataset)
# Then, log the results on a shared location, write TensorBoard logs, etc


Первоначально опубликовано наTensorflowВеб -сайт, эта статья появляется здесь под новым заголовком и имеет лицензию в CC на 4.0. Образцы кода, разделенные по лицензии Apache 2.0


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE