Reputation: 140
I have a function, and a UDF I created from it:
from pyspark import SparkFiles
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.tokenize import word_tokenize
from gensim import corpora
from gensim.test.utils import get_tmpfile
import gensim
import numpy as np

def test(string):
    # Load the pregenerated dictionary, corpus, and index distributed via sc.addFile()
    path_index = SparkFiles.get("corpus_final_production.index")
    path_dictionary = SparkFiles.get("dictionary_production.gensim")
    path_corpus = SparkFiles.get("corpus_final_production")
    dictionary = corpora.Dictionary.load(path_dictionary)
    corpus = corpora.MmCorpus(path_corpus)
    tf_idf = gensim.models.TfidfModel(corpus)
    index_tmpfile = get_tmpfile(path_index)
    sims = gensim.similarities.Similarity(index_tmpfile, tf_idf[corpus], num_features=len(dictionary))
    # Score the query string against every document in the corpus
    query_doc = word_tokenize(string.lower())
    query_doc_bow = dictionary.doc2bow(query_doc)
    query_doc_tf_idf = tf_idf[query_doc_bow]
    sum_of_sims = np.sum(sims[query_doc_tf_idf], dtype=np.float32)
    max_sims = np.amax(sims[query_doc_tf_idf])
    max_count = np.count_nonzero(sims[query_doc_tf_idf] >= max_sims - 0.05)
    max_sims_origin = file_docs[np.argmax(sims[query_doc_tf_idf])]  # file_docs: list of source documents, defined elsewhere
    return max_sims_origin

test_udf = udf(lambda x: test(x), StringType())
df_new = garuda.withColumn('max_sim_origin', test_udf(garuda.text))
It works, but as you can see the UDF is applied row-wise to the PySpark DataFrame. For every row, the dictionary, corpus, and sims index are rebuilt, which takes close to 6 minutes per row.
Is there a way to initialize the dictionary, corpus, and index files on every worker node beforehand, instead of building them inside the UDF?
I am new to Spark, so any help is appreciated.
The dictionary and corpus files are pregenerated, and I have already distributed them with sc.addFile().
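For reference, the distribution step on the driver looks roughly like this (a sketch; the local paths are assumptions, adjust them to wherever the pregenerated files live):
# Sketch: ship the pregenerated files to every worker node (paths are assumptions)
sc.addFile("/data/corpus_final_production.index")
sc.addFile("/data/dictionary_production.gensim")
sc.addFile("/data/corpus_final_production")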
Upvotes: 0
Views: 223
Reputation: 140
So I figured it out. Gensim's Similarity index is stored as shards on disk, so if I pass the sims object into the function, it only carries paths valid on the machine where it was created (the master node's address). Instead, I build or load the index once on each worker, like this:
import os.path
import pyspark.sql.types as t

def test(string):
    path_index = SparkFiles.get("corpus_final_production.index")
    if os.path.exists(path_index):
        # Index already built on this worker: load it and repoint the
        # shard paths at the local copy
        path_dictionary = SparkFiles.get("dictionary_production.gensim")
        path_corpus = SparkFiles.get("corpus_final_production")
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        sims = gensim.similarities.Similarity.load(path_index)
        sims.output_prefix = path_index
        sims.check_moved()
        print("Sims: " + str(len(sims)))
        tf_idf = gensim.models.TfidfModel(corpus)
    else:
        # First call on this worker: build the index from the corpus and
        # save it locally so later calls can reuse it
        path_dictionary = SparkFiles.get("dictionary_production.gensim")
        path_corpus = SparkFiles.get("corpus_final_production")
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        print("Dictionary: " + str(len(dictionary)))
        tf_idf = gensim.models.TfidfModel(corpus)
        print("Corpus: " + str(len(corpus)))
        index_tmpfile = get_tmpfile(path_index)
        sims = gensim.similarities.Similarity(index_tmpfile, tf_idf[corpus], num_features=len(dictionary))
        sims.save(path_index)
        print(len(sims))
    # The scoring logic is identical in both branches
    query_doc = word_tokenize(string.lower())
    query_doc_bow = dictionary.doc2bow(query_doc)
    query_doc_tf_idf = tf_idf[query_doc_bow]
    max_sims = np.amax(sims[query_doc_tf_idf])
    max_sims_origin = file_docs[np.argmax(sims[query_doc_tf_idf])]  # file_docs defined elsewhere
    return t.Row('max_sims_origin', 'max_sims')(max_sims_origin, float(max_sims))
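Since the function now returns two fields instead of a string, the UDF has to be registered with a matching struct return type rather than StringType. A minimal sketch, assuming the same garuda DataFrame and text column from the question:
# Sketch: register the two-field UDF with a struct schema matching the Row
schema = t.StructType([
    t.StructField("max_sims_origin", t.StringType()),
    t.StructField("max_sims", t.FloatType()),
])
test_udf = udf(test, schema)

df_new = garuda.withColumn("result", test_udf(garuda.text))
# Split the struct into two top-level columns
df_new = df_new.select("*", "result.max_sims_origin", "result.max_sims").drop("result")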
If the index already exists on the worker, it is loaded from the local copy; the first call on each worker always builds and saves a local copy, so the expensive construction happens only once per node.
Upvotes: 0