Rails和Postgresql,每当在不存在竞争条件的情况下在管理仪表板上创建新记录时通知/监听

问题描述 投票:0回答:1

我有一个管理控制台,我希望每当创建一个用户时(在单独的页面上)都会发出警报。下面的代码有效,但是,存在竞争条件。如果两个用户的创建距离非常近,则只会触发一次。

class User < ApplicationRecord
  after_commit :notify_creation, on: :create

  def notify_creation
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      self.class.execute_query(connection, ["NOTIFY user_created, '?'", id])
    end
  end

  def self.listen_to_creation
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      begin
        execute_query(connection, ["LISTEN user_created"])
        connection.raw_connection.wait_for_notify do |event, pid, id|
          yield id
        end
      ensure
        execute_query(connection, ["UNLISTEN user_created"])
      end
    end
  end

  def self.clean_sql(query)
    sanitize_sql(query)
  end

  private

  def self.execute_query(connection, query)
    sql = self.clean_sql(query)
    connection.execute(sql)
  end
end

class AdminsController < ApplicationController
  include ActionController::Live

  def update
    response.headers['Content-Type'] = 'text/event-stream'
    sse = SSE.new(response.stream, event: 'notice')
    begin
      User.listen_to_creation do |user_id|
        sse.write({user_id: user_id})
      end
    rescue ClientDisconnected
    ensure
      sse.close
    end
  end
end

这是我第一次这样做,所以我遵循了tutorial,就像大多数教程一样,它着重于对单个记录的更新,而不是监听整个表以进行新创建。

ruby-on-rails postgresql publish-subscribe server-sent-events
1个回答
0
投票

之所以这样,是因为您一次只发送一个更新,然后请求结束。如果您在AdminsController#update上发出请求。您有一个订阅者在等待您的通知。看看这个方块

begin
  execute_query(connection, ["LISTEN user_created"])
  connection.raw_connection.wait_for_notify do |event, pid, id|
    yield id
  end
ensure
  execute_query(connection, ["UNLISTEN user_created"])
end

一旦收到一个通知,该阻止就会产生,然后关闭该通道。因此,如果您依靠前端在获得结果并请求结束后再进行一次连接尝试,则如果在此之前创建了一条记录,您将不会收到通知,因为当时没有侦听器连接到Postgres 。

这是任何实时通知系统中的常见问题。理想情况下,您希望管道始终处于打开状态(Websocket,SSE甚至是LongPolling)。如果您收到新项目,则使用该管道将其发送到前端,并且理想情况下,如Websockets和SSE一样,应使该管道保持打开状态。现在,您可以将SSE连接视为长轮询。

所以您的代码应该看起来像

# Snippet 2
  def self.listen_to_creation
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      begin
        execute_query(connection, ["LISTEN user_created"])
        loop do 
          connection.raw_connection.wait_for_notify do |event, pid, id|
            yield id
          end
        end
      ensure
        execute_query(connection, ["UNLISTEN user_created"])
      end
    end
  end

但是这会导致一个问题,即使关闭连接直到线程中的某些数据进入线程并且在回写时遇到错误,它仍将使线程永远存活。您可以选择以较短的通知间隔运行固定的次数,也可以为其添加某种听音。有两种简单的方法可以实现听音。我将它们添加为快速修改代码。

# Snippet 3
def self.listen_to_creation(heartbeat_interval = 10)
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      begin
        execute_query(connection, ["LISTEN user_created"])
        last_hearbeat = Time.now
        loop do 
          connection.raw_connection.wait_for_notify(heartbeat_interval) do |event, pid, id|
            yield({id: id})
          end
          if Time.now - last_heartbeat >= heartbeat_interval
            yield({heartbeat: true})
            last_heartbeat = Time.now
          end
        end
      ensure
        execute_query(connection, ["UNLISTEN user_created"])
      end
    end
  end

在以上示例中,您至少每隔heartbeat_interval秒就会在管道中发送一些内容。因此,如果管道关闭,它会出错并关闭管道,从而释放线程。

这种方法在模型中添加了与控制器相关的逻辑,如果您希望在没有时间间隔的情况下保持postgres通知,那么可以做的另一件事就是在控制器本身中启动一个线程。在控制器方法中启动一个线程,该线程为heartbeat_interval休眠,并在唤醒后写入sse.write({heartbeat: true})。在这种情况下,您可以使模型代码与代码段2相同。

此外,我在an answer中添加了其他要在Puma&Rails上与SSE一起观看的内容:

© www.soinside.com 2019 - 2024. All rights reserved.