Revan

Reputation: 551

PySpark error: AttributeError: 'NoneType' object has no attribute '_jvm'

I have a timestamp dataset in the format shown below.

I have written a UDF in PySpark to process this dataset and return a map of key-value pairs, but I am getting the error message below.

Dataset: df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+

PySpark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}

PySpark code:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'

Any help would be appreciated!

Upvotes: 34

Views: 136710

Answers (8)

kpandey

Reputation: 1

In all probability, this error occurs because no Spark session has been created, so create one first:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('yarn') \
    .appName('a') \
    .getOrCreate()

This should resolve the issue.

Upvotes: 0

Mayuresh Dhawan

Reputation: 81

I ran into this error in my Jupyter notebook. I added the commands below

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="<add-your-name-here>")

and it worked.

It's the same underlying problem: the Spark context is not ready or has been stopped.

Upvotes: 0

fmsf

Reputation: 37137

Mariusz's answer didn't really help me. So if, like me, you found this because it's the only result on Google and you're new to PySpark (and Spark in general), here's what worked for me.

In my case I was getting that error because I was trying to execute pyspark code before the pyspark environment had been set up.

Making sure that PySpark was available and set up before making any calls that depend on pyspark.sql.functions fixed the issue for me.
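In practice, that just means creating the SparkSession (and therefore the SparkContext) before anything that touches pyspark.sql.functions. A minimal sketch, assuming a local run (the master and app name are placeholders, not from the question):

from pyspark.sql import SparkSession

# Start the session first; this is what gives pyspark.sql.functions a live JVM gateway
spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()

# Only now import and use things that need the running context, e.g. udf()
from pyspark.sql.functions import udf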

Upvotes: 73

The Singularity

Reputation: 2698

I faced the same issue when I had Python's round() function in my code; as @Mariusz said, Python's round() got overridden.

The workaround was to use __builtin__.round() instead of round(), as @Mariusz mentions in the comments on his answer.
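In other words, something along these lines (a sketch; __builtin__ is the Python 2 module name, on Python 3 it is builtins):

import __builtin__  # on Python 3: import builtins

# After "from pyspark.sql.functions import *", round() refers to Spark's column
# function, so call Python's built-in explicitly:
value = __builtin__.round(3.14159, 2)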

Upvotes: 0

dsalaj

Reputation: 3207

This exception also arises when the UDF cannot handle None values. For example, the following code results in the same exception:

get_datetime = udf(lambda ts: to_timestamp(ts), DateType())
df = df.withColumn("datetime", get_datetime("ts"))

However, this one does not:

get_datetime = udf(lambda ts: to_timestamp(ts) if ts is not None else None, DateType())
df = df.withColumn("datetime", get_datetime("ts"))

Upvotes: 4

SARose

Reputation: 3725

Just to be clear, the problem a lot of people are having stems from a single bad programming style: from blah import *.

When you do

from pyspark.sql.functions import *

you overwrite a lot of Python's built-in functions. I strongly recommend importing functions under an alias, like

import pyspark.sql.functions as f
# or 
import pyspark.sql.functions as pyf
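With an alias, the built-ins stay intact and the Spark versions are still only one prefix away. A quick illustration (assuming a DataFrame df with a numeric column ts; the names are illustrative only):

import pyspark.sql.functions as f

df = df.withColumn("abs_ts", f.abs(df["ts"]))  # Spark's abs(), called explicitly
x = abs(-5)                                    # Python's built-in abs(), not shadowed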

Upvotes: 12

Gustavo Frederico

Reputation: 51

Make sure that you are initializing the Spark context. For example:

from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("...") \
    .getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("com.mongodb.spark.sql").load()

Or as in

spark = SparkSession.builder.appName('company').getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("csv").option("delimiter", ",") \
    .option("quote", "\"").option("escape", "\"") \
    .option("header", "true").option("inferSchema", "true") \
    .load("/path/thecsv.csv")

Upvotes: 4

Mariusz

Reputation: 13926

The error message says that on line 27 of the UDF you are calling some PySpark SQL function. The line in question contains abs(), so I suppose that somewhere above you call from pyspark.sql.functions import * and it overrides Python's abs() function.
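A minimal sketch of the corresponding fix, assuming the wildcard import is indeed the culprit: import only the names you need, so that abs() inside the UDF resolves to Python's built-in again.

# Instead of "from pyspark.sql.functions import *":
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

on_time_udf = udf(on_time, MapType(StringType(), StringType()))
df_ts_list.withColumn("one_tag", on_time_udf("ts_list")).select("one_tag").show()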

Upvotes: 25
