Dici

Reputation: 25960

Can a speculative task continue running shortly after a Spark job has returned?

I'm having issues with a simple Spark job of mine, which looks like this after simplification.

JavaRDD<ObjectNode> rdd = pullAndProcessData();
ManifestFilesystem fs = getOutputFS();
List<WriteObjectResult> writeObjectResults = rdd.mapPartitions(fs::write).collect();
fs.writeManifest(Manifest.makeManifest(writeObjectResults));

My expectation with this code is that, whatever happens, writeManifest is only called once all the tasks have finished and successfully written their partition to S3. The problem is that some tasks are apparently writing to S3 after the manifest, which should never happen.

In ManifestFilesystem.write, I delete the existing manifest (if there is one) to invalidate it, because the normal workflow is to write all the partitions to S3 first and only publish the manifest afterwards.

I suspect it could happen because of speculative tasks, in the following scenario: a speculative copy of a slow task is launched, the original attempt finishes, collect returns and the manifest is written, but the speculative copy is still running and writes its partition to S3 after the manifest.

Is that something that can happen? Does anybody have another hypothesis for such behaviour?

Note: using built-in data publishing methods is not an option

Note 2: I actually found this, which tends to confirm my intuition, but it would still be great to have confirmation, because I'm not using standard HDFS or S3 read/write methods for reasons outside the scope of this question.

Upvotes: 0

Views: 657

Answers (2)

Dici

Reputation: 25960

I'll answer my own question, having realized that there is no way around this from Spark's perspective: how would you make sure all the speculative tasks are killed before they have time to complete? It's actually better to let them run to completion; otherwise they might be killed while writing to a file, which would then be truncated.

There are different possible approaches:

  • a few messages in this thread suggest that one common practice is to write to a temporary attempt file before performing an atomic rename (cheap on most filesystems because it's a mere pointer switch). If a speculative task tries to rename its temporary file to a name that already exists, the rename request is ignored (the two renames cannot interleave since the operation is atomic) and the temporary file is deleted. A minimal sketch of this pattern is included after the code below.

  • to my knowledge, S3 does not provide atomic renames. Moreover, although the process described above is fairly easy to implement, we are currently trying to keep homebrew solutions to a minimum and keep the system simple. Therefore, my final solution is to use a jobId (for example, the timestamp at which the job started), pass it around to the slaves and write it in the manifest. When writing a file to the FS, the following logic is applied:

    public WriteObjectResult write(File localTempFile, long jobId) {
        // cheap operation to check if the manifest is already there
        if (manifestsExists()) {
            // the manifest records the id of the job that published it
            long manifestJobId = Long.parseLong(getManifestMetadata().get("jobId"));
            if (manifestJobId == jobId) {
                // a speculative attempt of the current job: the result is already published
                log.warn("Job " + jobId + " has already completed successfully and published a manifest. Ignoring write request.");
                return null;
            }
            // a previous job published this manifest: invalidate it before writing new data
            log.info("A manifest has already been published by job " + manifestJobId + " for this dataset. Invalidating manifest.");
            deleteExistingManifest();
        }
        return publish(localTempFile);
    }

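For illustration, here is a minimal sketch of the attempt-file-and-atomic-rename pattern from the first bullet point, using java.nio on a filesystem that supports atomic moves (so not S3); the class and helper names are hypothetical:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class AtomicPublisher {

        // Every attempt (including speculative copies of the same task) writes its own
        // attempt file, then tries to promote it to the final name with an atomic rename.
        public static void publish(Path attemptFile, Path finalFile) throws IOException {
            if (Files.exists(finalFile)) {
                // another attempt has already published the result: discard ours
                Files.deleteIfExists(attemptFile);
                return;
            }
            try {
                // without REPLACE_EXISTING, what happens when the target already exists is
                // filesystem-dependent: it may be atomically replaced (harmless here, since
                // all attempts write the same data) or the move may fail with an IOException
                Files.move(attemptFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException lostTheRace) {
                // a concurrent attempt won the rename: drop our attempt file
                Files.deleteIfExists(attemptFile);
            }
        }
    }

Since S3 offers no such atomic rename, the jobId check above is what I went with instead.
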
Upvotes: 0

Kien Truong

Reputation: 11381

Spark does not proactively kill speculative tasks. It just waits until the task has finished and ignores the result. I think it's entirely possible that your speculative tasks continue writing after the collect call.
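
For reference, speculative execution is controlled by the spark.speculation setting (disabled by default). If duplicate late writes are unacceptable and you can live without speculation, a minimal sketch of turning it off explicitly (placeholder app name, job body elided):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class NoSpeculationJob {
        public static void main(String[] args) {
            // speculation is off by default; setting it explicitly documents the intent
            SparkConf conf = new SparkConf()
                    .setAppName("manifest-writer") // placeholder name
                    .set("spark.speculation", "false");
            // the master is expected to be supplied by spark-submit
            JavaSparkContext sc = new JavaSparkContext(conf);
            // ... pullAndProcessData(), mapPartitions(fs::write), collect(), writeManifest(...) ...
            sc.stop();
        }
    }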

Upvotes: 1
