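I'm building a Python application that receives audio data over a streaming WebSocket connection. Before the audio can be used, I need to process it with ffmpeg on the server and then pass it on to some ML algorithms.
I have the following ffmpeg code set up to process each byte sequence (message) that arrives over the WebSocket: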
import asyncio

import numpy as np


async def ffmpeg_read(bpayload: bytes, sampling_rate: int = 16000) -> np.ndarray:
    # Decodes ONE WebSocket payload with a fresh ffmpeg process per message
    ar = f"{sampling_rate}"
    ac = "1"
    format_for_conversion = "f32le"
    ffmpeg_command = [
        "ffmpeg",
        "-i", "pipe:0",
        "-ac", ac,
        "-acodec", f"pcm_{format_for_conversion}",
        "-ar", ar,
        "-f", format_for_conversion,
        "pipe:1"]
    try:
        process = await asyncio.create_subprocess_exec(
            *ffmpeg_command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)
        # communicate() writes the payload, closes stdin (EOF) and reads ALL of
        # stdout concurrently; read(8000) would truncate the output and could
        # deadlock if ffmpeg filled its stdout pipe before stdin was drained
        out_bytes, _ = await process.communicate(bpayload)
        audio = np.frombuffer(out_bytes, np.float32)
        if audio.shape[0] == 0:
            raise ValueError("Malformed soundfile")
        return audio
    except FileNotFoundError:
        raise ValueError(
            "ffmpeg was not found but is required to load audio files from filename")
In every test, this works for the first message only and prints the expected output, but the second message produces the following:
[mp3 @ 0x1208051e0] Invalid frame size (352): Could not seek to 363.
[in#0 @ 0x600002214200] Error opening input: Invalid argument
Error opening input file pipe:0.
How can I fix this?
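The problem is that every message starts a new ffmpeg process, and ffmpeg expects the bytes it receives to form a complete file. The first message is the beginning of the stream, so it decodes; every later message is a fragment cut from the middle of the stream, which is not a valid sound file on its own. The solution, as Kesh mentioned above, is to keep a single ffmpeg process running for the whole stream and not close its stdin between messages.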
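A minimal sketch of the "one long-lived process, stdin closed only at the end" idea follows. So that it runs without ffmpeg installed, it uses a small Python child process that copies stdin to stdout as a stand-in for ffmpeg (an assumption for illustration only; `ECHO_CHILD` and `stream_through_one_process` are hypothetical names). What it demonstrates is that every chunk reaches the same process and EOF is sent exactly once, after the last chunk.

```python
import asyncio
import sys

# Stand-in for ffmpeg (assumption, for illustration): a child Python process
# that copies stdin to stdout chunk by chunk until it sees EOF.
ECHO_CHILD = (
    "import sys\n"
    "while True:\n"
    "    data = sys.stdin.buffer.read1(4096)\n"
    "    if not data:\n"
    "        break\n"
    "    sys.stdout.buffer.write(data)\n"
    "    sys.stdout.buffer.flush()\n"
)


async def stream_through_one_process(chunks: list[bytes]) -> bytes:
    """Feed every chunk into ONE child process, closing stdin only at the end."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", ECHO_CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    async def writer() -> None:
        for chunk in chunks:          # each WebSocket message would go here
            proc.stdin.write(chunk)
            await proc.stdin.drain()
        proc.stdin.close()            # EOF only once the whole stream is done

    async def reader() -> bytes:
        return await proc.stdout.read()  # read until the child closes stdout

    # Write and read concurrently so neither pipe can fill up and block
    _, output = await asyncio.gather(writer(), reader())
    await proc.wait()
    return output


if __name__ == "__main__":
    result = asyncio.run(stream_through_one_process([b"chunk1-", b"chunk2-", b"chunk3"]))
    print(result)  # b'chunk1-chunk2-chunk3'
```

With the real pipeline, the child would be started from `ffmpeg_command` instead, and its stdout would carry decoded f32le samples rather than an echo of the input. Reading concurrently with writing is the important design choice: writing everything first and reading afterwards can deadlock once the child's stdout pipe buffer is full.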
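Using Python's asyncio library, this is the solution I came up with, with a lot of help from the online documentation, the Hugging Face transformers library, the Whisper library, and GPT-4: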
import asyncio


async def write_to_ffmpeg(input_queue: asyncio.Queue, ffmpeg_input):
    # Feed every queued WebSocket chunk into the SAME ffmpeg process;
    # a None sentinel marks the end of the stream
    while True:
        data = await input_queue.get()
        if data is None:
            ffmpeg_input.close()  # send EOF only after the whole stream
            break
        ffmpeg_input.write(data)
        await ffmpeg_input.drain()


async def read_from_ffmpeg(ffmpeg_output):
    while True:
        output = await ffmpeg_output.read(1024)  # StreamReader.read() is a coroutine
        if not output:  # b"" means ffmpeg closed stdout (EOF)
            break
        print(output)


async def process_websocket(input_queue: asyncio.Queue, sampling_rate=16000):
    ar = f"{sampling_rate}"
    ac = "1"
    format_for_conversion = "f32le"
    ffmpeg_command = [
        "ffmpeg",
        "-i", "pipe:0",
        "-ac", ac,
        "-acodec", f"pcm_{format_for_conversion}",
        "-ar", ar,
        "-f", format_for_conversion,
        "pipe:1"]
    ffmpeg_process = await asyncio.create_subprocess_exec(
        *ffmpeg_command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        # stderr is never read here; piping it anyway can stall ffmpeg once
        # the pipe buffer fills with log output, so discard it instead
        stderr=asyncio.subprocess.DEVNULL
    )
    writer_task = asyncio.create_task(
        write_to_ffmpeg(input_queue, ffmpeg_process.stdin))
    reader_task = asyncio.create_task(
        read_from_ffmpeg(ffmpeg_process.stdout))
    try:
        await writer_task
        await reader_task
    except Exception as e:
        print(e)
        ffmpeg_process.kill()  # kill() is a plain method, not a coroutine
    finally:
        # Wait for the FFmpeg process to terminate
        await ffmpeg_process.wait()
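One detail the reader loop leaves open before the output reaches an ML model: `ffmpeg_output.read(1024)` returns whatever bytes are currently available (up to 1024), so a chunk can end in the middle of a 4-byte float32 sample, while `np.frombuffer(..., np.float32)` raises a `ValueError` when the buffer length is not a multiple of the element size. A small helper that holds back a partial sample until the next chunk arrives might look like this. `Float32Framer` is a hypothetical name, and the demo decodes with the standard-library `struct` module only so that it runs without NumPy.

```python
import struct


class Float32Framer:
    """Re-align arbitrary byte chunks to whole 4-byte f32le samples.

    Bytes that do not complete a sample are kept and prepended to the
    next chunk passed to feed().
    """

    SAMPLE_SIZE = 4  # bytes per float32 sample

    def __init__(self) -> None:
        self._pending = b""

    def feed(self, data: bytes) -> bytes:
        """Return the longest prefix of (pending + data) made of whole samples."""
        buf = self._pending + data
        usable = len(buf) - len(buf) % self.SAMPLE_SIZE
        self._pending = buf[usable:]
        return buf[:usable]

    @property
    def leftover(self) -> int:
        """Number of bytes currently held back (always 0 to 3)."""
        return len(self._pending)


if __name__ == "__main__":
    raw = struct.pack("<3f", 0.0, 0.5, -1.0)  # 12 bytes of f32le audio
    framer = Float32Framer()
    first = framer.feed(raw[:5])   # 4 usable bytes, 1 byte held back
    second = framer.feed(raw[5:])  # the remaining 8 bytes complete 2 samples
    print(struct.unpack("<3f", first + second))  # (0.0, 0.5, -1.0)
```

In `read_from_ffmpeg`, one framer per stream would be created before the loop, and each non-empty `framer.feed(output)` result could then be passed to `np.frombuffer(aligned, np.float32)` instead of printing the raw bytes.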