Reputation: 13768
I have tested that neither logger nor print can output messages from inside a pandas_udf, in either cluster mode or client mode.
Test code:
import sys
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging
logger = logging.getLogger('test')
spark = (SparkSession
         .builder
         .appName('test')
         .getOrCreate())
df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id': ['a'] * 10 + ['b'] * 7 + ['q'] * 3,
    'product_id': ['c'] * 5 + ['d'] * 12 + ['e'] * 3,
}))
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#' * 100)
    logger.info('$' * 100)
    logger.error('&' * 100)
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])

df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
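The grouped apply is lazy, so the UDF only runs once an action is triggered; even then, as described above, nothing shows up in the driver console:
# Triggers the grouped-map UDF; neither the print nor the logger output appears in the driver console.
df1.show()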
Also note:
log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)
You can't use this in a pandas_udf either, because this logger belongs to the Spark context object, and you can't refer to the Spark session/context inside a UDF.
The only way I know is to raise an Exception, as in the answer I wrote below. But that is tricky and has drawbacks.
I want to know whether there is any way to simply print messages from inside a pandas_udf.
Upvotes: 13
Views: 9925
Reputation: 11
A workaround that avoids altering the returned DataFrame, and doesn't even require your function to reach its return statement, is to configure a log file inside the pandas_udf function that collects calls to the logging module. For example, the logging call below writes the output "INFO:root:$$$$$$$$$$" 4 times (once per group) to log_file.log in the current working directory, as soon as an operation on df1 is executed, e.g. df1.show() or df1.collect().
import logging

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # basicConfig only takes effect the first time it runs in a worker process;
    # the encoding argument requires Python 3.9+.
    logging.basicConfig(filename='log_file.log', encoding='utf-8', level=logging.INFO)
    logging.info('$' * 10)
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])
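A minimal usage sketch, assuming the driver and the Python workers share a working directory (e.g. a single-machine local or client setup); on a real cluster the file is written on the executor node that ran the group:
df1.collect()  # run an action so the UDF (and its logging calls) actually execute

with open('log_file.log') as f:  # assumes the file is visible from the driver's working directory
    print(f.read())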
Upvotes: 0
Reputation: 872
It's not true that you can't print. When I print() in my UDF, the messages show up in the stderr of the Spark task. The trick is to make sure you look in the right place.
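For instance, a minimal sketch reusing the question's DataFrame (the message text is illustrative). In local mode the task stderr is simply your console; on a cluster, open the executor's stderr log from the Spark UI's Executors tab, or fetch it with yarn logs on YARN.
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # This line ends up in the stderr of the task that processes the group,
    # not in the driver console (unless you run in local mode).
    print('processing group of size %d' % len(pdf))
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])

df.groupby(['store_id', 'product_id']).apply(train_predict).collect()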
Upvotes: 2
Reputation: 13768
I have tried every way I could find in Spark 2.4.
Without logs it is hard to debug a faulty pandas_udf. The only workable way I know to surface a message from inside a pandas_udf is to raise an Exception. Debugging this way costs time, but I don't know of a better way.
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#' * 100)
    logger.info('$' * 100)
    logger.error('&' * 100)
    raise Exception('@' * 100)  # the only way I know to surface a message, but it breaks execution
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])
The drawback is that you can't keep the Spark job running after the message is surfaced.
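For completeness, a sketch of how that message actually reaches you: the Python worker's traceback, including the raised text, is embedded in the exception that the triggering action raises on the driver (typically a Py4JJavaError):
try:
    df1.collect()        # the action runs the UDF and therefore hits the raise
except Exception as e:   # typically a Py4JJavaError wrapping the Python traceback
    print(str(e))        # the '@@@...' message appears inside the traceback text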
Upvotes: 3
Reputation: 1669
One thing you can do is put the log message into the returned DataFrame itself. For example:
@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # Note the nested list: the row values go in one inner list so they line up with the five columns.
    return pd.DataFrame([[3, 5, 'store123', 'product123', 'My log message']],
                        columns=['y', 'ds', 'store_id', 'product_id', 'log'])
After this, you can select the log column (together with any related columns) into another DataFrame and write it out to a file, then drop it from the original DataFrame.
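A small sketch of that post-processing (the output path is illustrative):
result = df.groupby(['store_id', 'product_id']).apply(train_predict)

# Keep the log messages (plus the keys they belong to) in their own DataFrame and persist them,
# then drop the log column from the data that continues through the pipeline.
result.select('store_id', 'product_id', 'log').write.mode('overwrite').csv('/tmp/udf_logs')
clean = result.drop('log')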
It's not perfect, but it might be helpful.
Upvotes: 3