Miguel Vazquez

Reputation: 31

Linear regression using tf.data with federated core API and data on remote execution client

I'm trying to do a demonstration of federated learning with TFF. I've got this far, but the error messages I get are just too confusing. The important part is that I want to demonstrate that the data is read on the remote engine, which is why I use tf.data.experimental.CsvDataset, and I could not find anything similar in any tutorial. I've managed to do a mini experiment where data was read on the remote site, but I can't get this larger example to work.

Currently it complains about 'p = x * w + b', I believe because x is not a federated value. I've tried many, many variations and just can't get it to work. The Salary.csv file is from this tutorial: https://www.kaggle.com/karthickveerakumar/salary-data-simple-linear-regression?select=Salary_Data.csv

import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow_federated as tff

import grpc

ip_address = '127.0.0.1'
port = 8000

channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]

tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')

@tf.function()
def load_data():
    return tf.data.experimental.CsvDataset('data/Salary.csv', [tf.float64,tf.float64], header=True)


W_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
B_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
@tff.federated_computation(W_TYPE, B_TYPE)
def train(w, b):
    data = load_data()
    loss = tf.Variable(0.0, dtype=tf.float64)
    with tf.GradientTape() as tape:
        for x, y in data:
            p = x * w + b
            loss = loss + tf.square(p - y)

    g_w, g_b = tape.gradient(loss, [w, b])
    w.assign_sub(0.0001 * g_w)
    b.assign_sub(0.0001 * g_b)
    return [w, b]

w = tf.Variable(2.0, dtype=tf.float64)
b = tf.Variable(3.0, dtype=tf.float64)
for _ in range(1000):
    w, b = train(data, tff.federated_broadcast(w), tff.federated_broadcast(b))

Upvotes: 3

Views: 179

Answers (1)

Keith Rush

Reputation: 1405

TFF does not support mixing federated computation logic and raw TensorFlow in the same function body. The usual paradigm in TFF goes something like:

  1. Write your local functions in TensorFlow, using TFF's @tff.tf_computation decorator
  2. Inside the body of a tff.federated_computation, invoke intrinsic and communication operators (like tff.federated_broadcast above), and tff.federated_map your TF computations to federated elements.

There are many subtleties that have led to the design of a pattern like the above, but the main one is the desire for all logic to be expressible in a serialized representation, so that it can be split up and sent to the various devices in your federated system during true deployment. This is a little hand-wavy; you can think of TFF's decorators as defining a context in which the code you write will be traced, so that it can run in a totally platform-independent way.
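To make the split concrete, here is a minimal, self-contained sketch of that pattern (the names add_one and add_one_on_clients are purely illustrative, but the TFF API is the same one used elsewhere in this thread): a tff.tf_computation mapped over a clients-placed value from inside a tff.federated_computation.

import tensorflow as tf
import tensorflow_federated as tff

# A plain "blob of TensorFlow", serialized by the tf_computation decorator.
@tff.tf_computation(tf.float64)
def add_one(x):
    return x + 1.0

CLIENT_FLOAT = tff.FederatedType(tf.float64, tff.CLIENTS)

# Federated logic only: no raw TF ops here, just placement and mapping.
@tff.federated_computation(CLIENT_FLOAT)
def add_one_on_clients(values):
    return tff.federated_map(add_one, values)

# In the Python runtime, a clients-placed value is a list with one entry
# per client.
print(add_one_on_clients([1.0, 2.0]))  # expected: [2.0, 3.0]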

Using this pattern, your computation above would look something like:


# These lines set up the TFF execution environment, and importantly are not used 
# during the function definitions below--only upon *invocation* of your function
# in the outer Python context will the execution environment be used.
channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]
tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')


@tf.function()
def load_data():
    # From TFF's perspective, the code written here will be represented as a
    # "blob of Tensorflow" that can be shipped out anywhere. We don't actually
    # *need* a tf_computation decorator here, since this will be invoked in the
    # body of another tf_computation and the logic will get embedded there, but 
    # we could put one here if we wanted.
    return tf.data.experimental.CsvDataset(
        'data/Salary.csv', [tf.float64,tf.float64], header=True)

@tff.tf_computation(tf.float64, tf.float64)
@tf.function
def local_train(w, b):
    data = load_data()
    # We don't need a variable for the loss here, and though TFF will allow you
    # to construct a variable to use as a temporary in this fashion, tf.function
    # won't. We're pushing the TF team on that one ;).
    loss = tf.constant(0.0, dtype=tf.float64)
    with tf.GradientTape() as tape:
        # We must be inside a tf.function decorator, or the eager Python runtime,
        # to iterate over a tf.data.Dataset in this way.
        for x, y in data:
            p = x * w + b
            loss = loss + tf.square(p - y)

    g_w, g_b = tape.gradient(loss, [w, b])
    w = w - 0.0001 * g_w
    b = b - 0.0001 * g_b
    return w, b

# Making a symbol to represent these types is always a good idea
W_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
B_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)

@tff.federated_computation(W_TYPE, B_TYPE)
def train(w, b):
    # Here w and b are elements of federated type, placed at clients.
    # We map the training function over these values.
    # If they were SERVER-placed instead, we would federated_broadcast them
    # out to the clients.
    updated_w, updated_b = tff.federated_map(local_train, (w, b))
    return updated_w, updated_b

# TFF's Python runtime represents federated values placed at clients as a list of
# values, with as many elements as there are clients. This number will be inferred 
# by the runtime and used to distribute the work. We also technically don't need
# variables here, but I believe they won't cause any problem.
clients_placed_w = [tf.Variable(2.0, dtype=tf.float64)]
clients_placed_b = [tf.Variable(3.0, dtype=tf.float64)]

# We could move the execution environment setup lines all the way down here,
# no problem.
for _ in range(1000):
    clients_placed_w, clients_placed_b = train(clients_placed_w, clients_placed_b)
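As a follow-up, the comments in train above mention the SERVER-placed alternative. Here is a hedged sketch of what one round of that might look like, reusing local_train from the code above; train_from_server and the aggregation step with tff.federated_mean are my own illustration, not part of the original answer.

SERVER_FLOAT = tff.FederatedType(tf.float64, tff.SERVER)

@tff.federated_computation(SERVER_FLOAT, SERVER_FLOAT)
def train_from_server(w, b):
    # Broadcast the server-placed parameters out to the clients...
    client_w = tff.federated_broadcast(w)
    client_b = tff.federated_broadcast(b)
    # ...run the same local TF training step on each client...
    new_w, new_b = tff.federated_map(local_train, (client_w, client_b))
    # ...and aggregate the client results back to the server.
    return tff.federated_mean(new_w), tff.federated_mean(new_b)

# Server-placed values are just plain Python values in the outer context.
server_w, server_b = 2.0, 3.0
for _ in range(1000):
    server_w, server_b = train_from_server(server_w, server_b)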

Upvotes: 1
