Reputation: 551
I have a timestamp dataset in the format shown below, and I have written a UDF in PySpark to process this dataset and return a Map of key values. But I am getting the error message below.
Dataset: df_ts_list
+--------------------+
| ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+
PySpark UDF:
>>> def on_time(ts_list):
... import sys
... import os
... sys.path.append('/usr/lib/python2.7/dist-packages')
... os.system("sudo apt-get install python-numpy -y")
... import numpy as np
... import datetime
... import time
... from datetime import timedelta
... ts = np.array(ts_list)
... if ts.size == 0:
... count = 0
... duration = 0
... st = time.mktime(datetime.datetime.now().timetuple())
... ymd = str(datetime.datetime.fromtimestamp(st).date())
... else:
... ts.sort()
... one_tag = []
... start = float(ts[0])
... for i in range(len(ts)):
... if i == (len(ts)) - 1:
... end = float(ts[i])
... a_round = [start, end]
... one_tag.append(a_round)
... else:
... diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
... if abs(diff.total_seconds()) > 3600:
... end = float(ts[i])
... a_round = [start, end]
... one_tag.append(a_round)
... start = float(ts[i+1])
... one_tag = [u for u in one_tag if u[1] - u[0] > 300]
... count = int(len(one_tag))
... duration = int(np.diff(one_tag).sum())
... ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
... return {'count':count,'duration':duration, 'ymd':ymd}
PySpark code:
>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()
Error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'
Any help would be appreciated!
Upvotes: 34
Views: 136710
Reputation: 1
In all probability, this error occurs because no Spark session has been created, so create one first:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('yarn') \
    .appName('a') \
    .getOrCreate()
This should resolve the issue.
Upvotes: 0
Reputation: 81
I found this error in my Jupyter notebook. I added the commands below:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="<add-your-name-here>")
and it worked.
It is the same problem: the Spark context is not ready or has been stopped.
Upvotes: 0
Reputation: 37137
Mariusz's answer didn't really help me. So if, like me, you found this because it's the only result on Google and you're new to PySpark (and Spark in general), here's what worked for me.
In my case I was getting the error because I was trying to execute PySpark code before the PySpark environment had been set up.
Making sure that PySpark was available and set up before making calls that depend on pyspark.sql.functions fixed the issue for me.
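As a minimal sketch of that ordering (the app name and the toy DataFrame are my own placeholders, not from the original post):
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create the session (and with it the SparkContext) before anything that
# relies on pyspark.sql.functions, which needs a live JVM gateway
spark = SparkSession.builder.appName("example").getOrCreate()

df = spark.range(5).withColumn("neg", F.col("id") * -1)
df.select(F.abs(F.col("neg")).alias("abs_neg")).show()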
Upvotes: 73
Reputation: 2698
I faced the same issue when I had Python's round() function in my code and, like @Mariusz said, Python's round() function got overridden.
The workaround for this was to use __builtin__.round() instead of round(), as @Mariusz mentions in the comments on his answer.
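For what it's worth, a minimal sketch of that workaround (assuming Python 2, where the module is named __builtin__; on Python 3 it is builtins):
import __builtin__                      # Python 3: import builtins

from pyspark.sql.functions import *     # round is now pyspark.sql.functions.round

# Call Python's own round explicitly instead of the shadowed Column version
x = __builtin__.round(3.14159, 2)       # 3.14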
Upvotes: 0
Reputation: 3207
This exception also arises when the UDF cannot handle None values.
For example, the following code results in the same exception:
get_datetime = udf(lambda ts: to_timestamp(ts), DateType())
df = df.withColumn("datetime", get_datetime("ts"))
However, this one does not:
get_datetime = udf(lambda ts: to_timestamp(ts) if ts is not None else None, DateType())
df = df.withColumn("datetime", get_datetime("ts"))
Upvotes: 4
Reputation: 3725
Just to be clear, the problem a lot of people are having stems from a single bad programming style: from blah import *.
When you do
from pyspark.sql.functions import *
you overwrite a lot of Python's built-in functions. I strongly recommend importing the functions under an alias instead, like
import pyspark.sql.functions as f
# or
import pyspark.sql.functions as pyf
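A quick check of what the alias buys you (this snippet is my illustration, not part of the original answer):
import pyspark.sql.functions as f

# The Spark versions live behind the alias, so Python's builtins keep their names
print(abs(-3))   # 3 -- still Python's built-in abs
print(f.abs)     # the Column-based abs from pyspark.sql.functions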
Upvotes: 12
Reputation: 51
Make sure that you are initializing the Spark context. For example:
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("...") \
    .getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("com.mongodb.spark.sql").load()
Or, as in:
spark = SparkSession.builder.appName('company').getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("csv").option("delimiter", ",") \
.option("quote", "\"").option("escape", "\"") \
.option("header", "true").option("inferSchema", "true") \
.load("/path/thecsv.csv")
Upvotes: 4
Reputation: 13926
The error message says that in the 27th line of the UDF you are calling some PySpark SQL functions. It is the line with abs(), so I suppose that somewhere above you call from pyspark.sql.functions import * and it overrides Python's abs() function.
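For illustration only (not part of the original answer), the shadowing is easy to reproduce in a shell:
print(abs)                               # <built-in function abs>
from pyspark.sql.functions import *      # rebinds abs (and round, sum, max, ...)
print(abs)                               # now the abs from pyspark.sql.functions
# Inside a UDF running on an executor there is no SparkContext, so the rebound
# abs() reaches for sc._jvm and raises:
# AttributeError: 'NoneType' object has no attribute '_jvm'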
Upvotes: 25