我想将无限的字节流解析为无限的Haskell数据流。每个字节都从网络中读取,因此它们被包装到IO monad中。
更具体地说,我有一个无限的[IO(ByteString)]
类型的流。另一方面,我有一个纯粹的解析函数parse :: [ByteString] -> [Object]
(其中Object
是一个Haskell数据类型)
有没有办法将我的无限monad流插入我的解析函数?
例如,是否可以编写[IO(ByteString)] -> IO [ByteString]
类型的函数,以便我在monad中使用我的函数parse
?
一般而言,为了使IO操作正确排序并且行为可预测,每个操作都需要在下一个操作运行之前完全完成。在do-block中,这意味着这有效:
main = do
sequence (map putStrLn ["This","action","will","complete"])
putStrLn "before we get here"
但不幸的是,如果最终的IO行动很重要,那么这将无效:
dontRunMe = do
putStrLn "This is a problem when an action is"
sequence (repeat (putStrLn "infinite"))
putStrLn "<not printed>"
所以,即使sequence
可以专门用于正确的类型签名:
sequence :: [IO a] -> IO [a]
它无法在无限的IO操作列表中按预期工作。定义这样的序列没有问题:
badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))
但任何执行IO操作的尝试(例如,通过尝试打印结果列表的头部)都将挂起:
main = (head <$> badSeq) >>= print
如果您只需要结果的一部分,则无关紧要。在整个sequence
完成之前,你不会从IO monad中获得任何东西(如果列表是无限的,那么“永远不会”)。
如果你想从部分完成的IO动作中获取数据,你需要明确它并使用一个可怕的Haskell逃生舱,unsafeInterleaveIO
。此函数采用IO操作并“延迟”它,以便在需要该值之前它不会实际执行。
一般来说这是不安全的原因是现在有意义的IO动作可能意味着如果在稍后的时间点实际执行则会有所不同。举一个简单的例子,截断/删除文件的IO操作如果在写入更新文件内容之前执行,则会产生非常不同的效果!
无论如何,你想要做的是写一个懒惰版的sequence
:
import System.IO.Unsafe (unsafeInterleaveIO)
lazySequence :: [IO a] -> IO [a]
lazySequence [] = return [] -- oops, not infinite after all
lazySequence (m:ms) = do
x <- m
xs <- unsafeInterleaveIO (lazySequence ms)
return (x:xs)
这里的关键点是,当执行lazySequence infstream
动作时,它实际上只会执行第一个动作;其余的操作将被包含在延迟的IO操作中,该操作在请求返回列表的第二个和后续元素之前不会真正执行。
这适用于假IO操作:
> take 5 <$> lazySequence (repeat (return ('+'))
"+++++"
>
(如果你用lazySequence
替换sequence
,它会挂起)。它也适用于真正的IO操作:
> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns)) -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>
无论如何,有了lazySequence
和类型的这个定义:
parse :: [ByteString] -> [Object]
input :: [IO ByteString]
你应该没有问题写作:
outputs :: IO [Object]
outputs = parse <$> lazySequence inputs
然后懒洋洋地使用它,但你想要:
main = do
objs <- outputs
mapM_ doSomethingWithObj objs
尽管上面的惰性IO机制非常简单直接,但由于资源管理问题,空间泄漏的脆弱性(代码的小变化会耗尽内存占用空间),懒惰的IO已经不再支持生产代码了。 ,以及异常处理的问题。
一个解决方案是conduit
库。另一个是pipes
。两者都是精心设计的流媒体库,可以支持无限流。
对于conduit
,如果你有一个解析函数,每个字节字符串创建一个对象,如:
parse1 :: ByteString -> Object
parse1 = ...
然后给出:
inputs :: [IO ByteString]
inputs = ...
useObject :: Object -> IO ()
useObject = ...
管道看起来像:
import Conduit
main :: IO ()
main = runConduit $ mapM_ yieldM inputs
.| mapC parse1
.| mapM_C useObject
鉴于您的解析函数具有签名:
parse :: [ByteString] -> [Object]
我很确定你不能直接将它与导管整合在一起(或者至少不能以任何不会消除使用导管的所有好处的方式)。你需要将它重写为管道友好的消耗字节字符串和生成对象。