最近在我们的Laravel项目中观察到这一点,Pusher正被堵塞到极限的消息。对于峰值并发 - 每天使用16和400万条消息,这是不正常的。
通过Pusher仪表板,下面是损坏程度。
有一种特定的使用模式。当我关闭supervisord worker时,图表停止,当我再次启动worker时,它会以可预测的模式重新显示,如下图所示。
下面是我运行redis-cli monitor
时收到的示例消息。
正如您在上面所看到的,索引[尝试]是69507.这是否意味着此事件已被广播69507次?为什么要多次播放一个事件?广播什么时候停止广播?难道我做错了什么 ?
这是AgendaParticipantUpdated
事件的代码。如果事件的实施方式存在根本性的错误,那将会很棒。
<?php
namespace App\Events;
use App\AgendaParticipant;
use App\Agenda;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class AgendaParticipantUpdated implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
/** @var agendaParticipant */
public $agendaParticipant;
/**
* Create a new event instance.
*
* @param AgendaParticipant $agendaParticipant
*/
public function __construct(AgendaParticipant $agendaParticipant)
{
$this->agendaParticipant = $agendaParticipant;
}
/**
* Get the channels the event should broadcast on.
*
* @return \Illuminate\Broadcasting\Channel|array
*/
public function broadcastOn()
{
return new PrivateChannel("meetings.{$this->agendaParticipant->agenda->meeting->id}");
}
/**
* Get the data to broadcast.
*
* @return array
*/
public function broadcastWith()
{
$agenda = Agenda::where('id', $this->agendaParticipant->agenda->id)->first();
$agenda->load(['participants' => function ($query) {
$query->orderBy('order')->orderBy('id');
}, 'participants.member.user']);
return [
'agendaParticipant' => $this->agendaParticipant,
'agenda' => $agenda,
];
}
}
还有很多其他广播也像这样:
如上所示,此广播的尝试次数为873245。
以下是DiscussionCreated
事件的代码:
<?php
namespace App\Events;
use App\Discussion;
use App\Member;
use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Http\Response;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class DiscussionCreated implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
/**
* @var Discussion
*/
public $discussion;
/**
* Create a new event instance.
*
* @param Discussion $discussion
*/
public function __construct(Discussion $discussion)
{
$this->discussion = $discussion;
}
public function broadcastOn()
{
$author = Member::where('id',$this->discussion->author_id)->firstOrFail();
return new PrivateChannel("group.{$author->group_id}");
}
public function broadcastWith()
{
if(\Auth::check())
{
$group = $this->discussion->author->group;
$group->load(['meetings', 'facilitator']);
$facilitatorId = optional($group->facilitator)->id;
$members = $group->members()->with('user')->get();
$discussion = $this->discussion;
$member = auth()->user()->activeGroupMember;
$list_read_discussions=\DB::table('members')
->where('member_id', '=', $member->id)
->join('acknowledgements', 'members.id', '=', 'acknowledgements.member_id')
->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
->where('discussions.target_type','=','App\Group')
->whereNotExists( function($q) {
$q->select(\DB::raw(1))
->from('replies')
->where('acknowledgements.discussion_id','=','replies.discussion_id')
->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
})
->distinct('acknowledgements.discussion_id')
->pluck('acknowledgements.discussion_id');
$group_discussions_count= $discussion->count();
$group_read_discussions=$list_read_discussions;
$unreadGroupCount=max(0,$group_discussions_count - $group_read_discussions->count());
$authedMemberId = auth()->id();
//private menu section
$authedMember = auth()->user()->activeGroupMember;
foreach($members as $target){
$discussions_to_target_count=\App\Discussion::
where('target_type','=','App\Member')
->where('target_id','=',$target->id)
->Where('author_id','=',$authedMember->id)
->count();
$discussions_from_target_count=\App\Discussion::
where('target_type','=','App\Member')
->where('target_id','=',$authedMember->id)
->Where('author_id','=',$target->id)
->count();
$read_discussions_to_target_count=\DB::table('acknowledgements')
->where('acknowledgements.member_id', '=', $authedMember->id)
->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
->where('discussions.target_type','=','App\Member')
->where('discussions.target_id', '=', $target->id)
->where('discussions.author_id', '=', $authedMember->id)
->whereNotExists( function($q) {
$q->select(\DB::raw(1))
->from('replies')
->where('acknowledgements.discussion_id','=','replies.discussion_id')
->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
})
->distinct('acknowledgements.discussion_id')
->pluck('acknowledgements.discussion_id')
->count();
$read_discussions_from_target_count=\DB::table('acknowledgements')
->where('acknowledgements.member_id', '=', $authedMember->id)
->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
->where('discussions.target_type','=','App\Member')
->where('discussions.target_id', '=', $authedMember->id)
->where('discussions.author_id', '=', $target->id)
->whereNotExists( function($q) {
$q->select(\DB::raw(1))
->from('replies')
->where('acknowledgements.discussion_id','=','replies.discussion_id')
->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
})
->distinct('acknowledgements.discussion_id')
->pluck('acknowledgements.discussion_id')
->count();
$target->unreadPrivateCount=max(0,$discussions_to_target_count
+$discussions_from_target_count
-$read_discussions_from_target_count
-$read_discussions_to_target_count);
}
$this->discussion->replies = [];
$this->discussion->replies_count = 0;
$this->discussion->sort = $this->discussion->created_at->toDateTimeString();
$this->discussion->has_user_subscribed = 1;
return [
'discussion' => $this->discussion ? $this->discussion->toArray() : array(),
];
}
}
}
感谢任何帮助,因为它干扰了我们的日常运作。
正如您在上面所看到的,索引[尝试]是69507.这是否意味着此事件已被广播69507次?
是。这意味着您的队列工作者已经处理了此作业(广播)近70K次。
为什么要多次播放一个事件?
我相信原因是你的广播工作抛出异常(它失败了)。 Laravel将继续尝试执行失败的作业,直到它们:成功执行而没有任何异常或直到达到最大尝试/超时值。从附带的屏幕截图中,我相信您尚未配置此类值。 (截图中maxTries
为空)
广播什么时候停止广播?难道我做错了什么 ?
我想你需要做两件事:
tries
参数,即运行php artisan queue:work --tries=3
(检查Laravel Documentation以获取更多信息)。我希望这有帮助。如果您有任何后续问题,请与我们联系。
更新:考虑尝试Laravel Horizon。它会为您提供一个很好的仪表板来监控您的工作(包括广播),查看失败的工作数量和原因(例外)。它还具有内置的通知设置,可以帮助您在出现问题时及早发现。
我不知道你是否可以这样,但是我使用Laravel 5.6遇到了类似的问题
我发现的是排队工作的cron
* * * * * cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1
我忘了指定处理cron的用户,因此该进程由超级用户处理,随后使用错误的权限锁定文件/资源
所以,我把cron命令更改为
* * * * * apache:apache cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1
这解决了我的问题。现在我清除了日志,没有失败的进程尝试