Raj

Reputation: 417

Convert tf.argmax results to numpy array

I am new to TensorFlow and wrote the following distributed training code. The code works fine.

import multiprocessing
import os
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_hub as hub
import tensorflow.python.keras.backend as K
#1. Define Workers
def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_op threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_id=0, task_type="worker", rpc_layer="grpc")
  return cluster_resolver

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=variable_partitioner)

word = "Elephant"
sentence = "I am a sentence for which I would like to get its embedding."
paragraph = (
    "Universal Sentence Encoder embeddings also support short paragraphs. "
    "There is no hard limit on how long the paragraph is. Roughly, the longer "
    "the more 'diluted' the embedding will be.")
messages = [word, sentence, paragraph]
#labels=["1","2","3"]
reviews = [[1,0,0],[0,1,0],[0,0,1]]


encoder = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

X_train = encoder(messages)

BUFFER_SIZE = len(X_train)
BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 4


with strategy.scope():

    model = keras.Sequential()

    model.add(
        keras.layers.Dense(
            units=256,
            input_shape=(X_train.shape[1],),
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )

    model.add(
        keras.layers.Dense(
            units=128,
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )

    model.add(keras.layers.Dense(3, activation='softmax'))
    # model.compile(
    #     loss='categorical_crossentropy',
    #     optimizer=keras.optimizers.Adam(0.001),
    #     metrics=['accuracy']
    # )

    # history = model.fit(
    #     np.array(X_train), np.array(reviews),
    #     epochs=10,
    #     batch_size=16,
    #     verbose=1,
    #     shuffle=True
    # )
    optimizer = keras.optimizers.Adam(0.001)
    accuracy = keras.metrics.Accuracy()


def step_fn(x_train_slice):

    x_train, y_train = next(x_train_slice)
    with tf.GradientTape() as tape:
        pred = model(x_train, training=True)
        # tf.print(x_train)
        # tf.print(pred)
        # tf.print(y_train)

        per_example_loss = keras.losses.CategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE)(y_train, pred)
        loss = tf.nn.compute_average_loss(per_example_loss)

    # The tape has already recorded the forward pass, so gradients can be
    # computed outside the tape context.
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    tf.print("train values are:", x_train)
    tf.print("pred values are:", pred)
    tf.print("argmax values are:", tf.math.argmax(pred, axis=0))  # problem
    tf.print("actual_pred values are:", actual_pred)
    tf.print("labels are:", y_train)
    tf.print("label argmax values are:", tf.argmax(y_train))
    accuracy.update_state(y_train, actual_pred)
    tf.print("accuracy is:", accuracy.result())
    return loss

@tf.function
def distributed_train_step(x_train_slice):
    losses = strategy.run(step_fn, args=(x_train_slice,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)


@tf.function
def per_worker_dataset_fn():
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (X_train, reviews)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    # test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
    # test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
    return train_dist_dataset


coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
num_epochs = 5
steps_per_epoch = 1
for i in range(num_epochs):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
    # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f.",(i,accuracy.result().numpy()))

The problem is: in step_fn, once I get the prediction values I would like to get the corresponding labels. For this I have used the line tf.print("argmax values are:", tf.math.argmax(pred, axis=0))  # problem

The argmax gives the array of indices of the maximum probabilities. I would like to extract this as a numpy array and use it to index into the reviews array (the one-hot encoded values) to build a confusion matrix.

But I'm not able to convert the tf.math.argmax(pred, axis=0) tensor to a numpy array. I tried several approaches, such as eval(K.get_session()), but nothing worked. Any help is appreciated.

Thanks much

Upvotes: 0

Views: 905

Answers (1)

Mark H

Reputation: 4451

OK, I found two solutions here.

Here's the way you probably should do it:

Add some more Keras metrics right after accuracy (inside the strategy scope) that you can use to compute the confusion matrix:

accuracy = keras.metrics.Accuracy()
tp = keras.metrics.TruePositives()
tn = keras.metrics.TrueNegatives()
fp = keras.metrics.FalsePositives()
fn = keras.metrics.FalseNegatives()

Now update those as well in step_fn:

accuracy.update_state(y_train, actual_pred)
argmax_pred = tf.one_hot(tf.math.argmax(pred, axis=1), depth=pred.shape[1])
tp.update_state(y_train, argmax_pred)
tn.update_state(y_train, argmax_pred)
fp.update_state(y_train, argmax_pred)
fn.update_state(y_train, argmax_pred)

Now you can access the result back where you were accessing the accuracy results:

coordinator.join()
print ("Finished epoch %d, accuracy is %f.",(i,accuracy.result().numpy()))
print ("TP=%f  TN=%f  FP=%f  FN=%f" % (tp.result().numpy(),tn.result().numpy(),fp.result().numpy(),fn.result().numpy()))

That should do the trick for you.
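
If you want the full per-class confusion matrix rather than those aggregate counts, you could also accumulate one yourself with tf.math.confusion_matrix. Here's a minimal sketch, assuming 3 classes as in your example (the confusion variable and update_confusion helper are names I made up, not anything from your code):

NUM_CLASSES = 3  # matches the 3 one-hot labels in the question

# Create the accumulator inside strategy.scope() so the variable is
# placed correctly in the distributed setting.
confusion = tf.Variable(
    tf.zeros((NUM_CLASSES, NUM_CLASSES), dtype=tf.int32), trainable=False)

def update_confusion(y_true_onehot, pred_probs):
    # Convert one-hot labels and softmax outputs to class indices, then
    # add this batch's counts to the running matrix.
    batch_cm = tf.math.confusion_matrix(
        labels=tf.math.argmax(y_true_onehot, axis=1),
        predictions=tf.math.argmax(pred_probs, axis=1),
        num_classes=NUM_CLASSES)
    confusion.assign_add(batch_cm)

Call update_confusion(y_train, pred) next to accuracy.update_state in step_fn, and reset it at each epoch boundary with confusion.assign(tf.zeros_like(confusion)).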


Here's another way to do it:

The strategy is to keep returning your argmax values up the call chain until they're back in your main loop, where they'll appear as RemoteValue objects whose values you can fetch().

For example, in step_fn, send back your argmax values to the calling function:

return (loss, tf.math.argmax(pred,axis=0))

Then, in distributed_train_step, adjust for the tuple being returned, and keep returning the argmax to the next step, maybe like so:

@tf.function
def distributed_train_step(x_train_slice):
    (losses, argmaxes) = strategy.run(step_fn, args=(x_train_slice,))
    strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
    return argmaxes

Notice that I moved your strategy.reduce from the return line to its own line. You weren't using the returned value anyway, since you never assigned the result of the coordinator.schedule call, but now you can add an assignment to grab those returned argmaxes:

argmaxes = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
print ("Back at home, argmaxes=",argmaxes.fetch())

Make sure you use fetch(), because once argmaxes makes it back like this it won't be a plain Tensor; it's a RemoteValue. The RemoteValue class is documented here: https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/coordinator/RemoteValue
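
Once fetched, the result behaves like a regular tensor, so getting back to NumPy and indexing into your reviews list is straightforward. A minimal sketch, assuming fetch() hands back a single tensor of indices (with multiple replicas you may get a structure of per-replica values to unpack first):

import numpy as np

result = argmaxes.fetch()               # RemoteValue -> concrete value
indices = np.asarray(result)            # Tensor -> NumPy array
picked = [reviews[i] for i in indices]  # look up the one-hot rows
print("Back at home, picked labels =", picked)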

You'd need to expand this solution by returning any other values you were going to use for calculating TP/FP/TN/FN on your own.
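
For instance, here is a hypothetical expansion (illustrative names only; it assumes you change step_fn to also return the label argmaxes, and distributed_train_step to pass the whole tuple through; note axis=1, which gives one class index per example):

# In step_fn:
#     return (loss, tf.math.argmax(pred, axis=1), tf.math.argmax(y_train, axis=1))
# In distributed_train_step, return that whole tuple instead of just argmaxes.

# Back on the coordinator:
result = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
loss, pred_ids, label_ids = result.fetch()
cm = tf.math.confusion_matrix(label_ids, pred_ids, num_classes=3)
print("Confusion matrix:\n", cm.numpy())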

Upvotes: 1
