Mithril

Reputation: 13768

How to log/print message in pyspark pandas_udf?

I have tested that neither logger nor print can output a message from inside a pandas_udf, in either cluster mode or client mode.

Test code:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id': ['a'] * 10 + ['b'] * 7 + ['q'] * 3,
    'product_id': ['c'] * 5 + ['d'] * 12 + ['e'] * 3,
}))


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

Also note:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

You can't use this inside a pandas_udf either, because this logger hangs off the Spark context object, and you can't refer to the Spark session/context inside a UDF.

The only way I know is to raise an Exception, as in the answer I wrote below. But that is a hack with drawbacks. I want to know if there is any way to simply print a message from a pandas_udf.

Upvotes: 13

Views: 9925

Answers (4)

Skelly81

Reputation: 11

A workaround that avoids altering the returned DataFrame, and that does not even require your function to reach its return statement, is to configure a log file inside the pandas_udf function that collects the calls to the logging module. For example, the logging call below writes "INFO:root:$$$$$$$$$$" to log_file.log in the current working directory four times (once per group) as soon as an action on df1 is executed, e.g. df1.show() or df1.collect().

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    logging.basicConfig(filename='log_file.log', encoding='utf-8', level=logging.INFO)
    logging.info('$'*10)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])
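A minimal usage sketch, assuming local mode (on a cluster, each executor writes its own log_file.log in its working directory, so you would have to fetch the files from the worker nodes):

df1.count()  # any action triggers the UDF and therefore the logging calls

# read the file back on the driver; same relative path as in basicConfig above
with open('log_file.log') as f:
    print(f.read())  # e.g. "INFO:root:$$$$$$$$$$", once per group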

Upvotes: 0

minhle_r7

Reputation: 872

It's not true that you can't print. When I call print() in my UDF, the messages show up in the stderr of the Spark task. The trick is to make sure you look in the right place. Below is an example from an application I'm working on now.

[Screenshot: messages from print() show up in the Spark task's stderr]
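For reference, a minimal sketch against the question's df (assuming the same imports as the question); the output lands in the task's stderr log, reachable through the Executors tab of the Spark UI on a cluster, or in the driver console in local mode:

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # print() goes to the Python worker's output, which Spark captures in the
    # task/executor stderr log -- look there, not in the driver's stdout
    print('group', pdf['store_id'].iloc[0], pdf['product_id'].iloc[0], 'rows:', len(pdf))
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])

df.groupby(['store_id', 'product_id']).apply(train_predict).count()  # trigger the UDF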

Upvotes: 2

Mithril

Reputation: 13768

So far I have tried every approach I could think of in Spark 2.4.

Without logging it is hard to debug a faulty pandas_udf. The only workable way I know to surface an error message from a pandas_udf is to raise an Exception. Debugging this way costs time, but I don't know of a better one.

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    raise Exception('@'*100)  # The only way I know can print message but would break execution 
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])

The drawback is that you can't keep Spark running after the message is printed.

Upvotes: 3

niuer

Reputation: 1669

One thing you can do is put the log message into the returned DataFrame itself. For example:

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # note the row must be a list of lists, one inner list per output row
    return pd.DataFrame([[3, 5, 'store123', 'product123', 'My log message']],
                        columns=['y', 'ds', 'store_id', 'product_id', 'log'])

After this, you can select the log column (together with the related keys) into another DataFrame and write it out to a file, then drop it from the original DataFrame, as sketched below.
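A minimal sketch of that post-processing, assuming the extended schema above (the output path is only an example):

df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

# pull the log lines into their own DataFrame and persist them ...
log_df = df1.select('store_id', 'product_id', 'log')
log_df.write.mode('overwrite').csv('/tmp/pandas_udf_logs')  # example path

# ... then drop the helper column from the actual result
result = df1.drop('log')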

It's not perfect, but it might be helpful.

Upvotes: 3
