FATMA BERRAK GUNGOR

Reputation: 71

NLTK Lookup Error for punkt downloading on pyspark dataframe in Databricks

I am trying to measure the similarity of two text columns ('title' and 'headline') by applying cosine similarity to a PySpark dataframe in Databricks. My function is called 'cosine_sim', and to be able to use it on the dataframe I first have to convert it to a UDF ('cosine_sim_udf').

I get a LookupError after applying the function to the dataframe. Does anyone know the reason, or have a suggestion for a solution?

My function to find cosine similarity:

import string

import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

nltk.download('punkt')

stemmer = nltk.stem.porter.PorterStemmer()
remove_punctuation_map = dict((ord(char), None) for char in string.punctuation)

def stem_tokens(tokens):
    return [stemmer.stem(item) for item in tokens]

# remove punctuation, lowercase, stem
def normalize(text):
    return stem_tokens(nltk.word_tokenize(text.lower().translate(remove_punctuation_map)))

vectorizer = TfidfVectorizer(tokenizer=normalize, stop_words='english')

def cosine_sim(text1, text2):
    tfidf = vectorizer.fit_transform([text1, text2])
    return float(((tfidf * tfidf.T).A)[0, 1])

cosine_sim_udf = udf(cosine_sim, FloatType())

df2 = df.withColumn('cosine_distance', cosine_sim_udf('title', 'headline'))  # 'title' and 'headline' are the text columns to compare

Then I get this error:

PythonException: 'LookupError: 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 426.0 failed 4 times, most recent failure: Lost task 0.3 in stage 426.0 (TID 2135) (10.109.245.129 executor 1): org.apache.spark.api.python.PythonException: 'LookupError: 
**********************************************************************
  Resource punkt not found.
  Please use the NLTK Downloader to obtain the resource:

  >>> import nltk
  >>> nltk.download('punkt')
  For more information see: https://www.nltk.org/data.html

  Attempted to load tokenizers/punkt/PY3/english.pickle

  Searched in:
    - '/root/nltk_data'
    - '/databricks/python/nltk_data'
    - '/databricks/python/share/nltk_data'
    - '/databricks/python/lib/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'

Upvotes: 3

Views: 2060

Answers (2)

noam cohen

Reputation: 112

Note: this answer was tested on the Databricks version running as of 2023-08. User interfaces tend to change over time, so always check the docs:

https://docs.databricks.com/en/libraries/workspace-libraries.html

quoting: "Workspace libraries serve as a local repository from which you create cluster-installed libraries. ... You must install a workspace library on a cluster before it can be used in a notebook or job"

In Databricks, you install a library on a cluster (for your own use), or install it in the workspace to make it available to all clusters in the workspace.

The library can be, for example, a Python lib (nltk) from PyPI, a JAR file you built, etc.

To recap what the docs say, to install on one cluster:

  • In your workspace, choose "Compute" in the left sidebar.

  • Choose the "Libraries" tab.

  • Click the "Install now" button.

  • For your case, choose "Library Source" PyPI and "Package" nltk. [I don't know why, but when I specified 'nltk==3.8.1' the installation seemed to fail.] They DO recommend using a specific version.

  • Restart the cluster.

  • Try it:

    import nltk
    nltk.download('punkt')
    
    
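If you want to confirm that the package is visible on the executors (and not only on the driver), a quick check like the sketch below can help. This assumes a Databricks notebook where `sc` (the SparkContext) is predefined; `check_nltk` is just an illustrative helper name:

    # Run a trivial Spark job so the import happens on the executors.
    def check_nltk(partition):
        import nltk
        return [nltk.__version__]

    print(sc.parallelize(range(2), 2).mapPartitions(check_nltk).collect())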

To install in the Workspace:

  • click "workspace"
  • choose the workspace you want to install. e.g. "shared"
  • right click , Create, Library

Now the library is available to clusters the next time they restart.
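On recent Databricks runtimes there is also a notebook-scoped alternative via the %pip magic command. This is only a side note and a sketch; whether libraries installed this way are visible to code running on the executors depends on the runtime, so check the notebook-scoped libraries page in the Databricks docs:

    %pip install nltk

Keep in mind this installs only the nltk package itself; the punkt data is a separate download.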

Upvotes: 0

Alex Ott

Reputation: 87224

The problem is that in your case nltk.download('punkt') was executed only on the driver node, while your UDF runs on the worker nodes, where that resource isn't available.

You have the following possibilities:

  • Install the required resource using a cluster init script with something like this (it will install the resource on all nodes):
#!/bin/bash

pip install nltk
python -m nltk.downloader punkt
  • Run something like this (I haven't tested it, but it may work; it also may not work on an autoscaling cluster):
import nltk

# one (empty) element per executor, so each partition lands on a different worker
num_executors = sc._jsc.sc().getExecutorMemoryStatus().size() - 1
sc.parallelize([""] * num_executors, num_executors) \
  .mapPartitions(lambda p: [nltk.download('punkt')]).collect()
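Another variant, just a sketch and untested (it assumes nltk itself is already installed on the workers and that they have internet access): since the resource must be present on whichever node actually executes the UDF, the download can also be triggered lazily inside the function before tokenizing:

import nltk
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def cosine_sim_safe(text1, text2):
    # Runs on the executor: fetch 'punkt' there if it is missing.
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError:
        nltk.download('punkt')
    return cosine_sim(text1, text2)  # the original function from the question

cosine_sim_udf = udf(cosine_sim_safe, FloatType())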

Upvotes: 1
