通过 websockets 流式传输音频并使用 ffmpeg 进行处理,帧大小无效错误

问题描述 投票:0回答:1

我正在用 Python 构建一个应用程序,该应用程序通过 WebSockets 处理流连接上的音频数据。为了使用它,我需要在服务器上使用 ffmpeg 对其进行处理,然后再将其传递给一些 ML 算法。

我有以下 ffmpeg 代码设置来处理来自 WebSocket 的每个字节序列:

async def ffmpeg_read(bpayload: bytes, sampling_rate: int = 16000) -> np.array:
    ar = f"{sampling_rate}"
    ac = "1"
    format_for_conversion = "f32le"
    ffmpeg_command = [
        "ffmpeg",
        "-i", "pipe:0",
        "-ac", ac,
        "-acodec", f"pcm_{format_for_conversion}",
        "-ar", ar,
        "-f", format_for_conversion,
        "pipe:1"]
    try:
        process = await asyncio.create_subprocess_exec(
            *ffmpeg_command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)

        process.stdin.write(bpayload)
        await process.stdin.drain() 
        process.stdin.close()

        out_bytes = await process.stdout.read(8000)  # Read asynchronously
        audio = np.frombuffer(out_bytes, np.float32)

        if audio.shape[0] == 0:
            raise ValueError("Malformed soundfile")
        return audio

    except FileNotFoundError:
        raise ValueError(
            "ffmpeg was not found but is required to load audio files from filename")

在每个测试中,这仅适用于一条消息并将所需的输出打印到屏幕上,但第二条消息得到以下结果:

[mp3 @ 0x1208051e0] Invalid frame size (352): Could not seek to 363.
[in#0 @ 0x600002214200] Error opening input: Invalid argument
Error opening input file pipe:0.

我该如何解决这个问题?

python ffmpeg subprocess ffmpeg-python
1个回答
0
投票

这里的问题是,如果 ffmpeg 仅运行一次,则希望字节序列完整,因此后续帧是无效的声音文件。解决方案,就像上面提到的 Kesh 一样,是不关闭 stdin。

使用 Python 的 asyncio 库,这是我想出的一个解决方案,并得到了在线文档、Huggingface 转换器库、Whisper 库和 GPT-4 的大力支持:

async def write_to_ffmpeg(input_queue: asyncio.Queue, ffmpeg_input):
    while True:
        data = await input_queue.get()
        if data is None:
        ffmpeg_input.close()
            break
        ffmpeg_input.write(data)  
        await ffmpeg_input.drain() 
   
async def read_from_ffmpeg(ffmpeg_output):
    while True:
        output = ffmpeg_output.read(1024)
        print(output)
 

async def process_websocket(input_queue: asyncio.Queue, sampling_rate=16000):
    ar = f"{sampling_rate}"
    ac = "1"
    format_for_conversion = "f32le"
    ffmpeg_command = [
        "ffmpeg",
        "-i", "pipe:0",
        "-ac", ac,
        "-acodec", f"pcm_{format_for_conversion}",
        "-ar", ar,
        "-f", format_for_conversion,
        "pipe:1"]
    ffmpeg_process = await asyncio.create_subprocess_exec(
        *ffmpeg_command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    writer_task = asyncio.create_task(
        write_to_ffmpeg(input_queue, ffmpeg_process.stdin))
    reader_task = asyncio.create_task(
        read_from_ffmpeg(ffmpeg_process.stdout))

    try:
        await writer_task
        await reader_task
    except Exception as e:
        print(e)
        await ffmpeg_process.kill()
    finally:
        # Wait for the FFmpeg process to terminate
        await ffmpeg_process.wait()
© www.soinside.com 2019 - 2024. All rights reserved.