无限的有效行动

问题描述 投票:4回答:1

我想将无限的字节流解析为无限的Haskell数据流。每个字节都从网络中读取,因此它们被包装到IO monad中。

更具体地说,我有一个无限的[IO(ByteString)]类型的流。另一方面,我有一个纯粹的解析函数parse :: [ByteString] -> [Object](其中Object是一个Haskell数据类型)

有没有办法将我的无限monad流插入我的解析函数?

例如,是否可以编写[IO(ByteString)] -> IO [ByteString]类型的函数,以便我在monad中使用我的函数parse

haskell stream effects infinite
1个回答
8
投票

The Problem

一般而言,为了使IO操作正确排序并且行为可预测,每个操作都需要在下一个操作运行之前完全完成。在do-block中,这意味着这有效:

main = do
    sequence (map putStrLn ["This","action","will","complete"])
    putStrLn "before we get here"

但不幸的是,如果最终的IO行动很重要,那么这将无效:

dontRunMe = do
    putStrLn "This is a problem when an action is"
    sequence (repeat (putStrLn "infinite"))
    putStrLn "<not printed>"

所以,即使sequence可以专门用于正确的类型签名:

sequence :: [IO a] -> IO [a]

它无法在无限的IO操作列表中按预期工作。定义这样的序列没有问题:

badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))

但任何执行IO操作的尝试(例如,通过尝试打印结果列表的头部)都将挂起:

main = (head <$> badSeq) >>= print

如果您只需要结果的一部分,则无关紧要。在整个sequence完成之前,你不会从IO monad中获得任何东西(如果列表是无限的,那么“永远不会”)。

The "Lazy IO" Solution

如果你想从部分完成的IO动作中获取数据,你需要明确它并使用一个可怕的Haskell逃生舱,unsafeInterleaveIO。此函数采用IO操作并“延迟”它,以便在需要该值之前它不会实际执行。

一般来说这是不安全的原因是现在有意义的IO动作可能意味着如果在稍后的时间点实际执行则会有所不同。举一个简单的例子,截断/删除文件的IO操作如果在写入更新文件内容之前执行,则会产生非常不同的效果!

无论如何,你想要做的是写一个懒惰版的sequence

import System.IO.Unsafe (unsafeInterleaveIO)

lazySequence :: [IO a] -> IO [a]
lazySequence [] = return []  -- oops, not infinite after all
lazySequence (m:ms) = do
  x <- m
  xs <- unsafeInterleaveIO (lazySequence ms)
  return (x:xs)

这里的关键点是,当执行lazySequence infstream动作时,它实际上只会执行第一个动作;其余的操作将被包含在延迟的IO操作中,该操作在请求返回列表的第二个和后续元素之前不会真正执行。

这适用于假IO操作:

> take 5 <$> lazySequence (repeat (return ('+'))
"+++++"
>

(如果你用lazySequence替换sequence,它会挂起)。它也适用于真正的IO操作:

> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns))  -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>

无论如何,有了lazySequence和类型的这个定义:

parse :: [ByteString] -> [Object]
input :: [IO ByteString]

你应该没有问题写作:

outputs :: IO [Object]
outputs = parse <$> lazySequence inputs

然后懒洋洋地使用它,但你想要:

main = do
    objs <- outputs
    mapM_ doSomethingWithObj objs

Using Conduit

尽管上面的惰性IO机制非常简单直接,但由于资源管理问题,空间泄漏的脆弱性(代码的小变化会耗尽内存占用空间),懒惰的IO已经不再支持生产代码了。 ,以及异常处理的问题。

一个解决方案是conduit库。另一个是pipes。两者都是精心设计的流媒体库,可以支持无限流。

对于conduit,如果你有一个解析函数,每个字节字符串创建一个对象,如:

parse1 :: ByteString -> Object
parse1 = ...

然后给出:

inputs :: [IO ByteString]
inputs = ...

useObject :: Object -> IO ()
useObject = ...

管道看起来像:

import Conduit

main :: IO ()
main = runConduit $  mapM_ yieldM inputs
                  .| mapC parse1
                  .| mapM_C useObject

鉴于您的解析函数具有签名:

parse :: [ByteString] -> [Object]

我很确定你不能直接将它与导管整合在一起(或者至少不能以任何不会消除使用导管的所有好处的方式)。你需要将它重写为管道友好的消耗字节字符串和生成对象。

© www.soinside.com 2019 - 2024. All rights reserved.