我有一个管理控制台,我希望每当创建一个用户时(在单独的页面上)都会发出警报。下面的代码有效,但是,存在竞争条件。如果两个用户的创建距离非常近,则只会触发一次。
class User < ApplicationRecord
after_commit :notify_creation, on: :create
def notify_creation
ActiveRecord::Base.connection_pool.with_connection do |connection|
self.class.execute_query(connection, ["NOTIFY user_created, '?'", id])
end
end
def self.listen_to_creation
ActiveRecord::Base.connection_pool.with_connection do |connection|
begin
execute_query(connection, ["LISTEN user_created"])
connection.raw_connection.wait_for_notify do |event, pid, id|
yield id
end
ensure
execute_query(connection, ["UNLISTEN user_created"])
end
end
end
def self.clean_sql(query)
sanitize_sql(query)
end
private
def self.execute_query(connection, query)
sql = self.clean_sql(query)
connection.execute(sql)
end
end
class AdminsController < ApplicationController
include ActionController::Live
def update
response.headers['Content-Type'] = 'text/event-stream'
sse = SSE.new(response.stream, event: 'notice')
begin
User.listen_to_creation do |user_id|
sse.write({user_id: user_id})
end
rescue ClientDisconnected
ensure
sse.close
end
end
end
这是我第一次这样做,所以我遵循了tutorial,就像大多数教程一样,它着重于对单个记录的更新,而不是监听整个表以进行新创建。
之所以这样,是因为您一次只发送一个更新,然后请求结束。如果您在AdminsController#update上发出请求。您有一个订阅者在等待您的通知。看看这个方块
begin
execute_query(connection, ["LISTEN user_created"])
connection.raw_connection.wait_for_notify do |event, pid, id|
yield id
end
ensure
execute_query(connection, ["UNLISTEN user_created"])
end
一旦收到一个通知,该阻止就会产生,然后关闭该通道。因此,如果您依靠前端在获得结果并请求结束后再进行一次连接尝试,则如果在此之前创建了一条记录,您将不会收到通知,因为当时没有侦听器连接到Postgres 。
这是任何实时通知系统中的常见问题。理想情况下,您希望管道始终处于打开状态(Websocket,SSE甚至是LongPolling)。如果您收到新项目,则使用该管道将其发送到前端,并且理想情况下,如Websockets和SSE一样,应使该管道保持打开状态。现在,您可以将SSE连接视为长轮询。
所以您的代码应该看起来像
# Snippet 2
def self.listen_to_creation
ActiveRecord::Base.connection_pool.with_connection do |connection|
begin
execute_query(connection, ["LISTEN user_created"])
loop do
connection.raw_connection.wait_for_notify do |event, pid, id|
yield id
end
end
ensure
execute_query(connection, ["UNLISTEN user_created"])
end
end
end
但是这会导致一个问题,即使关闭连接直到线程中的某些数据进入线程并且在回写时遇到错误,它仍将使线程永远存活。您可以选择以较短的通知间隔运行固定的次数,也可以为其添加某种听音。有两种简单的方法可以实现听音。我将它们添加为快速修改代码。
# Snippet 3
def self.listen_to_creation(heartbeat_interval = 10)
ActiveRecord::Base.connection_pool.with_connection do |connection|
begin
execute_query(connection, ["LISTEN user_created"])
last_hearbeat = Time.now
loop do
connection.raw_connection.wait_for_notify(heartbeat_interval) do |event, pid, id|
yield({id: id})
end
if Time.now - last_heartbeat >= heartbeat_interval
yield({heartbeat: true})
last_heartbeat = Time.now
end
end
ensure
execute_query(connection, ["UNLISTEN user_created"])
end
end
end
在以上示例中,您至少每隔heartbeat_interval秒就会在管道中发送一些内容。因此,如果管道关闭,它会出错并关闭管道,从而释放线程。
这种方法在模型中添加了与控制器相关的逻辑,如果您希望在没有时间间隔的情况下保持postgres通知,那么可以做的另一件事就是在控制器本身中启动一个线程。在控制器方法中启动一个线程,该线程为heartbeat_interval休眠,并在唤醒后写入sse.write({heartbeat: true})
。在这种情况下,您可以使模型代码与代码段2相同。
此外,我在an answer中添加了其他要在Puma&Rails上与SSE一起观看的内容: