Reputation: 4605
I was given the idea to look in the AppServiceProvider with Queue::before as a way to add a check for Jobs I no longer want to run and delete them without having to add checks to every Job I write.
Background, I am working on a SaaS that does audits so an audit can run for hours and be 1000s of jobs. If I can look for an audit id inside the jobs as they come through and compare with a Cache array of any audit ids that have been cancelled, I can save time.
So what I have got to is how do I unwrap the Job in the Queue::before to get an id to check? (Normal laravel Queues code, and using RabbitMQ)
As the jobs are wrapped in a layer or two of Event classes, and I can not dump the data to screen to see, just to log files, as it is in the queue.
in app/Providers/AppServiceProvider.php:
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
$job = $event->job->payload();
$obj = unserialize($job['data']['data']);
}
As far as it looks like for the events I am interesting the payload has data, which has data, that is the serialised object I am interested in. This does not seem the best way, or to see how to interact with it in a better way.
thanks
Upvotes: 4
Views: 3756
Reputation: 2016
I am in the middle of a similar problem involving webhook delivery. Through a developer portal, we are allowing users to re-queue a webhook (to short-cut the wait on backed-off delivery attempts). Since this could create a second job for the same webhook, we sought a way to identify the original as out of date.
app/Jobs/DeliverWebhook.php
constructor:
public function __construct(Webhook $webhook)
{
$this->webhook = $webhook;
$this->queued_at = Carbon::now();
Cache::put(
'DeliverWebhook.'. $this->webhook->id .'.QueuedAt',
$this->queued_at,
Carbon::now()->addDays(3)
);
}
Here, you can see we've attached a queued_at
attribute to this instance of the job. (We can probably also make this more unique with use of something like uniqid()
or random_bytes()
to avoid potential double-click issues or similar hiccups when queuing.)
The second part is that we set the semi-unique cache key to match this queued_at
time. I set it to expire in 3 days, past the end of our backed-off retry attempts.
Now, when a job is picked up for processing, I can check the job instance's queued_at
attribute against the cached value, and delete the job if it is old.
In my AppServiceProvider
boot
method:
Queue::before(function ($event) {
if ($event->job->queue == 'webhooks' && $event->job->getName() == 'DeliverWebhook') {
$cache_key = 'DeliverWebhook.'. $event->job->instance->webhook->id .'QueuedAt';
if ($event->job->instance->queued_at < Cache::get($cache_key)) {
$event->job->delete();
throw new JobRequeuedException;
}
}
});
An exception is thrown at the end because the queue worker, by default, does not check if the job is deleted before calling $job->fire()
. Throwing the exception forces the worker to skip fire()
and jump into the handleJobException()
method.
NOTE: I still need to test this appropriately.
Upvotes: 4