Pusher填满了Laravel Broadcasts

问题描述 投票:0回答:2

最近在我们的Laravel项目中观察到这一点,Pusher正被堵塞到极限的消息。对于峰值并发 - 每天使用16和400万条消息,这是不正常的。

通过Pusher仪表板,下面是损坏程度。

enter image description here

有一种特定的使用模式。当我关闭supervisord worker时,图表停止,当我再次启动worker时,它会以可预测的模式重新显示,如下图所示。

enter image description here

下面是我运行redis-cli monitor时收到的示例消息。

enter image description here

正如您在上面所看到的,索引[尝试]是69507.这是否意味着此事件已被广播69507次?为什么要多次播放一个事件?广播什么时候停止广播?难道我做错了什么 ?

这是AgendaParticipantUpdated事件的代码。如果事件的实施方式存在根本性的错误,那将会很棒。

<?php

namespace App\Events;

use App\AgendaParticipant;
use App\Agenda;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;

class AgendaParticipantUpdated implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /** @var agendaParticipant */
    public $agendaParticipant;

    /**
     * Create a new event instance.
     *
     * @param AgendaParticipant $agendaParticipant
     */
    public function __construct(AgendaParticipant $agendaParticipant)
    {
        $this->agendaParticipant = $agendaParticipant;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return \Illuminate\Broadcasting\Channel|array
     */
    public function broadcastOn()
    {
        return new PrivateChannel("meetings.{$this->agendaParticipant->agenda->meeting->id}");
    }

    /**
     * Get the data to broadcast.
     *
     * @return array
     */
    public function broadcastWith()
    {
        $agenda = Agenda::where('id', $this->agendaParticipant->agenda->id)->first();
        $agenda->load(['participants' => function ($query) {
            $query->orderBy('order')->orderBy('id');
        }, 'participants.member.user']);

        return [
            'agendaParticipant' => $this->agendaParticipant,
            'agenda' => $agenda,
        ];
    }
}

还有很多其他广播也像这样:

enter image description here

如上所示,此广播的尝试次数为873245。

以下是DiscussionCreated事件的代码:

<?php

namespace App\Events;

use App\Discussion;
use App\Member;
use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Http\Response;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;


