Reputation: 1
I am trying to apply udf functions, to a dataframe column that consists of strings. Function uses Tensorflow GUSE and converts string to an array of floats.
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import tf_sentencepiece
# Graph set up.
g = tf.Graph()
with g.as_default():
text_input = tf.placeholder(dtype=tf.string, shape=[None])
embed = hub.Module("https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/1")
embedded_text = embed(text_input)
init_op = tf.group([tf.global_variables_initializer(), tf.tables_initializer()])
g.finalize()
# Initialize session.
session = tf.Session(graph=g)
session.run(init_op)
def embed_mail(x):
embedding = session.run(embedded_text, feed_dict={text_input:[x]})
embedding = flatten(embedding)
result = [np.float32(i).item() for i in embedding]
return result
But whenever I try to run this function with:
embed_mail_udf = udf(embed_mail, ArrayType(FloatType()))
df = df.withColumn('embedding',embed_mail_udf(df.text))
I keep getting an error: Could not serialize object: TypeError: can't pickle SwigPyObject objects. What am I doing wrong?
Upvotes: 0
Views: 1886
Reputation: 1570
To run your UDF's code on cluster Spark needs to be able to serialize all the data "attached" to that function. Your UDF embed_mail
contains a reference to TF Session, so the function is a closure
and Spark first needs to serialize the content of tf.Session. I bet that it is a cause of the issue. Unfortunately, I have no experience with TF, but it seems like you can get your mail data from TF before running Spark, broadcast it and then use in your udf?
Upvotes: 2