grok

Reputation: 210

Stopping @spawn-ed tasks in Julia

I'm in the following situation: an algorithm from a library uses randomness. It is usually very fast but sometimes gets stuck (this is common for SAT solvers, for example).

I would like to do the following: start many instances of the task, keep the result of the first one that succeeds, and kill the others. In Julia pseudocode:

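using Distributed

futures = [@spawn myfunction(mydata) for _ in 1:nworkers()]  # one instance per worker
while true
    i = findfirst(isready, futures)  # index of a finished task, or nothing
    if i !== nothing
        result = fetch(futures[i])
        foreach(kill_task, futures)  # kill_task does not exist: this is the missing piece
        break
    end
    sleep(0.1)  # polling interval
end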

but I can't find anything like "kill_task". Distributed.del_client sounded promising, but it doesn't seem to do that.

As a bonus, it would be nice to avoid polling (the sleep loop).

Note that modifying "myfunction" so that it stops itself is not an option: it lives in an external library, so it has to be killed from outside.

Upvotes: 2

Views: 756

Answers (1)

Przemyslaw Szufel

Reputation: 42194

Here is complete code that spawns several worker processes, takes the result from whichever one finishes first, and terminates all the workers once that result has been received. I think this pattern is useful in many scientific-computing scenarios:

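using Distributed
addprocs(4)
@everywhere using Distributed

# Simulated "sometimes slow" task: works for a random 1-20 seconds,
# then returns a value identifying the worker that produced it.
@everywhere function getSomething()
    tt = rand(1:20)
    println("working for $tt seconds")
    for i in 1:tt
        sleep(1.0)
    end
    println("Finally, I am ready")
    return 100 + myid()
end

# Run workerf on every worker in ws_list, return the first result,
# then interrupt all of those workers.
function getFirst(workerf::Function, ws_list::Vector{Int}=workers())
    res = Vector{Union{Future,Nothing}}(nothing, length(ws_list))
    for i in 1:length(ws_list)
        @async begin
            res[i] = @spawnat ws_list[i] workerf()
        end
    end
    my_res = nothing
    while true
        # Check every Future; stop at the first one that holds a result.
        for i in 1:length(ws_list)
            if res[i] !== nothing && isready(res[i])
                my_res = fetch(res[i])
                println(ws_list[i], " is ready!")
                break
            else
                println(ws_list[i], " NOT ready!")
            end
        end
        if my_res !== nothing
            # Interrupt all workers concurrently; as the output below
            # shows, this terminates the worker processes.
            @sync for i in 1:length(ws_list)
                @async interrupt(ws_list[i])
            end
            break
        end
        sleep(0.5)  # polling interval
    end
    println(my_res)
    return my_res
end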

Now let us give it a spin:

julia> value = getFirst(getSomething);
2 NOT ready!
      From worker 4:    working for 16 seconds
      From worker 2:    working for 19 seconds
      From worker 5:    working for 2 seconds
3 NOT ready!
      From worker 3:    working for 15 seconds
4 NOT ready!
5 NOT ready!
2 NOT ready!
3 NOT ready!
4 NOT ready!
5 NOT ready!
2 NOT ready!
3 NOT ready!
4 NOT ready!
5 NOT ready!
2 NOT ready!
3 NOT ready!
4 NOT ready!
5 NOT ready!
      From worker 5:    Finally, I am ready
2 NOT ready!
3 NOT ready!
4 NOT ready!
5 is ready!
Worker 3 terminated.
Worker 2 terminated.
Worker 5 terminated.
Worker 4 terminated.

And let us see the result:

julia> println(value)
105
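getFirst above still polls every 0.5 s, which the question hoped to avoid. One possible variant, sketched here under assumptions and not part of the original answer, swaps the polling loop for a RemoteChannel: every worker puts its result into a channel owned by the master process, and take! simply blocks until the fastest worker has delivered. The names slow_task, get_first_blocking, pool and value are hypothetical; slow_task stands in for the external library call. Workers are still stopped with interrupt, exactly as in getFirst.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # start 4 workers if none are running yet
@everywhere using Distributed

# Hypothetical stand-in for the library call: random run time,
# returns a value identifying the worker that produced it.
@everywhere function slow_task()
    sleep(rand())              # 0-1 s of simulated "work"
    return 100 + myid()
end

# Hypothetical helper: run f on every worker in ws and return the first
# result without polling. Each worker puts its result into a RemoteChannel
# owned by the master process; take! blocks until the first one arrives.
function get_first_blocking(f::Function, ws::Vector{Int}=workers())
    # Capacity length(ws), so workers that finish later never block on put!
    results = RemoteChannel(() -> Channel{Any}(length(ws)))
    for w in ws
        @spawnat w put!(results, f())   # the returned Future is deliberately ignored
    end
    first_result = take!(results)       # blocks; no sleep loop needed
    interrupt(ws)                       # stop the remaining workers, as getFirst does
    return first_result
end

pool = workers()
value = get_first_blocking(slow_task, pool)
println(value)
```

One caveat of this sketch: if f throws on every worker, nothing is ever put into the channel and take! waits forever, so a real version would need a try/catch around f() or an overall timeout.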

Upvotes: 1
