OutputStream
是OutputStream
的抽象-一种认为是延续的方法。特别是,与对应的looks like a contravariant functor不同,它不是容器-它不能为您提供任何实际值。
[InputStream
]我希望反过来—合并It has been shown previously how to split an InputStream
.。也就是说,我需要这样的功能:
InputStream
想到它的一种方法是从OutputStream
开始。
contrafork :: OutputStream a → OutputStream b → IO (OutputStream (a, b))
contrafork = …
instance Applicative OutputStream
-这是发生动作的地方。我必须使其异步,以避免«无限期阻塞线程]的情况,然后我必须在外部传递一个令牌,消费者应等待以确保两个流完全执行。在错误的位置等待仍会导致线程锁定。总而言之,这种解决方案还有待改进。
这是我用来查看我的contrafork :: OutputStream a → OutputStream b → IO (OutputStream (a, b), Async ( ))
contrafork ω₁ ω₂ = do
buffer ← newEmptyMVar
ω ← ω₁ & contramapM (\ (x, y) → (putMVar buffer . Just $ y) >> return x) >>= atEndOfOutput (putMVar buffer Nothing)
α ← makeInputStream (takeMVar buffer)
token ← async $ connect α ω₂
return (ω, token)
是否有效的跑步者:
connect
is a loop under the hood
如果使其可执行并在控制台中运行,则应该看到一些交错的红线和蓝线。
[connect
被定义为contrafork
,因此可以将两个流合并如下:
#!/usr/bin/env stack
{- stack --resolver=lts-14 script
--package io-streams
--package bytestring
--package ansi-terminal
--package async
--ghc-options -Wall
-}
{-# language UnicodeSyntax #-}
{-# language OverloadedStrings #-}
{-# language BlockArguments #-}
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Data.Function
import System.Console.ANSI
import System.IO.Streams hiding (map)
import qualified System.IO.Streams as Streams
import qualified System.IO.Streams.ByteString as ByteString
main :: IO ( )
main = do
α ← "1\n2\n3\n" & fromByteString >>= ByteString.lines
consoleWriteLock ← newMVar ( )
[ω₁, ω₂] ← traverse (makeOutputStream . logMaybeLineWithColour consoleWriteLock) [Red, Blue]
(ω², token) ← contrafork ω₁ ω₂
α² ← Streams.map (\ x → (x, x)) α
connect α² ω²
wait token
where
logMaybeLineWithColour lock colour = maybe (return ( )) \ line → do
( ) ← takeMVar lock
withSGRs [SetColor Foreground Vivid colour] $ print $ "Output line: " <> line
putMVar lock ( )
contrafork :: OutputStream a → OutputStream b → IO (OutputStream (a, b), Async ( ))
contrafork ω₁ ω₂ = do
buffer ← newEmptyMVar
ω ← ω₁ & contramapM (\ (x, y) → (putMVar buffer . Just $ y) >> return x) >>= atEndOfOutput (putMVar buffer Nothing)
α ← makeInputStream (takeMVar buffer)
token ← async $ connect α ω₂
return (ω, token)
withSGRs :: [SGR] → IO a → IO a
withSGRs sgrs action = bracket open close \ _ → action
where
open = setSGR sgrs
close _ = setSGR [Reset]