Akshay Hazari
Akshay Hazari

Reputation: 3267

Error running pyspark job pickle.PicklingError: Could not serialize object: TypeError: 'JavaPackage' object is not callable

I am trying to submit a pyspark job locally.

When I use a simpler code without a class it runs perfectly fine. But when I try adding the code inside a class it doesn't

This version of the same code works.

https://pastebin.com/raw/uRydZN3J

I am trying to submit a pyspark job locally .

I've tried to run the job like so ./bin/spark-submit --master local[4] /Users/Host/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark_param.py 10

It gives an error at line where it tries to access the DataFrame

email_fact = email_fact.withColumn('SID',self.SID_udf(email_fact['email_omni_code'])).withColumn('TPID',

It shows the error as pickle.PicklingError: Could not serialize object: TypeError: 'JavaPackage' object is not callable

I might be doing some silly mistake, since I have just tried working on this. I've been through several other similar posts with the same kind of error but wasn't able to find one which was working for me.

Any help is appreciated.

import re,time
from pyspark.sql.types import *
from pyspark import SparkContext
import datetime
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.sql import Row, functions as F
from pyspark.sql.functions import udf


class EmailFact(object):

    def __init__(self,input_path,output_path,read_format,write_format,compare_date,parse_input=True):
        self.sc = SparkContext.getOrCreate()
        self.sqlContext = SQLContext(self.sc)
        self.cmp_date = compare_date

        self.create_udfs()
        self.create_schema()

        self.input_path,self.output_path = input_path,output_path
        self.read_format,self.write_format = read_format,write_format

        self.parse_flag = parse_input
        self.input()
        if parse_input:
            self.event_type,self.mg_load_date = self.parse_input()
            self.event_type_udf = udf(lambda : self.event_type,StringType())
            self.mg_load_date_udf = udf(lambda : self.mg_load_date,StringType())

        self.execute()
        self.output()

    def create_schema(self):
        self.schema = StructType([StructField("email_addr",StringType(),True),StructField("tpid",StringType(),True),StructField("eapid",
             StringType(),True),StructField("send_date",StringType(),True),StructField("event_date",StringType(),True),\
             StructField("event_time",StringType(),True),StructField("subchannel_name",StringType(),True),\
             StructField("program_code",StringType(),True),StructField("campaign_code",StringType(),True),\
             StructField("sid",StringType(),True),StructField("key_id",StringType(),True),\
             StructField("paid_id",StringType(),True),StructField("email_domain",StringType(),True),\
             StructField("request_id",StringType(),True),StructField("email_omni_code",StringType(),True),\
             StructField("email_omni_code_mini",StringType(),True),StructField("activity_id",StringType(),True),\
             StructField("device",StringType(),True),StructField("browser",StringType(),True),\
             StructField("operating_system",StringType(),True),StructField("email_client",StringType(),True),\
             StructField("details",StringType(),True),StructField("correlationid",StringType(),True),\
             StructField("category",StringType(),True),StructField("categorycode",StringType(),True),\
             StructField("ipaddress",StringType(),True),StructField("messagesize",LongType(),True),\
             StructField("url",StringType(),True),StructField("cw_year",StringType(),True),\
             StructField("cw_month",StringType(),True),StructField("cw_day",StringType(),True),\
             StructField("cw_hour",StringType(),True),StructField("event_type",StringType(),True),
             StructField("mg_load_date", StringType(), True)])


    def create_udfs(self):
        self.parse_omni_code_udf = udf(self.parse_omni_code, StringType())
        self.parse_site_name_udf = udf(self.parse_site_name, StringType())
        self.send_date_conv_udf = udf(self.send_date_conv, StringType())
        self.parse_null_udf = udf(self.parse_null, StringType())
        self.pivot_udf = udf(self.pivot, IntegerType())
        self.send_pivot_udf = udf(self.send_pivot, IntegerType())
        self.bounce_pivot_udf = udf(self.bounce_pivot, IntegerType())
        self.open_pivot_udf = udf(self.open_pivot, IntegerType())
        self.click_pivot_udf = udf(self.click_pivot, IntegerType())
        self.unsub_pivot_udf = udf(self.unsub_pivot, IntegerType())
        self.send_raw_pivot_udf = udf(self.send_raw_pivot, IntegerType())
        self.bounce_raw_pivot_udf = udf(self.bounce_raw_pivot, IntegerType())
        self.open_raw_pivot_udf = udf(self.open_raw_pivot, IntegerType())
        self.click_raw_pivot_udf = udf(self.click_raw_pivot, IntegerType())
        self.unsub_raw_pivot_udf = udf(self.unsub_raw_pivot, IntegerType())
        self.Site_Platform_udf = udf(self.Site_Platform, StringType())
        self.SID_udf = udf(self.SID, StringType())
        self.tpid_udf = udf(self.tpid, StringType())
        self.total_unsubs_udf = udf(self.total_unsubs, IntegerType())
        self.eapid_udf = udf(self.eapid, StringType())
        self.curr_time_udf = udf(self.get_current_time, StringType())
        self.sub_channel_udf = udf(self.sub_channel, StringType())
        self.program_code_udf = udf(self.program_code, StringType())
        self.campaign_code_udf = udf(self.campaign_code, StringType())
        self.site_name_2_udf = udf(self.site_name_2, StringType())
        self.send_date_func_udf = udf(self.send_date_func, StringType())
        self.url_func = lambda x: re.sub('"', '', x)
        self.url_func_udf = udf(self.url_func,StringType())
        self.df_filter_udf = udf(self.df_filter, BooleanType())

    def parse_input(self,input_path):
        self.input_path=input_path.replace(' ','')
        event_type = re.search('event_type=([^/]*)',self.input_path)
        mg_load_date = re.search('mg_load_date=([^/]*)',self.input_path)
        if event_type:
            event_type=event_type.group(1)
        if mg_load_date:
            mg_load_date=mg_load_date.group(1)
        return (event_type,mg_load_date)

    def input(self):
        self.email_fact = self.sqlContext.read.format(self.read_format).load(self.input_path,schema=self.schema)

    def output(self):
        self.email_fact.write.format(self.write_format).save(self.output_path)

    def parse_omni_code(self,email_omni_code):
        email_omni_code = email_omni_code.upper()
        e_omni_code = ""
        if email_omni_code and 'TEID'.lower() in email_omni_code.lower():
            segm = re.findall("(.*)-SEGM",email_omni_code.upper())
            if segm:
                e_omni_code=segm[0]
        else :
            e_omni_code = re.search('(.*)&.*EMLDTL=.*', email_omni_code.upper())
            if e_omni_code:
                e_omni_code=e_omni_code.group(1)
            else:
                e_omni_code=re.search("(.*)[&\.;,].*DATE",email_omni_code)
        e_omni_code = re.sub('EML\.EMLCID=|EML\.','',e_omni_code)
        return e_omni_code



    def parse_site_name(self,site_map):
        temp=site_map.split(".",1)[0]
        ans = temp.replace("-",".") if re.search('.*-.*',temp) else "EXPEDIA"+"."+re.sub("[^a-zA-Z0-9]","",site_map)
        ans = re.sub('(?<!TRAVELOCITY.)US','COM',ans)
        return ans

    def parse_null(self,null_string):
        if null_string.strip():
            return null_string.strip()

    def send_date_conv(self,email_omni_code,send_date):
        if email_omni_code and 'TEID'.lower() in email_omni_code.lower():
            email_omni_code1 = re.search("DATE((19|20)[0-9]{2}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01]))", email_omni_code,
                                        re.IGNORECASE)
            conv_date = lambda x : x[:4] + '-' + x[4:6] + '-' + x[6:]
            ifbreak = 1
            if email_omni_code1:
                null_string = self.parse_null(email_omni_code1.group(1))
                if null_string:
                    email_omni_code1 = conv_date(email_omni_code1)
                    return email_omni_code1
            send_date1 = re.search('(19|20)[0-9]{2}(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])',send_date,re.IGNORECASE)
            if send_date1:
                if self.parse_null(send_date1.group(0)):
                    send_date1=conv_date(send_date1)
                    return send_date1

    def Site_Platform(self,device):
        if device:
            device = device.strip().upper()
            change = {'SMARTPHONE':'TWEB','TABLET':'PWEB','GAME CONSOLE':'DESKTOP','SMART TV':'DESKTOP',
                      'OTHER':'DESKTOP','PERSONAL COMPUTER':'DESKTOP'}
            device = change[device] if device in change else 'N/A'
            return device


    def pivot(self,name,event1,event2=1,name2=None):
        if not name2:
            return 1 if event1 and name and event1.strip() == name.strip() and event2==1 else 0
        return 1 if event1 and name and event1.strip() == name.strip() or name2 and event1 and event1.strip()==name2.strip() else 0

    def send_raw_pivot(self,event1):
        return 1 if event1.strip() == 'SEND' else 0

    def bounce_raw_pivot(self,event1):
        return 1 if event1.strip() == 'BOUNCE'  else 0

    def open_raw_pivot(self,event1):
        return 1 if event1.strip() == 'OPEN' else 0

    def click_raw_pivot(self,event1):
        return 1 if event1.strip() == 'CLICK' else 0

    def unsub_raw_pivot(self,event1):
        return 1 if event1 and event1.strip() == 'UNSUB'.strip() or event1.strip() == 'COMPLAINT'.strip() else 0

    def send_pivot(self,event1,event2):
        return 1 if event1 and event2 and event1.strip() == 'SEND' and event2==1 else 0

    def bounce_pivot(self,event1,event2):
        return 1 if event1 and event2 and event1.strip() == 'BOUNCE' and event2==1 else 0

    def open_pivot(self,event1,event2):
        return 1 if event1 and event2 and event1.strip() == 'OPEN' and event2==1 else 0

    def click_pivot(self,event1,event2):
        return 1 if event1 and event2 and event1.strip() == 'CLICK' and event2==1 else 0

    def unsub_pivot(self,event1,event2):
        return 1 if event1 and event2 and event1.strip() == 'UNSUB' and event2==1 else 0


    def SID(self,email_omni_code):
        if email_omni_code:
            sid = re.search('SID(.*?)([\-|\.]|$)',email_omni_code.strip().upper())
            if sid:
                return sid.group(1)


    def tpid(self,email_omni_code,event_type):
        if email_omni_code:
            caps_email_omni_code = email_omni_code.upper()
            if 'TEID' in caps_email_omni_code and event_type in map(str.upper,['UNSUB','COMPLIANT']):
                search = re.search('^(?:(?!\1)([^-]*)(?:\-)){4}',caps_email_omni_code)
                if search :
                    search = search.group(1)[4]
                    return search

    def eapid(self,email_omni_code,event_type):
        if email_omni_code:
            caps_email_omni_code = email_omni_code.upper()
            if 'TEID' in caps_email_omni_code and event_type in map(str.upper,['UNSUB','COMPLIANT']):
                search = re.search('^(?:(?!\1)([^-]*)(?:\-)){4}',caps_email_omni_code)
                if search:
                    return search.group(1)[-1]

    def total_unsubs(self,event1,fai):
        if event1 and ((event1.strip().upper() == 'UNSUB'.strip().upper() and fai ==1) or (event1.strip().upper() ==
                                                                                           'COMPLAINT'.strip().upper())):
            return 1
        return 0


    def sub_channel(self,email_omni_code):
        if email_omni_code:
            caps_email_omni_code = self.parse_omni_code(email_omni_code.upper())
            if 'TEID' in caps_email_omni_code:
                search = re.search('^(?:(?!\1)([^-]*)(?:\-)){'+str(1)+'}',caps_email_omni_code)
                if search:
                    return search.group(1)
            else:
                try:
                    return caps_email_omni_code.split('.')[1]
                except:
                    pass


    def program_code(self,email_omni_code):
        if email_omni_code:
            caps_email_omni_code = self.parse_omni_code(email_omni_code.upper())
            if 'TEID' in caps_email_omni_code:
                search = re.search('^(?:(?!\1)([^-]*)(?:\-)){'+str(2)+'}',caps_email_omni_code)
                if search:
                    return search.group(1)
            else:
                try:
                    return caps_email_omni_code.split('.')[2]
                except:
                    pass


    def campaign_code(self,email_omni_code):
        if email_omni_code:
            caps_email_omni_code = self.parse_omni_code(email_omni_code.upper())
            if 'TEID' in caps_email_omni_code:
                search = re.search('^(?:(?!\1)([^-]*)(?:\-)){'+str(1)+'}',caps_email_omni_code)
                if search:
                    return search.group(3)
            else:
                try:
                    return caps_email_omni_code.split('.')[3]
                except:
                    pass


    def get_current_time(self):
        datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def site_name_2(self,email_omni_code):
        if email_omni_code:
            caps_email_omni_code = email_omni_code.upper()
            if 'TEID' in caps_email_omni_code:
                poc = self.parse_omni_code(caps_email_omni_code)
                if poc.strip():
                    return self.parse_site_name(poc)
            return None


    def send_date_func(self,email_omni_code,send_date):
        if email_omni_code and send_date:
            email_omni_code,send_date = email_omni_code.strip().upper(),send_date.strip()
            sd = self.send_date_conv(email_omni_code,send_date)
            if sd:
                return sd.strip()
            return send_date


    def df_filter(self,email_omni_code,event_type,mg_load_date,event_date,send_date):
        if email_omni_code:
            match = ['SEND','OPEN','BOUNCE','CLICK','UNSUB','COMPLAINT']
            #self.cmp_date = '2018-01-01'
            try:
                ret = self.send_date_func(email_omni_code,send_date)
                send_date = ret if ret else send_date
                cmp_send_date  = time.strptime(send_date,'%Y-%m-%d') if send_date else None
                mg_load_date,self.cmp_date = time.strptime(mg_load_date,'%Y-%m-%d'),time.strptime(self.cmp_date,'%Y-%m-%d')
                if event_type and event_type.strip().upper() in map(str.upper,match) and mg_load_date >= self.cmp_date and \
                    event_date >= self.cmp_date and (cmp_send_date==None or cmp_send_date >= self.cmp_date):
                    return True
            except:
                pass
        return False

    def filter_df(self):
        email_fact = email_fact.withColumn('filterCol',self.df_filter_udf(email_fact['email_omni_code'],
                                                    email_fact['event_type'],email_fact['mg_load_date']
                                                    ,email_fact['event_date'],email_fact['send_date']))

        email_fact = email_fact.filter(email_fact['filterCol']==True)

    def derived_columns(self):
        email_fact = email_fact.withColumn('SID',self.SID_udf(email_fact['email_omni_code'])).withColumn('TPID',
                    self.eapid_udf(email_fact['email_omni_code'],email_fact['event_type']
                    )).withColumn('EAPID',self.eapid_udf(email_fact['email_omni_code'],
                    email_fact['event_type'])).withColumn('subchannel_name',
                    self.sub_channel_udf(email_fact['email_omni_code'])).withColumn('program_code',
                    self.program_code_udf(email_fact['email_omni_code'])).withColumn('campaign_code',
                    self.campaign_code_udf(email_fact['email_omni_code'])).withColumn('site_name_2',
                    self.site_name_2_udf(email_fact['email_omni_code'])).withColumn('email_omni_code_mini',
                    self.parse_omni_code_udf(email_fact['email_omni_code'])).withColumn('url_plain',
                    self.url_func_udf(email_fact['url'])).withColumn('send_date',self.send_date_func_udf(
                    email_fact['email_omni_code'],
                    email_fact['send_date']))

        # Partition By
        email_fact = email_fact.withColumn('first_activity_ind',F.row_number().over(
                    Window.partitionBy([email_fact['event_type'],email_fact['email_addr'],
                                        email_fact['email_omni_code'],email_fact['request_id'],
                                        email_fact['send_date']]).orderBy(
                        email_fact['event_date'])))

        email_fact = email_fact.withColumn('Site_Platform',self.Site_Platform_udf(email_fact['device'])
                                                     ).withColumn('TOTAL_DELIVERED',
                    self.send_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_UNDELIVERED',
                    self.bounce_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_HTML_VIEWS',
                    self.open_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_LINK_CLICKS',
                    self.click_pivot_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn('TOTAL_UNSUBS',
                    self.total_unsubs_udf(email_fact['event_type'],email_fact['first_activity_ind'])).withColumn(
                    'total_delivered_raw',self.send_raw_pivot_udf(email_fact['EVENT_TYPE'])).withColumn('total_undelivered_raw',
                    self.bounce_raw_pivot_udf(email_fact['event_type'])).withColumn('total_html_views_raw',
                    self.open_raw_pivot_udf(email_fact['event_type'])).withColumn('total_link_clicks_raw',
                    self.click_raw_pivot_udf(email_fact['event_type'])).withColumn('total_unsubs_raw',
                    self.unsub_raw_pivot_udf(email_fact['event_type'])).withColumn('ETL_LOAD_DATETIME',self.curr_time_udf()
                    ).withColumn('ETL_LOAD_REF',F.lit('Email_Fact_Response'))


    def execute(self):
        email_fact = self.email_fact
        email_fact.show()
        if self.parse_flag:
            email_fact = email_fact.withColumn('event_type',self.event_type_udf).withColumn('mg_load_date'
                                                                                                ,self.mg_load_date_udf)

        self.derived_columns()
        self.email_fact = email_fact.select(email_fact['email_addr'],email_fact['site_name_2'],
                                            email_fact['SID'],email_fact['TPID'],
                                            email_fact['EAPID'],email_fact['subchannel_name'],
                                            email_fact['program_code'],
                                            email_fact['campaign_code'],email_fact['email_omni_code_mini'],
                                            email_fact['email_omni_code'],
                                            email_fact['send_date'],email_fact['event_date'],
                                            email_fact['site_platform'],'TOTAL_DELIVERED','TOTAL_UNDELIVERED',
                                            'TOTAL_HTML_VIEWS','TOTAL_LINK_CLICKS',
                     'TOTAL_UNSUBS','total_delivered_raw','total_undelivered_raw','total_html_views_raw','total_link_clicks_raw',
                     'total_unsubs_raw','ETL_LOAD_DATETIME','ETL_LOAD_REF').groupBy('email_addr','site_name_2','SID',"TPID",
                     "EAPID","subchannel_name","program_code","campaign_code","email_omni_code_mini","email_omni_code",
                     "send_date","event_date","Site_Platform").agg(F.sum('TOTAL_DELIVERED').alias('TOTAL_DELIVERED')
                    ,F.sum('TOTAL_UNDELIVERED').alias('TOTAL_UNDELIVERED'),F.sum('TOTAL_HTML_VIEWS').alias(
                    'TOTAL_HTML_VIEWS'),F.sum('TOTAL_LINK_CLICKS').alias('TOTAL_LINK_CLICKS'),F.sum(
                    'TOTAL_UNSUBS').alias('TOTAL_UNSUBS'),F.sum('total_delivered_raw').alias('total_delivered_raw'),
                    F.sum('total_undelivered_raw').alias('total_undelivered_raw'),F.sum('total_html_views_raw').alias(
                   'total_html_views_raw'),F.sum('total_link_clicks_raw').alias('total_link_clicks_raw'),F.sum(
                   'total_unsubs_raw').alias('total_unsubs_raw'))


input_path = '/Users/host/Downloads/full_result_122259325.csv'
output_path = '/Users/host/Downloads/New/param'
read_format = 'csv'
write_format = 'csv'
compare_date = '2018-01-01'

EmailFact(input_path,output_path,read_format,write_format,compare_date,parse_input=False)

Upvotes: 2

Views: 5464

Answers (1)

Akshay Hazari
Akshay Hazari

Reputation: 3267

The problem lies in using udfs the way they should be. Similar to the way we cannot use keyword arguments in udfs we probably cannot use class methods, so we have to use static methods, I solved it by declaring the functions used by udf outside the class. For example the following function I've defined outside of the class then called it inside where we declare udfs. Using so the class instance is not passed inside while creating the udf. Also there were more problems faced while using @staticmethod, which I avoided using finally.

def tpid(email_omni_code, event_type):
    if email_omni_code:
        caps_email_omni_code = email_omni_code.upper()
        if 'TEID' in caps_email_omni_code and event_type in map(str.upper, ['UNSUB', 'COMPLIANT']):
            search = re.search('^(?:(?!\1)([^-]*)(?:\-)){4}', caps_email_omni_code)
            if search:
                search = search.group(1)[4]
                return search

self.tpid_udf = udf(tpid, StringType())

Upvotes: 3

Related Questions