Reputation: 10780
Recently been observing this in our Laravel project that Pusher is getting clogged up with messages upto the limit. This is abnormal for usage of peak concurrent - 16 and 4 million messages a day.
Going through the Pusher dashboard, below is the extent of the damage being done.
There is a specific pattern of usage. When I turn off the supervisord worker, the graph stops and when I start the worker again, it comes back up in a predictable pattern shown in the graph below.
Below is a sample message I got when running redis-cli monitor
.
As you can see above, the index [attempts] is 69507. Does that mean that this event has been broadasted 69507 times ? Why would an event be broadcasted so many times ? When does an event stop being broadcasted ? Am I doing something wrong ?
Here is the code for the AgendaParticipantUpdated
event. Would be great if there is something fundamentally wrong in the way the event has been implemented.
<?php
namespace App\Events;
use App\AgendaParticipant;
use App\Agenda;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class AgendaParticipantUpdated implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
/** @var agendaParticipant */
public $agendaParticipant;
/**
* Create a new event instance.
*
* @param AgendaParticipant $agendaParticipant
*/
public function __construct(AgendaParticipant $agendaParticipant)
{
$this->agendaParticipant = $agendaParticipant;
}
/**
* Get the channels the event should broadcast on.
*
* @return \Illuminate\Broadcasting\Channel|array
*/
public function broadcastOn()
{
return new PrivateChannel("meetings.{$this->agendaParticipant->agenda->meeting->id}");
}
/**
* Get the data to broadcast.
*
* @return array
*/
public function broadcastWith()
{
$agenda = Agenda::where('id', $this->agendaParticipant->agenda->id)->first();
$agenda->load(['participants' => function ($query) {
$query->orderBy('order')->orderBy('id');
}, 'participants.member.user']);
return [
'agendaParticipant' => $this->agendaParticipant,
'agenda' => $agenda,
];
}
}
There are lots of other broadcasts too like this one :
As you see above, this broadcast has attempt count of 873245.
Below is the code for the DiscussionCreated
event :
<?php
namespace App\Events;
use App\Discussion;
use App\Member;
use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Http\Response;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class DiscussionCreated implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
/**
* @var Discussion
*/
public $discussion;
/**
* Create a new event instance.
*
* @param Discussion $discussion
*/
public function __construct(Discussion $discussion)
{
$this->discussion = $discussion;
}
public function broadcastOn()
{
$author = Member::where('id',$this->discussion->author_id)->firstOrFail();
return new PrivateChannel("group.{$author->group_id}");
}
public function broadcastWith()
{
if(\Auth::check())
{
$group = $this->discussion->author->group;
$group->load(['meetings', 'facilitator']);
$facilitatorId = optional($group->facilitator)->id;
$members = $group->members()->with('user')->get();
$discussion = $this->discussion;
$member = auth()->user()->activeGroupMember;
$list_read_discussions=\DB::table('members')
->where('member_id', '=', $member->id)
->join('acknowledgements', 'members.id', '=', 'acknowledgements.member_id')
->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
->where('discussions.target_type','=','App\Group')
->whereNotExists( function($q) {
$q->select(\DB::raw(1))
->from('replies')
->where('acknowledgements.discussion_id','=','replies.discussion_id')
->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
})
->distinct('acknowledgements.discussion_id')
->pluck('acknowledgements.discussion_id');
$group_discussions_count= $discussion->count();
$group_read_discussions=$list_read_discussions;
$unreadGroupCount=max(0,$group_discussions_count - $group_read_discussions->count());
$authedMemberId = auth()->id();
//private menu section
$authedMember = auth()->user()->activeGroupMember;
foreach($members as $target){
$discussions_to_target_count=\App\Discussion::
where('target_type','=','App\Member')
->where('target_id','=',$target->id)
->Where('author_id','=',$authedMember->id)
->count();
$discussions_from_target_count=\App\Discussion::
where('target_type','=','App\Member')
->where('target_id','=',$authedMember->id)
->Where('author_id','=',$target->id)
->count();
$read_discussions_to_target_count=\DB::table('acknowledgements')
->where('acknowledgements.member_id', '=', $authedMember->id)
->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
->where('discussions.target_type','=','App\Member')
->where('discussions.target_id', '=', $target->id)
->where('discussions.author_id', '=', $authedMember->id)
->whereNotExists( function($q) {
$q->select(\DB::raw(1))
->from('replies')
->where('acknowledgements.discussion_id','=','replies.discussion_id')
->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
})
->distinct('acknowledgements.discussion_id')
->pluck('acknowledgements.discussion_id')
->count();
$read_discussions_from_target_count=\DB::table('acknowledgements')
->where('acknowledgements.member_id', '=', $authedMember->id)
->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
->where('discussions.target_type','=','App\Member')
->where('discussions.target_id', '=', $authedMember->id)
->where('discussions.author_id', '=', $target->id)
->whereNotExists( function($q) {
$q->select(\DB::raw(1))
->from('replies')
->where('acknowledgements.discussion_id','=','replies.discussion_id')
->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
})
->distinct('acknowledgements.discussion_id')
->pluck('acknowledgements.discussion_id')
->count();
$target->unreadPrivateCount=max(0,$discussions_to_target_count
+$discussions_from_target_count
-$read_discussions_from_target_count
-$read_discussions_to_target_count);
}
$this->discussion->replies = [];
$this->discussion->replies_count = 0;
$this->discussion->sort = $this->discussion->created_at->toDateTimeString();
$this->discussion->has_user_subscribed = 1;
return [
'discussion' => $this->discussion ? $this->discussion->toArray() : array(),
];
}
}
}
Would appreciate any help on this since it is interfering with our daily operations.
Upvotes: 0
Views: 938
Reputation: 664
I don't know if that can be you case but I encountered a similar problem using Laravel 5.6
Whet I found out was that the cron for the queued jobs with
* * * * * cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1
I forgot to specify the user processing the cron, so the process was handled by the super user and subsequently locking up files / resources with the wrong permissions
so, I changed the cron command to
* * * * * apache:apache cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1
which fixed my problem. Now I have my logs clear with no failed process attempts
Upvotes: 0
Reputation:
As you can see above, the index [attempts] is 69507. Does that mean that this event has been broadasted 69507 times?
Yes. It means that your queue worker has processed this job (broadcast) almost 70K times.
Why would an event be broadcasted so many times?
I believe the reason is that your broadcast job throws an Exception (it is failing). Laravel would keep trying to execute the failed jobs until they either: execute successfully without any exceptions or until max attempts/timeout values are reached. From the attached screenshots, I believe that you haven't configured such values. (maxTries
is empty in your screenshots)
When does an event stop being broadcasted ? Am I doing something wrong ?
I think you need to do two things:
tries
parameter for your queue workers, i.e. run php artisan queue:work --tries=3
(Check Laravel Documentation for more information).I hope this helps. Please let me know if you have any follow-up questions.
Update: Consider giving Laravel Horizon a try. It would give you a nice dashboard to monitor your jobs (including broadcasts), see the number of failing jobs and the causes (exceptions). It also has built-in notification settings that could help you figure out early when there is a problem.
Upvotes: 1