class DiscussionCreated implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /**
     * @var Discussion
     */
    public $discussion;

    /**
     * Create a new event instance.
     *
     * @param Discussion $discussion
     */
    public function __construct(Discussion $discussion)
    {
        $this->discussion = $discussion;
    }

    public function broadcastOn()
    {
        $author = Member::where('id',$this->discussion->author_id)->firstOrFail();
        return new PrivateChannel("group.{$author->group_id}");
    }

    public function broadcastWith()
    {
        if(\Auth::check())
        {
            $group = $this->discussion->author->group;

            $group->load(['meetings', 'facilitator']);
            $facilitatorId = optional($group->facilitator)->id;
            $members = $group->members()->with('user')->get();
            $discussion = $this->discussion;
            $member = auth()->user()->activeGroupMember;

            $list_read_discussions=\DB::table('members')
                ->where('member_id', '=', $member->id)
                ->join('acknowledgements', 'members.id', '=', 'acknowledgements.member_id')
                ->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
                ->where('discussions.target_type','=','App\Group')
                ->whereNotExists( function($q) {
                    $q->select(\DB::raw(1))
                        ->from('replies')
                        ->where('acknowledgements.discussion_id','=','replies.discussion_id')
                        ->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
                })
                ->distinct('acknowledgements.discussion_id')
                ->pluck('acknowledgements.discussion_id');

            $group_discussions_count= $discussion->count();
            $group_read_discussions=$list_read_discussions;
            $unreadGroupCount=max(0,$group_discussions_count - $group_read_discussions->count());

            $authedMemberId = auth()->id();

            //private menu section
            $authedMember = auth()->user()->activeGroupMember;
            foreach($members as $target){
                $discussions_to_target_count=\App\Discussion::
                where('target_type','=','App\Member')
                    ->where('target_id','=',$target->id)
                    ->Where('author_id','=',$authedMember->id)
                    ->count();

                $discussions_from_target_count=\App\Discussion::
                where('target_type','=','App\Member')
                    ->where('target_id','=',$authedMember->id)
                    ->Where('author_id','=',$target->id)
                    ->count();

                $read_discussions_to_target_count=\DB::table('acknowledgements')
                    ->where('acknowledgements.member_id', '=', $authedMember->id)
                    ->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
                    ->where('discussions.target_type','=','App\Member')
                    ->where('discussions.target_id', '=', $target->id)
                    ->where('discussions.author_id', '=', $authedMember->id)
                    ->whereNotExists( function($q) {
                        $q->select(\DB::raw(1))
                            ->from('replies')
                            ->where('acknowledgements.discussion_id','=','replies.discussion_id')
                            ->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
                    })

                    ->distinct('acknowledgements.discussion_id')
                    ->pluck('acknowledgements.discussion_id')
                    ->count();

                $read_discussions_from_target_count=\DB::table('acknowledgements')
                    ->where('acknowledgements.member_id', '=', $authedMember->id)
                    ->join('discussions', 'discussions.id', '=', 'acknowledgements.discussion_id')
                    ->where('discussions.target_type','=','App\Member')
                    ->where('discussions.target_id', '=', $authedMember->id)
                    ->where('discussions.author_id', '=', $target->id)
                    ->whereNotExists( function($q) {
                        $q->select(\DB::raw(1))
                            ->from('replies')
                            ->where('acknowledgements.discussion_id','=','replies.discussion_id')
                            ->whereColumn('replies.updated_at', '>','acknowledgements.updated_at');
                    })

                    ->distinct('acknowledgements.discussion_id')
                    ->pluck('acknowledgements.discussion_id')
                    ->count();


                $target->unreadPrivateCount=max(0,$discussions_to_target_count
                    +$discussions_from_target_count
                    -$read_discussions_from_target_count
                    -$read_discussions_to_target_count);
            }

            $this->discussion->replies = [];
            $this->discussion->replies_count = 0;
            $this->discussion->sort = $this->discussion->created_at->toDateTimeString();
            $this->discussion->has_user_subscribed = 1;

            return [
                'discussion' => $this->discussion ? $this->discussion->toArray() : array(),
            ];

        }    
    }
}

感谢任何帮助,因为它干扰了我们的日常运作。

laravel laravel-5 websocket pusher
2个回答
1
投票

正如您在上面所看到的,索引[尝试]是69507.这是否意味着此事件已被广播69507次?

是。这意味着您的队列工作者已经处理了此作业(广播)近70K次。

为什么要多次播放一个事件?

我相信原因是你的广播工作抛出异常(它失败了)。 Laravel将继续尝试执行失败的作业,直到它们:成功执行而没有任何异常或直到达到最大尝试/超时值。从附带的屏幕截图中,我相信您尚未配置此类值。 (截图中maxTries为空)

广播什么时候停止广播?难道我做错了什么 ?

我想你需要做两件事:

  1. 定义队列工作程序的最大尝试次数。其中一种方法是为队列工作者指定tries参数,即运行php artisan queue:work --tries=3(检查Laravel Documentation以获取更多信息)。
  2. 检查错误日志文件,找到广播作业中抛出的异常并进行修复。

我希望这有帮助。如果您有任何后续问题,请与我们联系。

更新:考虑尝试Laravel Horizon。它会为您提供一个很好的仪表板来监控您的工作(包括广播),查看失败的工作数量和原因(例外)。它还具有内置的通知设置,可以帮助您在出现问题时及早发现。


0
投票

我不知道你是否可以这样,但是我使用Laravel 5.6遇到了类似的问题

我发现的是排队工作的cron

* * * * * cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1

我忘了指定处理cron的用户,因此该进程由超级用户处理,随后使用错误的权限锁定文件/资源

所以,我把cron命令更改为

* * * * * apache:apache cd /path-to-your-project && php artisan schedule:run >> /dev/null 2>&1

这解决了我的问题。现在我清除了日志,没有失败的进程尝试

© www.soinside.com 2019 - 2024. All rights reserved.