是否可以合并两个OutputStream?

问题描述 投票:1回答:1

OutputStreamOutputStream的抽象-一种认为是延续的方法。特别是,与对应的looks like a contravariant functor不同,它不是容器-它不能为您提供任何实际值。

[InputStream]我希望反过来—合并It has been shown previously how to split an InputStream.。也就是说,我需要这样的功能:

InputStream

想到它的一种方法是从OutputStream开始。

到目前为止,我的尝试已经走了这么远:

contrafork :: OutputStream a → OutputStream b → IO (OutputStream (a, b))
contrafork = …

instance Applicative OutputStream-这是发生动作的地方。我必须使其异步,以避免«无限期阻塞线程]的情况,然后我必须在外部传递一个令牌,消费者应等待以确保两个流完全执行。在错误的位置等待仍会导致线程锁定。总而言之,这种解决方案还有待改进。

该怎么办?

这是我用来查看我的contrafork :: OutputStream a → OutputStream b → IO (OutputStream (a, b), Async ( )) contrafork ω₁ ω₂ = do buffer ← newEmptyMVar ω ← ω₁ & contramapM (\ (x, y) → (putMVar buffer . Just $ y) >> return x) >>= atEndOfOutput (putMVar buffer Nothing) α ← makeInputStream (takeMVar buffer) token ← async $ connect α ω₂ return (ω, token) 是否有效的跑步者:

connect is a loop under the hood

如果使其可执行并在控制台中运行,则应该看到一些交错的红线和蓝线。

haskell stream
1个回答
1
投票

[connect被定义为contrafork,因此可以将两个流合并如下:

#!/usr/bin/env stack
{- stack --resolver=lts-14 script
--package io-streams
--package bytestring
--package ansi-terminal
--package async
--ghc-options -Wall
-}

{-# language UnicodeSyntax #-}
{-# language OverloadedStrings #-}
{-# language BlockArguments #-}

import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Data.Function
import System.Console.ANSI
import System.IO.Streams hiding (map)
import qualified System.IO.Streams as Streams
import qualified System.IO.Streams.ByteString as ByteString

main :: IO ( )
main = do
  α ← "1\n2\n3\n" & fromByteString >>= ByteString.lines
  consoleWriteLock ← newMVar ( )
  [ω₁, ω₂] ← traverse (makeOutputStream . logMaybeLineWithColour consoleWriteLock) [Red, Blue]
  (ω², token) ← contrafork ω₁ ω₂
  α² ← Streams.map (\ x → (x, x)) α
  connect α² ω²
  wait token

  where
    logMaybeLineWithColour lock colour = maybe (return ( )) \ line → do
      ( ) ← takeMVar lock
      withSGRs [SetColor Foreground Vivid colour] $ print $ "Output line: " <> line
      putMVar lock ( )

contrafork :: OutputStream a → OutputStream b → IO (OutputStream (a, b), Async ( ))
contrafork ω₁ ω₂ = do
  buffer ← newEmptyMVar
  ω ← ω₁ & contramapM (\ (x, y) → (putMVar buffer . Just $ y) >> return x) >>= atEndOfOutput (putMVar buffer Nothing)
  α ← makeInputStream (takeMVar buffer)
  token ← async $ connect α ω₂
  return (ω, token)

withSGRs :: [SGR] → IO a → IO a
withSGRs sgrs action = bracket open close \ _ → action
  where
    open = setSGR sgrs
    close _ = setSGR [Reset]
© www.soinside.com 2019 - 2024. All rights reserved.