问题 Haskell中的管道和回调


我正在使用portaudio处理一些音频。只要有要处理的音频数据,haskell FFI绑定就会调用用户定义的回调。应该非常快速地处理这个回调,理想情况下没有I / O.我想保存音频输入并快速返回,因为我的应用程序不需要实时响应音频(现在我只是将音频数据保存到文件中;稍后我将构建一个简单的语音识别系统) 。

我喜欢这个主意 pipes 并认为我可以使用该库。问题是我不知道如何创建 Producer 返回通过回调传入的数据。

我该如何处理我的用例?


以下是我现在正在使用的内容,如果有帮助的话(基准mvar现在不能正常工作,但我不喜欢将所有数据存储在seq中...我宁愿处理它而不是它来了刚刚结束):

{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-}

module Main where

import Codec.Wav

import Sound.PortAudio
import Sound.PortAudio.Base
import Sound.PortAudio.Buffer

import Foreign.Ptr
import Foreign.ForeignPtr
import Foreign.C.Types
import Foreign.Storable

import qualified Data.StorableVector as SV
import qualified Data.StorableVector.Base as SVB

import Control.Exception.Base (evaluate)

import Data.Int
import Data.Sequence as Seq

import Control.Concurrent

instance Buffer SV.Vector a where
  fromForeignPtr fp = return . SVB.fromForeignPtr fp
  toForeignPtr = return . (\(a, b, c) -> (a, c)) . SVB.toForeignPtr

-- | Wrap a buffer callback into the generic stream callback type.
buffCBtoRawCB' :: (StreamFormat input, StreamFormat output, Buffer a input, Buffer b output) =>
    BuffStreamCallback input output a b -> StreamCallback input output    
buffCBtoRawCB' func = \a b c d e -> do
    fpA <- newForeignPtr_ d -- We will not free, as callback system will do that for us   
    fpB <- newForeignPtr_ e -- We will not free, as callback system will do that for us
    storeInp <- fromForeignPtr fpA (fromIntegral $ 1 * c)
    storeOut <- fromForeignPtr fpB (fromIntegral $ 0 * c)
    func a b c storeInp storeOut

callback :: MVar (Seq.Seq [Int32]) -> PaStreamCallbackTimeInfo -> [StreamCallbackFlag] -> CULong 
            -> SV.Vector Int32 -> SV.Vector Int32 -> IO StreamResult
callback seqmvar = \timeinfo flags numsamples input output -> do
  putStrLn $ "timeinfo: " ++ show timeinfo ++ "; flags are " ++ show flags ++ " in callback with " ++ show numsamples ++ " samples."  
  print input
  -- write data to output
  --mapM_ (uncurry $ pokeElemOff output) $ zip (map fromIntegral [0..(numsamples-1)]) datum
  --print "wrote data"

  input' <- evaluate $ SV.unpack input  
  modifyMVar_ seqmvar (\s -> return $ s Seq.|> input')

  case flags of
    [] -> return $ if unPaTime (outputBufferDacTime timeinfo) > 0.2 then Complete else Continue
    _ -> return Complete

done doneMVar = do
  putStrLn "total done dood!"
  putMVar doneMVar True
  return ()

main = do

  let samplerate = 16000

  Nothing <- initialize

  print "initialized"

  m <- newEmptyMVar
  datum <- newMVar Seq.empty

  Right s <- openDefaultStream 1 0 samplerate Nothing (Just $ buffCBtoRawCB' (callback datum)) (Just $ done m)
  startStream s

  _ <- takeMVar m -- wait until our callbacks decide they are done!
  Nothing <- terminate

  print "let's see what we've recorded..."

  stuff <- takeMVar datum
  print stuff

  -- write out wav file

  -- let datum = 
  --       audio = Audio { sampleRate = samplerate
  --                   , channelNumber = 1
  --                   , sampleData = datum
  --                   }
  -- exportFile "foo.wav" audio

  print "main done"

3404
2018-02-02 02:33


起源

你能举一些例子来说明没有管道从回叫中获取数据吗? - Davorak
也许考虑使用渠道而不是 MVars序列。他们非常适合这类生产者 - 消费者问题。 - sabauma


答案:


最简单的解决方案是使用 MVars在回调和之间进行通信 Producer。就是这样:

import Control.Proxy
import Control.Concurrent.MVar

fromMVar :: (Proxy p) => MVar (Maybe a) -> () -> Producer p a IO ()
fromMVar mvar () = runIdentityP loop where
    loop = do
        ma <- lift $ takeMVar mvar
        case ma of
            Nothing -> return ()
            Just a  -> do
                respond a
                loop

您的流回调将写入 Just input 到了 MVar 你的终结回调会写 Nothing 终止 Producer

这是一个 ghci 示例演示它是如何工作的:

>>> mvar <- newEmptyMVar :: IO (MVar (Maybe Int))
>>> forkIO $ runProxy $ fromMVar mvar >-> printD
>>> putMVar mvar (Just 1)
1
>>> putMVar mvar (Just 2)
2
>>> putMVar mvar Nothing
>>> putMVar mvar (Just 3)
>>>

编辑: pipes-concurrency 图书馆 现在提供这个功能,它甚至有一个 本教程中的部分 具体解释如何使用它来获取回调数据。


13
2018-02-02 07:59



你可以轻松使用 Chan 相反,如果你想交错执行而不是阻塞 putMVar 当消费者没有到处时 take还有最新的价值。成本是您可能会使用更多内存。 - Dan Burton
那就对了。这取决于您是否希望回调与生产者同步。 - Gabriel Gonzalez