发送 POSIX 信号(SIGTERM 等)时在多个 Haskell 子线程中运行清理函数

问题描述 投票:0回答:1

TL;DR - 我如何在 Haskell 中进行以下工作:

  • SIGTERM
    发送到具有许多活动线程(每个线程都在执行一项作业)的长时间运行的程序
  • 让所有子线程在退出之前运行清理函数(更新数据库以表明作业已中止)

在我(非常缺乏经验)看来,实现这种情况的最干净的方法似乎是在“主”线程中捕获

SIGTERM
,在子线程中引发异步异常,然后在子线程中使用
bracket
线程通过运行一些清理代码来对异步异常做出反应。根据经验,我无法完成这项工作。


更多颜色:

我有一个 Haskell 程序,它生成许多线程来完成工作(使用

async
)。基本上,它:

  • 等待来自数据库作业队列的新作业通知
  • 生成一个新线程来完成工作
    • 线程会随着进度更新数据库中的作业状态(例如
      running
      paused
    • 如果作业完成、用户取消作业或发生同步异常,它会使用最终状态更新数据库(例如
      completed
      cancelled
      aborted

至关重要的是,主线程永远运行,只是监听新作业,除非被

SIGINT
SIGTERM
SIGKILL
等中断。

当程序获得

SIGINT
SIGTERM
时,我想在“子”线程终止之前运行一些清理(即更新数据库以将正在进行的作业的状态设置为
aborted
)。但是,我绝对不知道该怎么做。

我的理解是,处理从“父”线程抛出到“子”线程的异常是使用

bracket
,它屏蔽了工作主体的异步异常,允许您在之前运行清理函数终止。

但是,

bracket
似乎与信号处理交互不佳。我尝试安装信号处理程序,尝试将
SIGTERM
转换为我可以正确处理的运行时异常。这对于我安装处理程序的线程非常有用,但我无法向其他线程抛出异步异常,我认为因为它们也收到了SIGTERM
,所以它们立即死亡。

似乎我无法为每个线程安装单独的

SIGTERM

,因为看起来运行时在所有线程中只能为每个中断类型拥有一个信号处理程序(基本上,如果我这样做,最后一个启动的线程获得中断,但所有其他线程,包括主线程,继续运行)。

exception haskell concurrency ghc
1个回答
0
投票
这是一个执行此操作的小示例。

import Control.Concurrent import Control.Exception import Data.Foldable (for_) import Data.Traversable (for) import System.Posix.Signals (installHandler, sigTERM, Handler(..)) data Result = Done | Aborted deriving Show thread :: MVar Result -> IO () thread v = job `catch` handler where job = do threadDelay 5000000 putMVar v Done handler e = case e of ThreadKilled -> putMVar v Aborted _ -> throwIO e main :: IO () main = do -- install handler for SIGTERM: throw UserInterrupt to main thread -- (SIGINT is already installed by default) mainId <- myThreadId installHandler sigTERM (Catch (throwTo mainId UserInterrupt)) Nothing -- spawn threads, returning their IDs (for throwTo) and a synchronization MVar children <- for [0..9] $ \_ -> do v <- newEmptyMVar t <- forkIO (thread v) pure (t, v) -- wait for threads to terminate let wait = do for_ children $ \ (_, v) -> do _ <- readMVar v pure () putStrLn "Normal termination" wait `catch` \e -> case e of UserInterrupt -> do putStrLn "Killed." putStrLn "Cleaning up..." -- kill children threads for_ children $ \ (t, _) -> throwTo t ThreadKilled putStrLn "Waiting on children" statuses <- for children $ \ (t, v) -> do r <- readMVar v pure (t, r) putStrLn ("Job statuses: " ++ show statuses) e -> throwIO e
    
© www.soinside.com 2019 - 2024. All rights reserved.