Aparajith Chandran

Reputation: 140

Initializing gensim objects on all spark worker nodes

I have a function and a UDF that I created:

    from pyspark import SparkFiles
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    from gensim import corpora
    from gensim.test.utils import get_tmpfile
    from nltk.tokenize import word_tokenize
    import gensim
    import numpy as np

    def test(string):
        # Paths to the pregenerated files shipped to the workers via sc.addFile()
        path_index=SparkFiles.get("corpus_final_production.index")
        path_dictionary=SparkFiles.get('dictionary_production.gensim')
        path_corpus=SparkFiles.get("corpus_final_production")
        # These get rebuilt on every call, which is the expensive part
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        tf_idf = gensim.models.TfidfModel(corpus)
        index_tmpfile = get_tmpfile(path_index)
        sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
        query_doc=word_tokenize(string.lower())
        query_doc_bow=dictionary.doc2bow(query_doc)
        query_doc_tf_idf=tf_idf[query_doc_bow]
        sum_of_sims=np.sum(sims[query_doc_tf_idf], dtype=np.float32)
        max_sims=np.amax(sims[query_doc_tf_idf])
        max_count=np.count_nonzero(sims[query_doc_tf_idf] >= max_sims-0.05)
        # file_docs is assumed to be defined in the enclosing scope
        max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
        return max_sims_origin

    test_udf = udf(lambda x: test(x), StringType())
    df_new = garuda.withColumn('max_sim_origin', test_udf(garuda.text))

It works fine, but as you can see I am applying the UDF row-wise to the PySpark DataFrame. For every row, the dictionary, corpus, and sims index are regenerated from scratch, which takes close to 6 minutes per row.

Is there a way to initialize the dictionary, corpus, and index files once on every worker node beforehand, instead of building them inside the UDF?
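For what it's worth, one pattern I have seen is to cache the heavy objects in a module-level global, so each Python worker process builds them at most once. A rough sketch using the file names from my code above (not tested, and I'm not sure it is the right approach):

    # Sketch: process-level cache so each Python worker initializes once.
    # Assumes the same files as above are available via SparkFiles.
    _GENSIM_CACHE = {}

    def get_gensim_objects():
        if not _GENSIM_CACHE:
            path_dictionary = SparkFiles.get('dictionary_production.gensim')
            path_corpus = SparkFiles.get("corpus_final_production")
            dictionary = corpora.Dictionary.load(path_dictionary)
            corpus = corpora.MmCorpus(path_corpus)
            tf_idf = gensim.models.TfidfModel(corpus)
            index_tmpfile = get_tmpfile(SparkFiles.get("corpus_final_production.index"))
            sims = gensim.similarities.Similarity(index_tmpfile, tf_idf[corpus],
                                                  num_features=len(dictionary))
            _GENSIM_CACHE.update(dictionary=dictionary, tf_idf=tf_idf, sims=sims)
        return (_GENSIM_CACHE["dictionary"], _GENSIM_CACHE["tf_idf"],
                _GENSIM_CACHE["sims"])

The UDF would then call get_gensim_objects() instead of rebuilding everything per row.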

I am new to Spark, so any help is appreciated.

All the dictionary and corpus files are pregenerated, and I have distributed them to the workers using sc.addFile().
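The distribution step looks roughly like this (the driver-side paths are illustrative):

    sc.addFile("/path/on/driver/dictionary_production.gensim")
    sc.addFile("/path/on/driver/corpus_final_production")
    sc.addFile("/path/on/driver/corpus_final_production.index")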

Upvotes: 0

Views: 223

Answers (1)

Aparajith Chandran

Reputation: 140

So I figured it out. The way gensim's Similarity works is that it creates shards on disk, so if I pass the gensim sims object to the function, the object only holds paths that are valid on the local machine (the master node's address). Instead, I initialize it once on each worker, like this:

    from pyspark import SparkFiles
    from pyspark.sql import types as t
    from gensim import corpora
    from gensim.test.utils import get_tmpfile
    from nltk.tokenize import word_tokenize
    import gensim
    import numpy as np

    def test(string):
        import os.path
        # Shared files shipped to the workers via sc.addFile()
        path_index=SparkFiles.get("corpus_final_production.index")
        path_dictionary=SparkFiles.get('dictionary_production.gensim')
        path_corpus=SparkFiles.get("corpus_final_production")
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        tf_idf = gensim.models.TfidfModel(corpus)
        if os.path.exists(path_index):
            # The shards already exist on this worker: load the saved index
            # and repoint its output prefix at the local path.
            sims = gensim.similarities.Similarity.load(path_index)
            sims.output_prefix = path_index
            sims.check_moved()
            print("Sims: "+str(len(sims)))
        else:
            # First call on this worker: build the index once and save it
            # so subsequent rows can load it instead of rebuilding.
            print("Dictionary: "+str(len(dictionary)))
            print("Corpus: "+str(len(corpus)))
            index_tmpfile = get_tmpfile(path_index)
            sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
            sims.save(path_index)
            print(len(sims))
        query_doc=word_tokenize(string.lower())
        query_doc_bow=dictionary.doc2bow(query_doc)
        query_doc_tf_idf=tf_idf[query_doc_bow]
        max_sims=np.amax(sims[query_doc_tf_idf])
        # file_docs is assumed to be defined in the enclosing scope
        max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
        return t.Row('max_sims_origin', 'max_sims')(max_sims_origin, float(max_sims))
    

If the index already exists at that path, it is loaded from there; the first call on each worker node will always build and save a local copy.
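Since the function now returns a two-field Row, the UDF registration from the question needs a struct return type. A sketch of the wiring, assuming the same DataFrame and column as in the question:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StructType, StructField, StringType, FloatType

    # Schema matching the Row returned by test()
    schema = StructType([
        StructField('max_sims_origin', StringType()),
        StructField('max_sims', FloatType()),
    ])
    test_udf = udf(test, schema)
    df_new = garuda.withColumn('res', test_udf(garuda.text)) \
                   .select('*', 'res.*') \
                   .drop('res')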

Upvotes: 0
