user3235835

Reputation: 1

Pyspark UDF Pickling error, can't pickle SwigPyObject objects

I am trying to apply a UDF to a DataFrame column that consists of strings. The function uses TensorFlow's Universal Sentence Encoder (GUSE) and converts each string to an array of floats.

import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import tf_sentencepiece
# Graph set up.
g = tf.Graph()
with g.as_default():
  text_input = tf.placeholder(dtype=tf.string, shape=[None])
  embed = hub.Module("https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/1")
  embedded_text = embed(text_input)
  init_op = tf.group([tf.global_variables_initializer(), tf.tables_initializer()])
g.finalize()

# Initialize session.
session = tf.Session(graph=g)
session.run(init_op)

def embed_mail(x):
    # Run the text through the encoder; the result has shape (1, dim).
    embedding = session.run(embedded_text, feed_dict={text_input: [x]})
    embedding = embedding.flatten()
    # Convert numpy float32 values to plain Python floats.
    return [np.float32(i).item() for i in embedding]

But whenever I try to run this function with:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

embed_mail_udf = udf(embed_mail, ArrayType(FloatType()))
df = df.withColumn('embedding', embed_mail_udf(df.text))

I keep getting an error: Could not serialize object: TypeError: can't pickle SwigPyObject objects. What am I doing wrong?

Upvotes: 0

Views: 1886

Answers (1)

Artem Vovsia

Reputation: 1570

To run your UDF's code on the cluster, Spark needs to be able to serialize all the data "attached" to that function. Your UDF embed_mail contains a reference to the TF session, so the function is a closure, and Spark first has to serialize the contents of tf.Session. The session wraps native SWIG-generated objects, which is exactly what the "can't pickle SwigPyObject objects" error is complaining about, so I bet that is the cause of the issue. Unfortunately, I have no experience with TF, but it seems like you could compute the embeddings with TF before handing the data to Spark, broadcast them, and then look them up in your UDF.
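
A minimal sketch of that idea, assuming the embed_mail function from the question is callable on the driver, that the column is named text (both taken from the question), and that spark is the active SparkSession: compute the embeddings locally, broadcast the resulting plain-Python dict, and have the UDF reference only the broadcast variable, so its closure no longer captures the tf.Session.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Driver side: run TF once per distinct text value. embed_mail is the
# question's function, executed locally, so the tf.Session never has
# to leave the driver.
distinct_texts = [row.text for row in df.select('text').distinct().collect()]
embeddings = {t: embed_mail(t) for t in distinct_texts}

# Broadcast the dict; it holds only strings and lists of floats,
# which pickle without trouble. 'spark' is assumed to be the
# active SparkSession.
embeddings_bc = spark.sparkContext.broadcast(embeddings)

# The UDF closes over the broadcast handle only, not the session.
lookup_udf = udf(lambda t: embeddings_bc.value.get(t), ArrayType(FloatType()))
df = df.withColumn('embedding', lookup_udf(df.text))

Note that this only works while the set of distinct texts fits in driver memory; for larger data a common alternative is to create the session lazily inside the UDF so each executor builds its own copy.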

Upvotes: 2
