Reputation: 2017
I have multiple jobs that I want to execute in parallel that append daily data into the same path using partitioning.
e.g.
dataFrame.write()
    .partitionBy("eventDate", "category")
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path");
Job 1 - category = "billing_events"
Job 2 - category = "click_events"
Both jobs truncate any of their existing partitions in the S3 bucket before executing, and then save the resulting Parquet files to their respective partitions.
i.e.
job 1 -> s3://bucket/save/path/eventDate=20160101/category=billing_events
job 2 -> s3://bucket/save/path/eventDate=20160101/category=click_events
The problem I'm facing is with the temporary files that Spark creates during job execution. It saves its intermediate working files under the base path:
s3://bucket/save/path/_temporary/...
so both jobs end up sharing the same temp folder and conflict with each other. I've noticed this can cause one job to delete temp files and the other job to fail with a 404 from S3 saying that an expected temp file doesn't exist.
Has anyone faced this issue and come up with a strategy to have parallel execution of jobs in the same base path?
I'm using Spark 1.6.0 for now.
Upvotes: 31
Views: 25416
Reputation: 41
Instead of using partitionBy:
dataFrame.write()
    .partitionBy("eventDate", "category")
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path");
you can have each job write directly to its own partition directory. Each job then gets its own _temporary folder under that directory, so the jobs no longer share one.
In job 1, specify the parquet file path as:
dataFrame.write().mode(SaveMode.Append)
    .parquet("s3://bucket/save/path/eventDate=20160101/category=billing_events");
and in job 2, specify the parquet file path as:
dataFrame.write().mode(SaveMode.Append)
    .parquet("s3://bucket/save/path/eventDate=20160101/category=click_events");
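As a rough illustration of why per-partition paths avoid the clash, the sketch below builds each job's partition directory and the _temporary folder that would sit under it. It is plain Java with no Spark dependency. PartitionPaths, partitionPath and temporaryDir are hypothetical names, and the category= directory name assumes the partition column from the partitionBy call in the question.

```java
final class PartitionPaths {
    // Hypothetical helper: builds the Hive-style partition directory for one job.
    static String partitionPath(String basePath, String eventDate, String category) {
        // Drop a trailing slash so the result never contains "//"
        String base = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return base + "/eventDate=" + eventDate + "/category=" + category;
    }

    // The committer's working folder sits directly under the output path
    // (the question shows s3://bucket/save/path/_temporary for the shared base path).
    static String temporaryDir(String outputPath) {
        return outputPath + "/_temporary";
    }

    public static void main(String[] args) {
        String base = "s3://bucket/save/path";
        String[] categories = {"billing_events", "click_events"};
        for (String category : categories) {
            String out = partitionPath(base, "20160101", category);
            // Each job gets a different _temporary folder
            System.out.println(out + " -> " + temporaryDir(out));
        }
    }
}
```

Because the two output paths differ, the two working folders differ as well, which is the property this answer relies on.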
Upvotes: 4
Reputation: 1
Multiple write tasks to the same path with "partitionBy" will fail once _temporary has been deleted by cleanupJob of FileOutputCommitter, with errors like "No such file or directory".
TEST CODE:
def batchTask[A](TASK_tag: String, taskData: TraversableOnce[A], batchSize: Int, fTask: A => Unit, fTaskId: A => String): Unit = {
  // Futures of the submitted tasks, keyed by task id
  val list = new scala.collection.mutable.ArrayBuffer[(String, java.util.concurrent.Future[Int])]()
  // Run at most batchSize tasks concurrently
  val executors = java.util.concurrent.Executors.newFixedThreadPool(batchSize)
  try {
    taskData.foreach(d => {
      val task = executors.submit(new java.util.concurrent.Callable[Int] {
        override def call(): Int = {
          fTask(d)
          1
        }
      })
      list += ((fTaskId(d), task))
    })
    var count = 0
    // Block until every task finishes; get() rethrows a task failure wrapped in an ExecutionException
    list.foreach(r => if (!r._2.isCancelled) count += r._2.get())
  } finally {
    executors.shutdown()
  }
}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

def testWriteFail(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
  println(s"try save: ${outPath}")
  import org.apache.spark.sql.functions._
  import spark.sqlContext.implicits._
  // 20 tasks, 6 at a time, all writing partitioned output to the same path
  batchTask[Int]("test", 1 to 20, 6, t => {
    val df1 =
      Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
        .toDF("int_column", "string_column", "date_column")
        .withColumn("t0", lit(t))
    df1.repartition(1).write
      .mode("overwrite")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      .partitionBy("t0").csv(outPath)
  }, t => f"task.${t}%4d") // throws an exception
  println(s"fail: count=${spark.read.csv(outPath).count()}")
}
try {
  testWriteFail(outPath + "/fail")
} catch {
  case e: Throwable => println(s"write failed: $e") // report the failure instead of swallowing it silently
}
Result: failed.
Use a custom OutputCommitter:
package org.jar.spark.util
import java.io.IOException
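/*
 * For writing a DataFrame from multiple tasks into the same directory.
 * <pre>
 * 1. Writes go through a temporary directory
 * 2. If the tasks' outputs may overlap, do not use overwrite mode, to avoid accidental deletion
 * </pre>
 * <p/>
 * Created by liao on 2018-12-02.
 */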
object JMultiWrite {
  val JAR_Write_Cache_Flag = "jar.write.cache.flag"
  val JAR_Write_Cache_TaskId = "jar.write.cache.taskId"
  /** Automatically delete same-named subdirectories under the target directory */
  val JAR_Write_Cache_Overwrite = "jar.write.cache.overwrite"

  implicit class ImplicitWrite[T](dw: org.apache.spark.sql.DataFrameWriter[T]) {
    /**
      * Write to files; option, format, mode, etc. must be configured by the caller.
      *
      * @param outDir    target output directory
      * @param taskId    ID of this task, used to isolate each task's output; must be unique
      * @param cacheDir  cache directory, preferably a name starting with '_', e.g. "_jarTaskCache"
      * @param overwrite whether to delete directories that already exist; the default false means Append mode
      *                  <font color=red>(if parallel tasks may output the same subdirectory, it will be wiped out; do not use overwrite in that case)</font>
      */
    def multiWrite(outDir: String, taskId: String, cacheDir: String = "_jarTaskCache", overwrite: Boolean = false): Boolean = {
      val p = path(outDir, cacheDir, taskId)
      dw.options(options(cacheDir, taskId))
        .option(JAR_Write_Cache_Overwrite, overwrite)
        .mode(org.apache.spark.sql.SaveMode.Overwrite)
        .save(p)
      true
    }
  }

  def options(cacheDir: String, taskId: String): Map[String, String] = {
    Map(JAR_Write_Cache_Flag -> cacheDir,
      JAR_Write_Cache_TaskId -> taskId,
      "mapreduce.fileoutputcommitter.marksuccessfuljobs" -> "false",
      "mapreduce.job.outputformat.class" -> classOf[JarOutputFormat].getName
    )
  }

  def path(outDir: String, cacheDir: String, taskId: String): String = {
    assert(outDir != "", "need OutDir")
    assert(cacheDir != "", "need CacheDir")
    assert(taskId != "", "needTaskId")
    outDir + "/" + cacheDir + "/" + taskId
  }
  /*-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-*/
  class JarOutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat {
    var committer: org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter = _

    override def getOutputCommitter(context: org.apache.hadoop.mapreduce.TaskAttemptContext): org.apache.hadoop.mapreduce.OutputCommitter = {
      if (this.committer == null) {
        val output = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(context)
        this.committer = new JarOutputCommitter(output, context)
      }
      this.committer
    }
  }
  class JarOutputCommitter(output: org.apache.hadoop.fs.Path, context: org.apache.hadoop.mapreduce.TaskAttemptContext)
    extends org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(output, context) {
    override def commitJob(context: org.apache.hadoop.mapreduce.JobContext): Unit = {
      val finalOutput = this.output
      val cacheFlag = context.getConfiguration.get(JAR_Write_Cache_Flag, "")
      val myTaskId = context.getConfiguration.get(JAR_Write_Cache_TaskId, "")
      val overwrite = context.getConfiguration.getBoolean(JAR_Write_Cache_Overwrite, false)
      val hasCacheFlag = finalOutput.getName == myTaskId && finalOutput.getParent.getName == cacheFlag
      val finalReal = if (hasCacheFlag) finalOutput.getParent.getParent else finalOutput // determine the final directory
      // List the output directory
      val fs = finalOutput.getFileSystem(context.getConfiguration)
      val jobAttemptPath = getJobAttemptPath(context)
      val arr$ = fs.listStatus(jobAttemptPath, new org.apache.hadoop.fs.PathFilter {
        override def accept(path: org.apache.hadoop.fs.Path): Boolean = !"_temporary".equals(path.getName())
      })
      if (hasCacheFlag && overwrite) // remove same-named subdirectories
      {
        if (fs.isDirectory(finalReal)) arr$.foreach(stat =>
          if (fs.isDirectory(stat.getPath)) fs.listStatus(stat.getPath).foreach(stat2 => {
            val p1 = stat2.getPath
            val p2 = new org.apache.hadoop.fs.Path(finalReal, p1.getName)
            if (fs.isDirectory(p1) && fs.isDirectory(p2) && !fs.delete(p2, true)) throw new IOException("Failed to delete " + p2)
          })
        )
      }
      arr$.foreach(stat => {
        mergePaths(fs, stat, finalReal)
      })
      cleanupJob(context)
      if (hasCacheFlag) { // remove the cache directory
        try {
          fs.delete(finalOutput, false)
          val pp = finalOutput.getParent
          if (fs.listStatus(pp).isEmpty)
            fs.delete(pp, false)
        } catch {
          case e: Exception =>
        }
      }
      // No need to write _SUCCESS anymore
      //if (context.getConfiguration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
      //  val markerPath = new org.apache.hadoop.fs.Path(this.outputPath, "_SUCCESS")
      //  fs.create(markerPath).close()
      //}
    }
  }
  @throws[IOException]
  def mergePaths(fs: org.apache.hadoop.fs.FileSystem, from: org.apache.hadoop.fs.FileStatus, to: org.apache.hadoop.fs.Path): Unit = {
    if (from.isFile) {
      if (fs.exists(to) && !fs.delete(to, true)) throw new IOException("Failed to delete " + to)
      if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
    }
    else if (from.isDirectory) {
      if (fs.exists(to)) {
        val toStat = fs.getFileStatus(to)
        if (!toStat.isDirectory) {
          if (!fs.delete(to, true)) throw new IOException("Failed to delete " + to)
          if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
        }
        else {
          val arr$ = fs.listStatus(from.getPath)
          for (subFrom <- arr$) {
            mergePaths(fs, subFrom, new org.apache.hadoop.fs.Path(to, subFrom.getPath.getName))
          }
        }
      }
      else if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
    }
  }
}
And then:
def testWriteOk(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
  println(s"try save: ${outPath}")
  import org.apache.spark.sql.functions._
  import org.jar.spark.util.JMultiWrite.ImplicitWrite // import the helper
  import spark.sqlContext.implicits._
  batchTask[Int]("test.ok", 1 to 20, 6, t => {
    val taskId = t.toString
    val df1 =
      Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
        .toDF("int_column", "string_column", "date_column")
        .withColumn("t0", lit(taskId))
    df1.repartition(1).write
      .partitionBy("t0")
      .format("csv")
      .multiWrite(outPath, taskId, overwrite = true) // overwrite is used here; do not use overwrite if partitions overlap
  }, t => f"task.${t}%4d")
  println(s"ok: count=${spark.read.csv(outPath).count()}") // 40
}
try {
  testWriteOk(outPath + "/ok")
} catch {
  case e: Throwable => println(s"write failed: $e") // report the failure instead of swallowing it silently
}
Success:
$ ls ok/
t0=1 t0=10 t0=11 t0=12 t0=13 t0=14 t0=15 t0=16 t0=17 t0=18 t0=19 t0=2 t0=20 t0=3 t0=4 t0=5 t0=6 t0=7 t0=8 t0=9
The same applies to other output formats; just be careful with the use of overwrite.
Tested on Spark 2.x with Scala 2.11.8.
Thanks to @Tal Joffe.
Upvotes: -1
Reputation: 2017
So after much reading about how to tackle this problem, I thought I'd transfer some wisdom back here to wrap things up. Thanks mostly to Tal's comments.
I've additionally found that writing directly to s3://bucket/save/path seems dangerous: if a job is killed and the temporary folder is not cleaned up at the end of the job, it seems to be left there for the next job, and I've noticed that the killed job's temporary files sometimes land in s3://bucket/save/path and cause duplication. Totally unreliable.
The rename of the files in the _temporary folder to their final S3 locations also takes a horrendous amount of time (approx. 1 second per file), because S3 supports only copy and delete, not rename. On top of that, only the driver renames these files, using a single thread, so some jobs with large numbers of files/partitions spend as much as 1/5 of their time just waiting for rename operations.
I've ruled out using the DirectOutputCommitter for a number of reasons.
The only safe, performant, and consistent way to execute these jobs is to save their output to a unique temporary folder (unique by applicationId or timestamp) in HDFS first, and then copy it to S3 on job completion.
This allows concurrent jobs to execute, since each saves to its own unique temporary folder; there is no need for the DirectOutputCommitter, because the rename operation on HDFS is quicker than on S3; and the saved data is more consistent.
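To make the stage-then-copy idea concrete, here is a minimal sketch in plain Java. It uses the local filesystem through java.nio as a stand-in for both HDFS (the staging area) and S3 (the final destination), so it only illustrates the flow and not the real storage APIs. StagedWrite, stagingDir, publish and runJob are hypothetical names, the application ids are made up, and in a real job the write would be done by Spark and the copy by whatever copy tool the cluster provides.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StagedWrite {
    // One staging directory per job run, unique by application id (hypothetical layout).
    static Path stagingDir(Path stagingRoot, String applicationId) {
        return stagingRoot.resolve(applicationId);
    }

    // Copies everything under staging into the shared destination, keeping the
    // relative partition directories, then deletes the staging directory.
    static void publish(Path staging, Path destination) throws IOException {
        List<Path> sources;
        try (Stream<Path> walk = Files.walk(staging)) {
            sources = walk.collect(Collectors.toList()); // parents come before children
        }
        for (Path src : sources) {
            Path target = destination.resolve(staging.relativize(src).toString());
            if (Files.isDirectory(src)) {
                Files.createDirectories(target);
            } else {
                Files.copy(src, target, StandardCopyOption.REPLACE_EXISTING);
            }
        }
        deleteRecursively(staging);
    }

    static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order so children are deleted before their parents
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Simulates one job: write a single partition file to staging, then publish it.
    static void runJob(Path stagingRoot, Path destination, String applicationId, String category)
            throws IOException {
        Path staging = stagingDir(stagingRoot, applicationId);
        Path partition = staging.resolve("eventDate=20160101").resolve("category=" + category);
        Files.createDirectories(partition);
        Files.write(partition.resolve("part-00000"), category.getBytes(StandardCharsets.UTF_8));
        publish(staging, destination);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("staged-write");
        Path destination = root.resolve("save").resolve("path");
        runJob(root.resolve("staging"), destination, "application_0001", "billing_events");
        runJob(root.resolve("staging"), destination, "application_0002", "click_events");
        System.out.println("published to " + destination);
    }
}
```

Since every run works only inside its own staging directory, a killed job leaves its leftovers there rather than in the shared destination, and two jobs never touch each other's working files.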
Upvotes: 27
Reputation: 13154
I suspect this is because of the changes to partition discovery that were introduced in Spark 1.6. The changes mean that Spark will only treat paths like .../xxx=yyy/ as partitions if you have specified a "basePath" option (see the Spark release notes).
So I think your problem will be solved if you add the basePath option, like this:
dataFrame
    .write()
    .partitionBy("eventDate", "category")
    .option("basePath", "s3://bucket/save/path")
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path");
(I haven't had the chance to verify it, but hopefully it will do the trick :))
Upvotes: 2