Reputation: 3267
I am trying to submit a pyspark job locally.
When I use a simpler code without a class it runs perfectly fine. But when I try adding the code inside a class it doesn't
This version of the same code works.
https://pastebin.com/raw/uRydZN3J
I am trying to submit a pyspark job locally .
I've tried to run the job like so ./bin/spark-submit --master local[4] /Users/Host/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark_param.py 10
It gives an error at line where it tries to access the DataFrame
email_fact = email_fact.withColumn('SID',self.SID_udf(email_fact['email_omni_code'])).withColumn('TPID',
It shows the error as pickle.PicklingError: Could not serialize object: TypeError: 'JavaPackage' object is not callable
I might be doing some silly mistake, since I have just tried working on this. I've been through several other similar posts with the same kind of error but wasn't able to find one which was working for me.
Any help is appreciated.
import re,time
from pyspark.sql.types import *
from pyspark import SparkContext
import datetime
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.sql import Row, functions as F
from pyspark.sql.functions import udf
class EmailFact(object):
def __init__(self,input_path,output_path,read_format,write_format,compare_date,parse_input=True):
self.sc = SparkContext.getOrCreate()
self.sqlContext = SQLContext(self.sc)
self.cmp_date = compare_date
self.create_udfs()
self.create_schema()
self.input_path,self.output_path = input_path,output_path
self.read_format,self.write_format = read_format,write_format
self.parse_flag = parse_input
self.input()
if parse_input:
self.event_type,self.mg_load_date = self.parse_input()
self.event_type_udf = udf(lambda : self.event_type,StringType())
self.mg_load_date_udf = udf(lambda : self.mg_load_date,StringType())
self.execute()
self.output()
def create_schema(self):
self.schema = StructType([StructField("email_addr",StringType(),True),StructField("tpid",StringType(),True),StructField("eapid",
StringType(),True),StructField("send_date",StringType(),True),StructField("event_date",StringType(),True),\
StructField("event_time",StringType(),True),StructField("subchannel_name",StringType(),True),\
StructField("program_code",StringType(),True),StructField("campaign_code",StringType(),True),\
StructField("sid",StringType(),True),StructField("key_id",StringType(),True),\
StructField("paid_id",StringType(),True),StructField("email_domain",StringType(),True),\
StructField("request_id",StringType(),True),StructField("email_omni_code",StringType(),True),\
StructField("email_omni_code_mini",StringType(),True),StructField("activity_id",StringType(),True),\
StructField("device",StringType(),True),StructField("browser",StringType(),True),\
StructField("operating_system",StringType(),True),StructField("email_client",StringType(),True),\
StructField("details",StringType(),True),StructField("correlationid",StringType(),True),\
StructField("category",StringType(),True),StructField("categorycode",StringType(),True),\
StructField("ipaddress",StringType(),True),StructField("messagesize",LongType(),True),\
StructField("url",StringType(),True),StructField("cw_year",StringType(),True),\
StructField("cw_month",StringType(),True),StructField("cw_day",StringType(),True),\
StructField("cw_hour",StringType(),True),StructField("event_type",StringType(),True),
StructField("mg_load_date", StringType(), True)])
def create_udfs(self):
self.parse_omni_code_udf = udf(self.parse_omni_code, StringType())
self.parse_site_name_udf = udf(self.parse_site_name, StringType())
self.send_date_conv_udf = udf(self.send_date_conv, StringType())
self.parse_null_udf = udf(self.parse_null, StringType())
self.pivot_udf = udf(self.pivot, IntegerType())
self.send_pivot_udf = udf(self.send_pivot, IntegerType())
self.bounce_pivot_udf = udf(self.bounce_pivot, IntegerType())
self.open_pivot_udf = udf(self.open_pivot, IntegerType())
self.click_pivot_udf = udf(self.click_pivot, IntegerType())
self.unsub_pivot_udf = udf(self.unsub_pivot, IntegerType())
self.send_raw_pivot_udf = udf(self.send_raw_pivot, IntegerType())
self.bounce_raw_pivot_udf = udf(self.bounce_raw_pivot, IntegerType())
self.open_raw_pivot_udf = udf(self.open_raw_pivot, IntegerType())
self.click_raw_pivot_udf = udf(self.click_raw_pivot, IntegerType())
self.unsub_raw_pivot_udf = udf(self.unsub_raw_pivot, IntegerType())
self.Site_Platform_udf = udf(self.Site_Platform, StringType())
self.SID_udf = udf(self.SID, StringType())
self.tpid_udf = udf(self.tpid, StringType())
self.total_unsubs_udf = udf(self.total_unsubs, IntegerType())
self.eapid_udf = udf(self.eapid, StringType())
self.curr_time_udf = udf(self.get_current_time, StringType())
self.sub_channel_udf = udf(self.sub_channel, StringType())
self.program_code_udf = udf(self.program_code, StringType())
self.campaign_code_udf = udf(self.campaign_code, StringType())
self.site_name_2_udf = udf(self.site_name_2, StringType())
self.send_date_func_udf = udf(self.send_date_func, StringType())
self.url_func = lambda x: re.sub('"', '', x)
self.url_func_udf = udf(self.url_func,StringType())
self.df_filter_udf = udf(self.df_filter, BooleanType())
def parse_input(self,input_path):
self.input_path=input_path.replace(' ','')
event_type = re.search('event_type=([^/]*)',self.input_path)
mg_load_date = re.search('mg_load_date=([^/]*)',self.input_path)
if event_type:
event_type=event_type.group(1)
if mg_load_date:
mg_load_date=mg_load_date.group(1)
return (event_type,mg_load_date)
def input(self):
self.email_fact = self.sqlContext.read.format(self.read_format).load(self.input_path,schema=self.schema)
def output(self):
self.email_fact.write.format(self.write_format).save(self.output_path)
def parse_omni_code(self,email_omni_code):
email_omni_code = email_omni_code.upper()
e_omni_code = ""
if email_omni_code and 'TEID'.lower() in email_omni_code.lower():
segm = re.findall("(.*)-SEGM",email_omni_code.upper())
if segm:
e_omni_code=segm[0]
else :
e_omni_code = re.search('(.*)&.*EMLDTL=.*', email_omni_code.upper())
if e_omni_code:
e_omni_code=e_omni_code.group(1)
else:
e_omni_code=re.search("(.*)[&\.;,].*DATE",email_omni_code)
e_omni_code = re.sub('EML\.EMLCID=|EML\.','',e_omni_code)
return e_omni_code
def parse_site_name(self,site_map):
temp=site_map.split(".",1)[0]
ans = temp.replace("-",".") if re.search('.*-.*',temp) else "EXPEDIA"+"."+re.sub("[^a-zA-Z0-9]","",site_map)
ans = re.sub('(?<!TRAVELOCITY.)US','COM',ans)
return ans
def parse_null(self,null_string):
if null_string.strip():
return null_string.strip()
def send_date_conv(self,email_omni_code,send_date):
if email_omni_code and 'TEID'.lower() in email_omni_code.lower():
email_omni_code1 = re.search("DATE((19|20)[0-9]{2}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01]))", email_omni_code,
re.IGNORECASE)
conv_date = lambda x : x[:4] + '-' + x[4:6] + '-' + x[6:]
ifbreak = 1
if email_omni_code1:
null_string = self.parse_null(email_omni_code1.group(1))
if null_string:
email_omni_code1 = conv_date(email_omni_code1)
return email_omni_code1
send_date1 = re.search('(19|20)[0-9]{2}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])',send_date,re.IGNORECASE)
if send_date1:
if self.parse_null(send_date1.group(0)):
send_date1=conv_date(send_date1)
return send_date1
def Site_Platform(self,device):
if device:
device = device.strip().upper()
change = {'SMARTPHONE':'TWEB','TABLET':'PWEB','GAME CONSOLE':'DESKTOP','SMART TV':'DESKTOP',
'OTHER':'DESKTOP','PERSONAL COMPUTER':'DESKTOP'}
device = change[device] if device in change else 'N/A'
return device
def pivot(self,name,event1,event2=1,name2=None):
if not name2:
return 1 if event1 and name and event1.strip() == name.strip() and event2==1 else 0
return 1 if event1 and name and event1.strip() == name.strip() or name2 and event1 and event1.strip()==name2.strip() else 0
def send_raw_pivot(self,event1):
return 1 if event1.strip() == 'SEND' else 0
def bounce_raw_pivot(self,event1):
return 1 if event1.strip() == 'BOUNCE' else 0
def open_raw_pivot(self,event1):
return 1 if event1.strip() == 'OPEN' else 0
def click_raw_pivot(self,event1):
return 1 if event1.strip() == 'CLICK' else 0
def unsub_raw_pivot(self,event1):
return 1 if event1 and event1.strip() == 'UNSUB'.strip() or event1.strip() == 'COMPLAINT'.strip() else 0
def send_pivot(self,event1,event2):
return 1 if event1 and event2 and event1.strip() == 'SEND' and event2==1 else 0
def bounce_pivot(self,event1,event2):
return 1 if event1 and event2 and event1.strip() == 'BOUNCE' and event2==1 else 0
def open_pivot(self,event1,event2):
return 1 if event1 and event2 and event1.strip() == 'OPEN' and event2==1 else 0
def click_pivot(self,event1,event2):
return 1 if event1 and event2 and event1.strip() == 'CLICK' and event2==1 else 0
def unsub_pivot(self,event1,event2):
return 1 if event1 and event2 and event1.strip() == 'UNSUB' and event2==1 else 0
def SID(self,email_omni_code):
if email_omni_code:
sid = re.search('SID(.*?)([\-|\.]|$)',email_omni_code.strip().upper())
if sid:
return sid.group(1)
def tpid(self,email_omni_code,event_type):
if email_omni_code:
caps_email_omni_code = email_omni_code.upper()
if 'TEID' in caps_email_omni_code and event_type in map(str.upper,['UNSUB','COMPLIANT']):
search = re.search('^(?:(?!\1)([^-]*)(?:\-)){4}',caps_email_omni_code)
if search :
search = search.group(1)[4]
return search
def eapid(self,email_omni_code,event_type):
if email_omni_code:
caps_email_omni_code = email_omni_code.upper()
if 'TEID' in caps_email_omni_code and event_type in map(str.upper,['UNSUB','COMPLIANT']):
search = re.search('^(?:(?!\1)([^-]*)(?:\-)){4}',caps_email_omni_code)
if search:
return search.group(1)[-1]
def total_unsubs(self,event1,fai):
if event1 and ((event1.strip().upper() == 'UNSUB'.strip().upper() and fai ==1) or (event1.strip().upper() ==
'COMPLAINT'.strip().upper())):
return 1
return 0
def sub_channel(self,email_omni_code):
if email_omni_code:
caps_email_omni_code = self.parse_omni_code(email_omni_code.upper())
if 'TEID' in caps_email_omni_code:
search = re.search('^(?:(?!\1)([^-]*)(?:\-)){'+str(1)+'}',caps_email_omni_code)
if search:
return search.group(1)
else:
try:
return caps_email_omni_code.split('.')[1]
except:
pass
def program_code(self,email_omni_code):
if email_omni_code:
caps_email_omni_code = self.parse_omni_code(email_omni_code.upper())
if 'TEID' in caps_email_omni_code:
search = re.search('^(?:(?!\1)([^-]*)(?:\-)){'+str(2)+'}',caps_email_omni_code)
if search:
return search.group(1)
else:
try:
return caps_email_omni_code.split('.')[2]
except:
pass
def campaign_code(self,email_omni_code):
if email_omni_code:
caps_email_omni_code = self.parse_omni_code(email_omni_code.upper())
if 'TEID' in caps_email_omni_code:
search = re.search('^(?:(?!\1)([^-]*)(?:\-)){'+str(1)+'}',caps_email_omni_code)
if search:
return search.group(3)
else:
try:
return caps_email_omni_code.split('.')[3]
except:
pass
def get_current_time(self):
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def site_name_2(self,email_omni_code):
if email_omni_code:
caps_email_omni_code = email_omni_code.upper()
if 'TEID' in caps_email_omni_code:
poc = self.parse_omni_code(caps_email_omni_code)
if poc.strip():
return self.parse_site_name(poc)
return None
def send_date_func(self,email_omni_code,send_date):
if email_omni_code and send_date:
email_omni_code,send_date = email_omni_code.strip().upper(),send_date.strip()
sd = self.send_date_conv(email_omni_code,send_date)
if sd:
return sd.strip()
return send_date
def df_filter(self,email_omni_code,event_type,mg_load_date,event_date,send_date):
if email_omni_code:
match = ['SEND','OPEN','BOUNCE','CLICK','UNSUB','COMPLAINT']
#self.cmp_date = '2018-01-01'
try:
ret = self.send_date_func(email_omni_code,send_date)
send_date = ret if ret else send_date
cmp_send_date = time.strptime(send_date,'%Y-%m-%d') if send_date else None
mg_load_date,self.cmp_date = time.strptime(mg_load_date,'%Y-%m-%d'),time.strptime(self.cmp_date,'%Y-%m-%d')
if event_type and event_type.strip().upper() in map(str.upper,match) and mg_load_date >= self.cmp_date and \
event_date >= self.cmp_date and (cmp_send_date==None or cmp_send_date >= self.cmp_date):
return True
except:
pass
return False
def filter_df(self):
email_fact = email_fact.withColumn('filterCol',self.df_filter_udf(email_fact['email_omni_code'],
email_fact['event_type'],email_fact['mg_load_date']
,email_fact['event_date'],email_fact['send_date']))
email_fact = email_fact.filter(email_fact['filterCol']==True)
def derived_columns(self):
email_fact = email_fact.withColumn('SID',self.SID_udf(email_fact['email_omni_code'])).withColumn('TPID',
self.eapid_udf(email_fact['email_omni_code'],email_fact['event_type']
)).withColumn('EAPID',self.eapid_udf(email_fact['email_omni_code'],
email_fact['event_type'])).withColumn('subchannel_name',
self.sub_channel_udf(email_fact['email_omni_code'])).withColumn('program_code',
self.program_code_udf(email_fact['email_omni_code'])).withColumn('campaign_code',
self.campaign_code_udf(email_fact['email_omni_code'])).withColumn('site_name_2',
self.site_name_2_udf(email_fact['email_omni_code'])).withColumn('email_omni_code_mini',
self.parse_omni_code_udf(email_fact['email_omni_code'])).withColumn('url_plain',
self.url_func_udf(email_fact['url'])).withColumn('send_date',self.send_date_func_udf(
email_fact['email_omni_code'],
email_fact['send_date']))
# Partition By
email_fact = email_fact.withColumn('first_activity_ind',F.row_number().over(
Window.partitionBy([email_fact['event_type'],email_fact['email_addr'],
email_fact['email_omni_code'],email_fact['request_id'],
email_fact['send_date']]).orderBy(
email_fact['event_date'])))
email_fact = email_fact.withColumn('Site_Platform',self.Site_Platform_udf(email_fact['device'])
).withColumn('TOTAL_DELIVERED',
self.send_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_UNDELIVERED',
self.bounce_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_HTML_VIEWS',
self.open_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_LINK_CLICKS',
self.click_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_UNSUBS',
self.total_unsubs_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn(
'total_delivered_raw',self.send_raw_pivot_udf(email_fact['EVENT_TYPE'])).withColumn('total_undelivered_raw',
self.bounce_raw_pivot_udf(email_fact['event_type'])).withColumn('total_html_views_raw',
self.open_raw_pivot_udf(email_fact['event_type'])).withColumn('total_link_clicks_raw',
self.click_raw_pivot_udf(email_fact['event_type'])).withColumn('total_unsubs_raw',
self.unsub_raw_pivot_udf(email_fact['event_type'])).withColumn('ETL_LOAD_DATETIME',self.curr_time_udf()
).withColumn('ETL_LOAD_REF',F.lit('Email_Fact_Response'))
def execute(self):
email_fact = self.email_fact
email_fact.show()
if self.parse_flag:
email_fact = email_fact.withColumn('event_type',self.event_type_udf).withColumn('mg_load_date'
,self.mg_load_date_udf)
self.derived_columns()
self.email_fact = email_fact.select(email_fact['email_addr'],email_fact['site_name_2'],
email_fact['SID'],email_fact['TPID'],
email_fact['EAPID'],email_fact['subchannel_name'],
email_fact['program_code'],
email_fact['campaign_code'],email_fact['email_omni_code_mini'],
email_fact['email_omni_code'],
email_fact['send_date'],email_fact['event_date'],
email_fact['site_platform'],'TOTAL_DELIVERED','TOTAL_UNDELIVERED',
'TOTAL_HTML_VIEWS','TOTAL_LINK_CLICKS',
'TOTAL_UNSUBS','total_delivered_raw','total_undelivered_raw','total_html_views_raw','total_link_clicks_raw',
'total_unsubs_raw','ETL_LOAD_DATETIME','ETL_LOAD_REF').groupBy('email_addr','site_name_2','SID',"TPID",
"EAPID","subchannel_name","program_code","campaign_code","email_omni_code_mini","email_omni_code",
"send_date","event_date","Site_Platform").agg(F.sum('TOTAL_DELIVERED').alias('TOTAL_DELIVERED')
,F.sum('TOTAL_UNDELIVERED').alias('TOTAL_UNDELIVERED'),F.sum('TOTAL_HTML_VIEWS').alias(
'TOTAL_HTML_VIEWS'),F.sum('TOTAL_LINK_CLICKS').alias('TOTAL_LINK_CLICKS'),F.sum(
'TOTAL_UNSUBS').alias('TOTAL_UNSUBS'),F.sum('total_delivered_raw').alias('total_delivered_raw'),
F.sum('total_undelivered_raw').alias('total_undelivered_raw'),F.sum('total_html_views_raw').alias(
'total_html_views_raw'),F.sum('total_link_clicks_raw').alias('total_link_clicks_raw'),F.sum(
'total_unsubs_raw').alias('total_unsubs_raw'))
input_path = '/Users/host/Downloads/full_result_122259325.csv'
output_path = '/Users/host/Downloads/New/param'
read_format = 'csv'
write_format = 'csv'
compare_date = '2018-01-01'
EmailFact(input_path,output_path,read_format,write_format,compare_date,parse_input=False)
Upvotes: 2
Views: 5464
Reputation: 3267
The problem lies in using udfs the way they should be. Similar to the way we cannot use keyword arguments in udfs we probably cannot use class methods, so we have to use static methods, I solved it by declaring the functions used by udf outside the class. For example the following function I've defined outside of the class then called it inside where we declare udfs. Using so the class instance is not passed inside while creating the udf. Also there were more problems faced while using @staticmethod
, which I avoided using finally.
def tpid(email_omni_code, event_type):
if email_omni_code:
caps_email_omni_code = email_omni_code.upper()
if 'TEID' in caps_email_omni_code and event_type in map(str.upper, ['UNSUB', 'COMPLIANT']):
search = re.search('^(?:(?!\1)([^-]*)(?:\-)){4}', caps_email_omni_code)
if search:
search = search.group(1)[4]
return search
self.tpid_udf = udf(tpid, StringType())
Upvotes: 3