我已经实现了一个DynamicSupervisor来处理使用Process.send_after/3
以一定时间精度发送推送通知的工作者。
在某些情况下,比如当我发布没有hotswap功能的新版本时,我想保留所有仍未在DETS中完成的工作人员的记录,以及当DynamicSupervisor再次以与DETS中保存的状态相同的状态开始创建所有这些工作人员时。
我知道DynamicSupervisor从一开始就不支持worker,我应该实现start_child
来启动一个新的worker,但是我不知道如何将DynamicSupervisor的启动与同一模块上的这些worker的启动集成在一起。
我尝试了不同的方法,但没有成功(Task.async
,Process.send_after
等)。
我应该在代码的其他部分实现启动行为(我使用Phoenix)?
编辑:我发现了一个小问题(期待%{x:1,y:2}并且正在接收[x:1,y:2]
这是DynamicSupervisor上的init函数。
...
def init(arg) do
Task.async(fn ->
start_lost_children()
end)
DynamicSupervisor.init(arg)
end
def start_child(data) do
spec = {MyApp.Worker, %{data: data}}
DynamicSupervisor.start_child(__MODULE__, spec)
end
...
这是我得到的错误消息,尽管不影响DynamicSupervisor的行为。
[error] DynamicSupervisor received unexpected message: {#Reference<0.2565843855.2975858690.212433>, :ok}
[error] DynamicSupervisor received unexpected message: {:DOWN, #Reference<0.2565843855.2975858690.212433>, :process, #PID<0.593.0>, :normal}
:ok
是start_lost_children/0
回归的结果。
如果我实现handle_info/2
,它没有捕获任何东西。
我把它放在这里主要是为了代码格式化;在我的项目中,在相同条件下,下面的代码适用于我。我不确定它会有所帮助,因为你已经明确提到你已尝试过这种方法,但是由于OP中没有代码我可以比较,我们走了。
启动任务的过程应该实现这两个handle_info/2
回调:
@doc false
# Task finished {#Reference<0.0.1.6335>, :ok}
def handle_info({_pid, _payload}, state),
do: {:noreply, state}
@doc false
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state),
do: {:noreply, state}
除此之外,您可能希望为:normal
之外的其他返回状态显式处理后者。
init
不适合放置Task.async/1
,因为init/1
是一个从进程调用的回调,它启动你的自定义DynamicSupervisor
。此过程将收回info
消息,(请参阅您的错误消息报告DynamicSupervisor
未实现处理程序。)
以下应该有效(假设两个handle_info/2
都已实施):
def start_link(arg) do
with link <- DynamicSupervisor.start_link(...) do
Task.async(...)
link
end
end
@impl true
def init(args) do
DynamicSupervisor.init(args)
end
Task.async/1
开始了一项必须等待的任务。开发人员最终必须调用Task.await/2
或Task.yield/2
。
尝试使用Task.start/1
- 仅在任务用于副作用时使用(即对返回的结果不感兴趣),并且不应将其链接到当前进程。