Dmitry Goldenberg

Reputation: 317

Multi-worker training with Keras: how to make it work in an actual cluster, not in a notebook?

I'm trying to run the Keras multi-worker sample with the MultiWorkerMirroredStrategy, following the documentation page here, in an actual cluster of 3 machines.

On each box, TensorFlow 2.4.1 was installed with:

pip3 install --user tensorflow==2.4.1

The TF_CONFIG variable was set as follows on each box, and each box was rebooted afterwards:

>> TF_CONFIG - for xx.x.xxx.xx:
export TF_CONFIG='{"cluster": {"worker": ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]}, "task": {"type": "worker", "index": 0}}'

>> TF_CONFIG - for yy.y.yyy.yy:
export TF_CONFIG='{"cluster": {"worker": ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]}, "task": {"type": "worker", "index": 1}}'

>> TF_CONFIG - for zz.z.zzz.zz:
export TF_CONFIG='{"cluster": {"worker": ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]}, "task": {"type": "worker", "index": 2}}'
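The three values above differ only in the task index, so they can be generated from a single worker list instead of being edited by hand. A minimal sketch, assuming a helper named make_tf_config (a hypothetical name, not part of TensorFlow) and the placeholder addresses from this post:

```python
import json

# Placeholder addresses copied from the post; replace with real host:port pairs.
WORKERS = ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]


def make_tf_config(workers, index):
    """Return the TF_CONFIG JSON string for the worker at position `index`.

    Hypothetical helper for illustration; TensorFlow only reads the
    resulting environment variable.
    """
    if not 0 <= index < len(workers):
        raise ValueError(f"index {index} is out of range for {len(workers)} workers")
    return json.dumps({
        "cluster": {"worker": list(workers)},
        "task": {"type": "worker", "index": index},
    })


if __name__ == "__main__":
    # Print one export line per machine, in the same layout as above.
    for i, address in enumerate(WORKERS):
        print(f">> TF_CONFIG - for {address.split(':')[0]}:")
        print(f"export TF_CONFIG='{make_tf_config(WORKERS, i)}'")
```

The worker list must be identical and in the same order on every machine; only the index changes.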

The Python code is included below. When I launch it with "python tf_multi_worker_mnist.py" on the "chief" node, all I get is the output below. How do I actually get the code to run and train the model?

Output:

2021-04-02 18:57:14.168712: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
>>
>> Running the prototype...
>> TF_CONFIG: {'cluster': {'worker': ['xx.x.xxx.xx:2121', 'yy.y.yyy.yy:2121', 'zz.z.zzz.zz:2121']}, 'task': {'type': 'worker', 'index': 0}}
>>
2021-04-02 18:57:19.524282: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-02 18:57:19.556744: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-04-02 18:57:19.772575: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-04-02 18:57:19.772638: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: ip-10-2-248-96.*******.acme.com
2021-04-02 18:57:19.772660: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: ip-xx-x-xxx-xx.*******.acme.com
2021-04-02 18:57:19.773805: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.80.2
2021-04-02 18:57:19.773865: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.80.2
2021-04-02 18:57:19.773886: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.80.2
2021-04-02 18:57:19.776615: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-04-02 18:57:19.778581: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-02 18:57:19.784246: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-02 18:57:19.818028: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> xx.x.xxx.xx:2121, 1 -> yy.y.yyy.yy:2121, 2 -> zz.z.zzz.zz:2121}
2021-04-02 18:57:19.818895: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://xx.x.xxx.xx:2121

Code:

import json
import os
import sys
import time

import numpy as np
import tensorflow as tf


# Hide all GPUs from TensorFlow so that this run uses the CPU only.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
if "." not in sys.path:
    sys.path.insert(0, ".")


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the range [0, 255].
    # You need to convert them to float32 with values in the range [0, 1]
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset


def build_and_compile_cnn_model():
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=["accuracy"],
    )
    return model


def main(args):
    start_time = time.time()

    tf_config = json.loads(os.environ["TF_CONFIG"])
    print(">>")
    print(">> Running the prototype...")
    print(">> TF_CONFIG: {}".format(tf_config))
    print(">>")

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    # per_worker_batch_size = 64
    # num_workers = len(tf_config["cluster"]["worker"])
    # global_batch_size = per_worker_batch_size * num_workers
    # multi_worker_dataset = mnist_dataset(global_batch_size)

    # turn on sharding
    global_batch_size = 64
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    multi_worker_dataset = mnist_dataset(global_batch_size)
    multi_worker_dataset_with_shrd = multi_worker_dataset.with_options(options)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()

    multi_worker_model.fit(multi_worker_dataset_with_shrd, epochs=3, steps_per_epoch=70)

    elapsed_time = time.time() - start_time
    str_elapsed_time = time.strftime("%H : %M : %S", time.gmtime(elapsed_time))

    print(">>")
    print(">> Prototype run: finished. Duration: {}.".format(str_elapsed_time))
    print(">>")


if __name__ == "__main__":
    main(sys.argv)

Upvotes: 1

Views: 1107

Answers (1)

Dmitry Goldenberg

Reputation: 317

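Whew, just spotted this in the multi-worker strategy doc:

"You need to launch your program on each worker"

In other words, starting the script only on the chief is not enough: the same script has to be launched on all three machines, each with its own TF_CONFIG. That would explain why the chief sat idle after the "Started server with target" line.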
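As a rough sketch of what "launch on each worker" means for the 3-machine setup in the question, the snippet below only prints the command to run on each box. It does not start anything remotely; how the commands are actually executed (SSH, a job scheduler, by hand) is left open. The launch_commands helper is a hypothetical name, and the addresses are the placeholders from the question.

```python
import json
import shlex

# Placeholder addresses and script name taken from the question.
WORKERS = ["xx.x.xxx.xx:2121", "yy.y.yyy.yy:2121", "zz.z.zzz.zz:2121"]
SCRIPT = "tf_multi_worker_mnist.py"


def launch_commands(workers, script):
    """Build one (host, shell command) pair per worker.

    Hypothetical helper for illustration: each command sets that worker's
    TF_CONFIG inline and starts the same training script.
    """
    commands = []
    for index, address in enumerate(workers):
        tf_config = json.dumps({
            "cluster": {"worker": list(workers)},
            "task": {"type": "worker", "index": index},
        })
        host = address.rsplit(":", 1)[0]
        command = f"TF_CONFIG={shlex.quote(tf_config)} python {shlex.quote(script)}"
        commands.append((host, command))
    return commands


if __name__ == "__main__":
    for host, command in launch_commands(WORKERS, SCRIPT):
        print(f"# run on {host}:")
        print(command)
```

Training should start only once the script is running on every machine in the worker list, so the order in which the commands are started should not matter.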

It might be good to mention this in the Keras notebook walk-through as well.

Also, a notebook is fine, but for anyone starting with actual machines, a cluster-specific walkthrough would be nice too. Basically, it would be close to what I wrote up in the question.

Upvotes: 1
