Reputation: 31
I'm trying to put together a demonstration of federated learning with TFF, but the error messages I get are just too confusing. The important part is that I want to demonstrate that the data is read in the remote engine, which is why I use tf.data.experimental.CsvDataset; I could not find anything similar in any tutorial.
I've managed to do a mini experiment where the data was read at the remote site (sketched below), but I can't get this larger example to work.
Currently it complains about 'p = x * w + b', I believe because x is not a federated value. I've tried many, many variations and just can't get it to work. Salary.csv is from the tutorial here: https://www.kaggle.com/karthickveerakumar/salary-data-simple-linear-regression?select=Salary_Data.csv
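The mini experiment that worked was roughly along these lines (count_rows here is just an illustrative name, not my exact code):

import tensorflow as tf
import tensorflow_federated as tff

# A tf_computation that reads the CSV wherever it executes and counts the rows,
# run at the clients via federated_eval.
@tff.tf_computation
def count_rows():
  ds = tf.data.experimental.CsvDataset(
      'data/Salary.csv', [tf.float64, tf.float64], header=True)
  return ds.reduce(tf.constant(0, dtype=tf.int64), lambda n, _: n + 1)

@tff.federated_computation
def count_rows_on_clients():
  return tff.federated_eval(count_rows, tff.CLIENTS)

The larger example that fails is below: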
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_federated as tff
import grpc

ip_address = '127.0.0.1'
port = 8000

channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]
tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')

@tf.function()
def load_data():
  return tf.data.experimental.CsvDataset(
      'data/Salary.csv', [tf.float64, tf.float64], header=True)

W_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
B_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)

@tff.federated_computation(W_TYPE, B_TYPE)
def train(w, b):
  data = load_data()
  loss = tf.Variable(0.0, dtype=tf.float64)
  with tf.GradientTape() as tape:
    for x, y in data:
      p = x * w + b
      loss = loss + tf.square(p - y)
  g_w, g_b = tape.gradient(loss, [w, b])
  w.assign_sub(0.0001 * g_w)
  b.assign_sub(0.0001 * g_b)
  return [w, b]

w = tf.Variable(2.0, dtype=tf.float64)
b = tf.Variable(3.0, dtype=tf.float64)

for _ in range(1000):
  w, b = train(data, tff.federated_broadcast(w), tff.federated_broadcast(b))
Upvotes: 3
Views: 179
Reputation: 1405
TFF does not support mixing raw TensorFlow code directly into a federated computation like this. The usual paradigm in TFF goes something like:

1. Write your local computations in TensorFlow, wrapped in the tff.tf_computation decorator.
2. Inside a tff.federated_computation, invoke intrinsic and communication operators (like tff.federated_broadcast above), and tff.federated_map your TF computations to federated elements.

There are many subtleties that have led to the design of a pattern like the above, but the main one is the desire for all logic to be expressible in a serialized representation, so that it can be split up and sent to the various devices in your federated system during true deployment. This is a little hand-wavey; you can think of TFF's decorators as defining a context in which the code you write will be traced, so that it can run in a totally platform-independent way.
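As a minimal sketch of that pattern (toy names here, nothing to do with your regression):

# A tf_computation mapped over a clients-placed value inside a
# federated_computation.
@tff.tf_computation(tf.float64)
def add_half(x):
  return x + 0.5

CLIENT_FLOAT = tff.FederatedType(tf.float64, tff.CLIENTS)

@tff.federated_computation(CLIENT_FLOAT)
def add_half_on_clients(values):
  return tff.federated_map(add_half, values)

# Invoked from the outer Python context, a clients-placed value is passed as a
# list with one element per client, e.g. add_half_on_clients([1.0, 2.0])
# returns [1.5, 2.5].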
Using this pattern, your computation above would look something like:
# These lines set up the TFF execution environment, and importantly are not used
# during the function definitions below--only upon *invocation* of your function
# in the outer Python context will the execution environment be used.
channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]
tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')

@tf.function()
def load_data():
  # From TFF's perspective, the code written here will be represented as a
  # "blob of TensorFlow" that can be shipped out anywhere. We don't actually
  # *need* a tf_computation decorator here, since this will be invoked in the
  # body of another tf_computation and the logic will get embedded there, but
  # we could put one here if we wanted.
  return tf.data.experimental.CsvDataset(
      'data/Salary.csv', [tf.float64, tf.float64], header=True)

@tff.tf_computation(tf.float64, tf.float64)
@tf.function
def local_train(w, b):
  data = load_data()
  # We don't need a variable for the loss here, and though TFF will allow you
  # to construct a variable to use as a temporary in this fashion, tf.function
  # won't. We're pushing the TF team on that one ;).
  loss = tf.constant(0.0, dtype=tf.float64)
  with tf.GradientTape() as tape:
    # Inside a tf_computation, w and b arrive as plain tensors rather than
    # variables, so the tape must be told explicitly to watch them.
    tape.watch([w, b])
    # We must be inside a tf.function decorator, or the eager Python runtime,
    # to iterate over a tf.data.Dataset in this way.
    for x, y in data:
      p = x * w + b
      loss = loss + tf.square(p - y)
  g_w, g_b = tape.gradient(loss, [w, b])
  w = w - 0.0001 * g_w
  b = b - 0.0001 * g_b
  return w, b

# Making a symbol to represent these types is always a good idea.
W_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
B_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)

@tff.federated_computation(W_TYPE, B_TYPE)
def train(w, b):
  # Here w and b are values of federated type, placed at clients.
  # We map the training function over these values. If they were SERVER-placed
  # instead, we would federated_broadcast them out to the clients first.
  updated_w, updated_b = tff.federated_map(local_train, (w, b))
  return updated_w, updated_b

# TFF's Python runtime represents federated values placed at clients as a list
# of values, with as many elements as there are clients. This number will be
# inferred by the runtime and used to distribute the work. We also technically
# don't need variables here, but I believe they won't cause any problem.
clients_placed_w = [tf.Variable(2.0, dtype=tf.float64)]
clients_placed_b = [tf.Variable(3.0, dtype=tf.float64)]

# We could move the execution environment setup lines all the way down here,
# no problem.
for _ in range(1000):
  clients_placed_w, clients_placed_b = train(clients_placed_w, clients_placed_b)
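If you want to sanity-check the loop before pointing it at the remote workers, you can run the same computations in-process first (assuming your TFF version exposes the local native executor; adjust to whatever local context your version provides):

# Assumption: tff.backends.native.set_local_execution_context exists in this
# TFF version; swap in your version's local/default context if it differs.
tff.backends.native.set_local_execution_context()

clients_placed_w = [tf.constant(2.0, dtype=tf.float64)]
clients_placed_b = [tf.constant(3.0, dtype=tf.float64)]
for _ in range(10):
  clients_placed_w, clients_placed_b = train(clients_placed_w, clients_placed_b)
print(clients_placed_w, clients_placed_b)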
Upvotes: 1