Reputation: 591
I have a simple UDF in Databricks used in Spark. I can't use println or log4j or something like that because the output ends up on the executors, and I need it on the driver. I have a very simple logging setup:
var logMessage = ""

def log(msg: String): Unit = {
  logMessage += msg + "\n"
}

def writeLog(file: String): Unit = {
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String): Unit = {
  log("*WARNING* " + msg)
}

val CleanText = (s: Int) => {
  log("I am in this UDF")
  s + 2
}

sqlContext.udf.register("CleanText", CleanText)
How can I get this to work properly and log to the driver?
Upvotes: 6
Views: 8243
Reputation: 37832
The closest mechanism in Apache Spark to what you're trying to do is accumulators. You can accumulate the log lines on the executors and access the result on the driver:
import org.apache.spark.sql.functions.udf
import org.apache.spark.util.CollectionAccumulator

// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")

// log function adds a line to the accumulator
def log(msg: String): Unit = logLines.add(msg)

// driver-side function can print the log using the accumulator's *value*
def writeLog(): Unit = {
  import scala.collection.JavaConverters._
  println("start write")
  logLines.value.asScala.foreach(println)
  println("end write")
}

val CleanText = udf((s: Int) => {
  log(s"I am in this UDF, got: $s")
  s + 2
})
// use UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()
writeLog()
// prints:
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write
BUT: this isn't really recommended, especially not for logging purposes. If you log on every record, this accumulator will eventually crash your driver with an OutOfMemoryError, or just slow you down horribly.
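If you do go this route anyway, one possible mitigation (my own sketch, not part of the answer above; the 1% sampling rate is an arbitrary placeholder) is to only accumulate a small sample of the messages so the driver-side collection grows much more slowly:

import java.util.concurrent.ThreadLocalRandom

// hypothetical guard: accumulate only ~1% of messages, reusing the
// logLines accumulator defined above
def sampledLog(msg: String): Unit = {
  if (ThreadLocalRandom.current().nextDouble() < 0.01) {
    logLines.add(msg)
  }
}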
Since you're using Databricks, I would check what options they support for log aggregation, or simply use the Spark UI to view the executor logs.
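As a sketch of that second option (my own illustration, assuming the cluster ships log4j 1.x as Spark 2.x does, and using an arbitrary logger name), you can log on the executor itself and read the lines back through the executor log pages in the Spark UI or the Databricks cluster logs:

import org.apache.log4j.LogManager
import org.apache.spark.sql.functions.udf

// the logger is resolved inside the UDF body, so it is created on the
// executor; its output lands in each executor's own log files
val CleanTextLogged = udf((s: Int) => {
  val logger = LogManager.getLogger("CleanTextUDF")
  logger.warn(s"I am in this UDF, got: $s")
  s + 2
})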
Upvotes: 5
Reputation: 39294
You can't... unless you want to go crazy and make some sort of Logback appender that sends logs over the network, or something like that.
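To make that idea concrete (my own sketch, not a ready-made recipe: it assumes the executors run log4j 1.x, as Spark 2.x bundles, and that you run a matching log4j socket receiver yourself; the host and port are placeholders), you could attach a SocketAppender on each executor:

import org.apache.log4j.LogManager
import org.apache.log4j.net.SocketAppender

// ships serialized log events over TCP to a log4j SocketNode you run
// yourself (e.g. on the driver host); call this once per executor JVM
def attachRemoteAppender(): Unit = {
  val root = LogManager.getRootLogger
  if (root.getAppender("remote") == null) {
    val appender = new SocketAppender("driver-host", 4560)
    appender.setName("remote")
    root.addAppender(appender)
  }
}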
The code for the UDF runs on all your executors when you evaluate a DataFrame, so you might have 2000 hosts running it, and each of them will log to its own location; that's how Spark works. The driver isn't the one running the code, so it can't be logged to.
You can use YARN log aggregation to pull all the logs from the executors for later analysis, though.
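For example, on a cluster that runs on YARN with log aggregation enabled, the combined executor logs of a finished application can be fetched with yarn logs -applicationId <application id> (the id is a placeholder for your own application's id).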
You could probably also write to a Kafka topic or something creative like that with some work, and read the logs back off the stream later.
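A minimal sketch of that Kafka idea (my own illustration, not from the answer: it requires the kafka-clients library on the executors, and the broker address and topic name are placeholders) could look like this:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// one producer per executor JVM (lazy val in an object), publishing each
// log line to a Kafka topic that you consume later outside the job
object UdfLogProducer {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka-broker:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  def log(msg: String): Unit =
    producer.send(new ProducerRecord[String, String]("udf-logs", msg))
}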
Upvotes: 1