Jaywaa

Reputation: 463

Hangfire - Prevent multiples of the same job being enqueued

Scenario:

Job 1 is scheduled to run every 5 minutes, and takes ~1 minute to complete.

A lot of work piles up and Job 1 takes 15 minutes to run.

There are now three Job 1's being processed concurrently - I don't want this.


How do I prevent Job 1 being added to the queue again if it is already there?

Is there Hangfire setting, or do I need to poll job statuses manually?

Upvotes: 32

Views: 41579

Answers (10)

3263927 contra

Reputation: 29

using System;
using System.Linq;
using Hangfire;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.Storage.Monitoring;

public class PreventConcurrentExecutionJobFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    public void OnCreating(CreatingContext filterContext)
    {
        // Cancel creation if an identical job (same type, method, and
        // arguments) is already in the processing list.
        JobList<ProcessingJobDto> jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
        if (jobs.Any(x =>
                        x.Value.Job.Type == filterContext.Job.Type &&
                        x.Value.Job.Method.Name == filterContext.Job.Method.Name &&
                        x.Value.Job.Args.Count == filterContext.Job.Args.Count &&
                        string.Join(",", x.Value.Job.Args) == string.Join(",", filterContext.Job.Args)))
        {
            filterContext.Canceled = true;
        }
    }

    // The remaining interface members are intentionally no-ops.
    public void OnCreated(CreatedContext filterContext) { }
    public void OnPerforming(PerformingContext filterContext) { }
    public void OnPerformed(PerformedContext filterContext) { }
}

This filter checks the job type, method name, parameter count, and argument equality.

Upvotes: 1

R K

Reputation: 91

I solved this problem as follows. I hope it works for you:

public class PreventConcurrentExecutionJobFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    public void OnCreating(CreatingContext filterContext)
    {
        var jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
        if (jobs.Any(x => x.Value.Job.Type == filterContext.Job.Type && string.Join(".", x.Value.Job.Args) == string.Join(".", filterContext.Job.Args)))
        {
            filterContext.Canceled = true;
        }
    }

    public void OnPerformed(PerformedContext filterContext) { }

    void IClientFilter.OnCreated(CreatedContext filterContext) { }

    void IServerFilter.OnPerforming(PerformingContext filterContext) { }
}

Usage:

  1. Add to global filters:

GlobalJobFilters.Filters.Add(new PreventConcurrentExecutionJobFilter());

  2. Or apply it to an abstract base job class:

[PreventConcurrentExecutionJobFilter]
public abstract class IHangfireJob {

}

  3. Or apply it to a single job:

[PreventConcurrentExecutionJobFilter]
public class MyJob {

}

Upvotes: 9

Dev Superman

Reputation: 63

In my case, I have many methods running in parallel, but I can't run the same method more than once.

Using a solution from this topic, I just edited the query: if a method with the same name is already in the processing list, the execution is cancelled.

I believe this resolves a lot of cases.

  1. Create this class:
    using Hangfire.Client;
    using Hangfire.Common;
    using Hangfire.Server;
    using Hangfire;
    
    public class PreventConcurrentExecutionJobFilter : JobFilterAttribute, IClientFilter, IServerFilter
    {
        public void OnCreating(CreatingContext filterContext)
        {
            var jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
    
            var methodAlreadyProcessing = jobs.Any(x => x.Value.Job.Method.Name == filterContext.Job.Method.Name);
    
            if (methodAlreadyProcessing)
            {
            Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Job {filterContext.Job.Method.Name} cancelled because it already exists in the processing list!");
                filterContext.Canceled = true;
            }
        }
    
        public void OnPerformed(PerformedContext filterContext) { }
    
        void IClientFilter.OnCreated(CreatedContext filterContext) { }
    
        void IServerFilter.OnPerforming(PerformingContext filterContext) { }
    }

  2. Put the annotation on your method:
    [PreventConcurrentExecutionJobFilter]
    public async Task MyTopTask()
    {
      ...
    }

Upvotes: 0

Chris H

Reputation: 521

I was using the DisableConcurrentExecution attribute for my RecurringJob, but it was not working for me.

My mistake was that I was using it on my method and not on my interface.

public interface IMySyncJobs
{
    [DisableConcurrentExecution(timeoutInSeconds: 10 * 60)]
    Task SyncAllMyDataAsync();
}



RecurringJob.AddOrUpdate<IMySyncJobs>("Sync my data", x => x.SyncAllMyDataAsync(), "0 0 * * * *");

Upvotes: 4

Simon_Weaver

Reputation: 145890

If you want to discard attempts to run something twice if it's already running you can always just do this (note no attributes applied):

    private static bool _isRunningUpdateOrders;
    public void UpdateOrders()
    {
        // Check-and-set before entering the try block; otherwise an early
        // return inside try would hit the finally and clear the flag that
        // was set by the run actually doing the work. Note this flag is
        // not thread-safe and only guards a single process.
        if (_isRunningUpdateOrders)
        {
            return;
        }

        _isRunningUpdateOrders = true;
        try
        {
            // Logic...
        }
        finally
        {
            _isRunningUpdateOrders = false;
        }
    }

Edit: Please only use something like this as a quick fix, like if you've just discovered you have an issue and you're still evaluating better solutions :-) Or if you're lazy and you just want to 'sort of fix' the problem ;-)

Upvotes: -4

Yogi

Reputation: 9739

You can use the DisableConcurrentExecution attribute to prevent multiple executions of a method concurrently. Just put this attribute on your method -

[DisableConcurrentExecution(timeoutInSeconds: 10 * 60)]
public void Job1()
{
    // Method body
}
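For context, a hypothetical sketch of how this combines with a recurring job; the JobRunner class name and schedule are assumptions, not from the question:

```csharp
using Hangfire;

public class JobRunner
{
    // If a second run starts while one is executing, it waits up to the
    // timeout for the distributed lock and then fails, rather than
    // running in parallel.
    [DisableConcurrentExecution(timeoutInSeconds: 10 * 60)]
    public void Job1()
    {
        // Method body
    }
}

// Registration at startup: run every 5 minutes.
// RecurringJob.AddOrUpdate<JobRunner>("job-1", x => x.Job1(), "*/5 * * * *");
```

Note that while a worker waits on the lock it stays occupied, so this prevents concurrency but does not skip the queued run.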

Upvotes: 23

akbar

Reputation: 773

Yes, it is possible, as below:

            RecurringJob.AddOrUpdate(Environment.MachineName, () => MyJob(Environment.MachineName), Cron.HourInterval(2));

and MyJob should be defined like this:

    public void MyJob(string taskId)
    {
        if (!taskId.Equals(Environment.MachineName))
        {
            return;
        }
        //Do whatever you job should do.
    }

Upvotes: 1

tofi lagman

Reputation: 111

A bit late, but I was using this class to prevent duplicate jobs from running concurrently:

public class SkipConcurrentExecutionAttribute : JobFilterAttribute, IServerFilter, IElectStateFilter
{
    private readonly int _timeoutSeconds;
    private const string DistributedLock = "DistributedLock";

    public SkipConcurrentExecutionAttribute(int timeOutSeconds)
    {
        if (timeOutSeconds < 0) throw new ArgumentException("Timeout argument value should be greater than or equal to zero.");
        this._timeoutSeconds = timeOutSeconds;
    }

    public void OnPerformed(PerformedContext filterContext)
    {
        if (!filterContext.Items.ContainsKey(DistributedLock))
            throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");

        var distributedLock = (IDisposable)filterContext.Items[DistributedLock];
        distributedLock?.Dispose();
    }



    public void OnPerforming(PerformingContext filterContext)
    {
        filterContext.WriteLine("Job Started");

        var resource = String.Format(
                           "{0}.{1}",
                          filterContext.BackgroundJob.Job.Type.FullName,
                          filterContext.BackgroundJob.Job.Method.Name);

        var timeOut = TimeSpan.FromSeconds(_timeoutSeconds);

        filterContext.WriteLine($"Waiting for running jobs to complete. (timeout: { _timeoutSeconds })");

        try
        {
            var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeOut);
            filterContext.Items[DistributedLock] = distributedLock;
        }
        catch (Exception ex)
        {
            filterContext.WriteLine(ex);
            filterContext.WriteLine("Another job is already running, aborted.");
            filterContext.Canceled = true; 
        }

    }

    public void OnStateElection(ElectStateContext context)
    {
        // No-op: no state-election handling is needed for this filter.
    }
}

Hope that helps, thx!

Upvotes: 10

Philipp Grathwohl

Reputation: 2836

Sounds like this could be something that you might be interested in: https://discuss.hangfire.io/t/job-reentrancy-avoidance-proposal/607/8

The discussion is about skipping jobs that would execute concurrently with an already running job.
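A rough, untested sketch of the fingerprint idea from that thread (class and member names are hypothetical, and Hangfire filter APIs vary somewhat between versions): store a fingerprint when a job is created, refuse to create a duplicate while the fingerprint exists, and clear it when the job reaches a final state.

```csharp
using System;
using System.Collections.Generic;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
{
    private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);

    public void OnCreating(CreatingContext context)
    {
        // Refuse to create the job if its fingerprint already exists.
        var fingerprint = GetFingerprint(context.Job);
        using (context.Connection.AcquireDistributedLock(fingerprint, LockTimeout))
        {
            var hash = context.Connection.GetAllEntriesFromHash(fingerprint);
            if (hash != null && hash.Count > 0)
            {
                context.Canceled = true;
                return;
            }
            context.Connection.SetRangeInHash(fingerprint, new[]
            {
                new KeyValuePair<string, string>("Timestamp", DateTimeOffset.UtcNow.ToString("o"))
            });
        }
    }

    public void OnCreated(CreatedContext context) { }

    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        // Clear the fingerprint once the job reaches a final state
        // (succeeded or deleted), so it can be enqueued again.
        if (context.NewState.IsFinal)
        {
            var fingerprint = GetFingerprint(context.BackgroundJob.Job);
            using (context.Connection.AcquireDistributedLock(fingerprint, LockTimeout))
            {
                transaction.RemoveHash(fingerprint);
            }
        }
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction) { }

    private static string GetFingerprint(Job job) =>
        $"fingerprint:{job.Type.FullName}.{job.Method.Name}";
}
```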

Upvotes: 9

TheJoeIaut

Reputation: 1532

There is an attribute called DisableConcurrentExecution that prevents two jobs of the same type from running concurrently.

Though, in your case it might be best to check whether a task is already running and skip accordingly.
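A minimal sketch of that check, reusing the monitoring API seen in other answers (the JobRunner class name is hypothetical):

```csharp
using System.Linq;
using Hangfire;

public class JobRunner
{
    public void Job1()
    {
        // Count Job1 entries currently processing; this invocation is one
        // of them, so more than one means another instance is running.
        var processing = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
        if (processing.Count(j => j.Value.Job.Method.Name == nameof(Job1)) > 1)
        {
            return; // skip this run
        }

        // ... actual work ...
    }
}
```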

Upvotes: 7
