tristanbailey
tristanbailey

Reputation: 4605

Check Laravel Jobs inside Queue::before to delete before they are processed

I was given the idea to look in the AppServiceProvider with Queue::before as a way to add a check for Jobs I no longer want to run and delete them without having to add checks to every Job I write.
Background, I am working on a SaaS that does audits so an audit can run for hours and be 1000s of jobs. If I can look for an audit id inside the jobs as they come through and compare with a Cache array of any audit ids that have been cancelled, I can save time.

So what I have got to is how do I unwrap the Job in the Queue::before to get an id to check? (Normal laravel Queues code, and using RabbitMQ)

As the jobs are wrapped in a layer or two of Event classes, and I can not dump the data to screen to see, just to log files, as it is in the queue.

in app/Providers/AppServiceProvider.php:

Queue::before(function (JobProcessing $event) {
    // $event->connectionName
    // $event->job
    $job = $event->job->payload();
    $obj = unserialize($job['data']['data']);
}

As far as it looks like for the events I am interesting the payload has data, which has data, that is the serialised object I am interested in. This does not seem the best way, or to see how to interact with it in a better way.

thanks

Upvotes: 4

Views: 3756

Answers (1)

Trip
Trip

Reputation: 2016

I am in the middle of a similar problem involving webhook delivery. Through a developer portal, we are allowing users to re-queue a webhook (to short-cut the wait on backed-off delivery attempts). Since this could create a second job for the same webhook, we sought a way to identify the original as out of date.

app/Jobs/DeliverWebhook.php constructor:

public function __construct(Webhook $webhook)
{
    $this->webhook      = $webhook;
    $this->queued_at    = Carbon::now();

    Cache::put(
        'DeliverWebhook.'. $this->webhook->id .'.QueuedAt',
        $this->queued_at,
        Carbon::now()->addDays(3)
    );
}

Here, you can see we've attached a queued_at attribute to this instance of the job. (We can probably also make this more unique with use of something like uniqid() or random_bytes() to avoid potential double-click issues or similar hiccups when queuing.)

The second part is that we set the semi-unique cache key to match this queued_at time. I set it to expire in 3 days, past the end of our backed-off retry attempts.

Now, when a job is picked up for processing, I can check the job instance's queued_at attribute against the cached value, and delete the job if it is old.

In my AppServiceProvider boot method:

Queue::before(function ($event) {
    if ($event->job->queue == 'webhooks' && $event->job->getName() == 'DeliverWebhook') {
        $cache_key = 'DeliverWebhook.'. $event->job->instance->webhook->id .'QueuedAt';

        if ($event->job->instance->queued_at < Cache::get($cache_key)) {
            $event->job->delete();

            throw new JobRequeuedException;
        }
    }
});

An exception is thrown at the end because the queue worker, by default, does not check if the job is deleted before calling $job->fire(). Throwing the exception forces the worker to skip fire() and jump into the handleJobException() method.

NOTE: I still need to test this appropriately.

Upvotes: 4

Related Questions