400 Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time

Problem description

I am trying to use Google Speech-to-Text with streaming audio input.

I have a simple piece of JS code that records audio when a button is pressed and sends it to a FastAPI backend over a websocket. In the FastAPI backend I receive the audio bytes over the websocket, collect them in chunks, and feed them to the Google Speech-to-Text streaming recognition API, where I get the error below.

I also verified the incoming audio bytes by saving them to a file; when I play the saved audio file back, it works.
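
One caveat on that check, shown in the code below: open with mode "bx" creates the file exclusively, so it raises FileExistsError as soon as a second media message arrives. A minimal sketch that appends instead (dump_chunk is a hypothetical helper, not in the original code):

def dump_chunk(chunk, path="/Users/swastikn/Desktop/song.webm"):
    # Append each decoded chunk; "ab" never fails on an existing file.
    with open(path, "ab") as f:
        f.write(chunk)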

Error

Traceback (most recent call last):
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 112, in __next__
    return next(self._wrapped)
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/grpc/_channel.py", line 809, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.OUT_OF_RANGE
    details = "Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time."
    debug_error_string = "{"created":"@1675100066.815164000","description":"Error received from peer ipv4:142.250.67.42:443","file":"src/core/lib/surface/call.cc","file_line":1075,"grpc_message":"Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time.","grpc_status":11}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/swastikn/Desktop/speech_to_text/sst/temp.py", line 199, in stream_transcript
    listen_print_loop(responses)
  File "/Users/swastikn/Desktop/speech_to_text/sst/temp.py", line 58, in listen_print_loop
    for response in responses:
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 115, in __next__
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.OutOfRange: 400 Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time.
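
For context, the streaming API raises this OUT_OF_RANGE error when no audio arrives on the open request stream for several seconds; here the browser code only sends audio once recording stops, so the generator feeding streaming_recognize sits idle in the meantime. A minimal keep-alive sketch (not from the original post) that pads quiet gaps with silence, assuming 16-bit mono LINEAR16 audio at a known sample rate:

import queue

def keepalive_generator(buff, rate=16000, gap_seconds=0.5):
    # Yield buffered audio chunks; whenever the buffer stays empty for
    # `gap_seconds`, yield that much silence instead so the gRPC stream
    # is never starved. Assumes 16-bit (2-byte) mono LINEAR16 samples.
    silence = b"\x00\x00" * int(rate * gap_seconds)
    while True:
        try:
            chunk = buff.get(timeout=gap_seconds)
            if chunk is None:  # sentinel marking end of stream
                return
            yield chunk
        except queue.Empty:
            yield silence

Such a generator could stand in for Stream.generator() below while debugging, though the real fix is to send audio continuously while recording.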

Code

import base64
import json
import os
import re
import signal
import sys
import threading
import time
import traceback
from typing import List

import uvicorn
from fastapi import FastAPI, WebSocket
from google.cloud import speech_v1p1beta1 as speech
from six.moves import queue
from starlette.websockets import WebSocketDisconnect

app = FastAPI()
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "creds.json"


class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)


def signal_handler(sig, frame):
    sys.exit(0)


def listen_print_loop(responses):
    """Iterates through server responses and prints them.
    The responses passed is a generator that will block until a response
    is provided by the server.
    Each response may contain multiple results, and each result may contain
    multiple alternatives; for details, see ...  Here we
    print only the transcription for the top alternative of the top result.
    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a line feed at the end of it, to allow
    the next result to overwrite it, until the response is a final one. For the
    final one, print a newline to preserve the finalized transcription.
    """
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue

        # The `results` list is consecutive. For streaming, we only care about
        # the first result being considered, since once it's `is_final`, it
        # moves on to considering the next utterance.
        result = response.results[0]
        if not result.alternatives:
            continue

        # Display the transcription of the top alternative.
        transcript = result.alternatives[0].transcript

        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.
        #
        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result
        overwrite_chars = " " * (num_chars_printed - len(transcript))

        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + "\r")
            sys.stdout.flush()

            num_chars_printed = len(transcript)

        else:
            print(transcript + overwrite_chars)

            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r"\b(exit|quit)\b", transcript, re.I):
                print("Exiting..")
                break

            num_chars_printed = 0


class Stream(object):
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk

        # Create a thread-safe buffer of audio data
        self.buff = queue.Queue()
        self.closed = True

    def __enter__(self):
        self.closed = False

        return self

    def __exit__(self, type, value, traceback):
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self.buff.put(None)

    def fill_buffer(self, in_data):
        """Continuously collect data from the audio stream, into the buffer."""
        self.buff.put(in_data)
        return self

    def generator(self):
        while True:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self.buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self.buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)


@app.websocket('/stt')
async def stt(ws: WebSocket):
    await manager.connect(ws)
    print("Connection accepted")
    # A lot of messages will be sent rapidly. We'll stop showing after the first one.
    message_count = 0
    run = True
    while run:
        try:
            message = await ws.receive_text()
            if message is None:
                print("No message received...")
                continue

            # Messages are a JSON encoded string
            data = json.loads(message)

            # Using the event type you can determine what type of message you are receiving
            if data['event'] == "connected":
                print("Connected Message received: {}".format(message))
            if data['event'] == "start":
                print("Start Message received: {}".format(message))
            if data['event'] == "media":
                payload = data['media']['payload']
                chunk = base64.b64decode(payload)
                
                # Just to verify we are getting proper audio, I'm saving the
                # audio bytes to a file; I was able to play the saved file back.
                with open("/Users/swastikn/Desktop/song.webm", mode="bx") as f:
                    f.write(chunk)

                stream.fill_buffer(chunk)
            if data['event'] == "mark":
                print("Mark Message received: {}".format(message))
            if data['event'] == "stop":
                print("Stop Message received: {}".format(message))
                break
            message_count += 1
        except WebSocketDisconnect as e:
            run = False
            print(e)

    print("Connection closed. Received a total of {} messages".format(message_count))


def stream_transcript():
    run = True
    while run:
        audio_generator = stream.generator()
        try:
            requests = (
                speech.StreamingRecognizeRequest(audio_content=content)
                for content in audio_generator
            )
            responses = client.streaming_recognize(streaming_config, requests)
            # Now, put the transcription responses to use.
            listen_print_loop(responses)
        except WebSocketDisconnect as e:
            run = False
            print(e)
        except Exception:
            traceback.print_exception(*sys.exc_info())

        time.sleep(5)


if __name__ == '__main__':
    # Audio recording parameters
    RATE = 8000
    CHUNK = int(RATE / 100)  # 100ms
    language_code = "en-IN"  # a BCP-47 language tag

    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code,
        enable_speaker_diarization=True,
    )

    streaming_config = speech.StreamingRecognitionConfig(
        config=config, interim_results=True
    )
    manager = ConnectionManager()
    stream = Stream(RATE, CHUNK)
    t1 = threading.Thread(target=stream_transcript)
    t1.daemon = True
    t1.start()

    signal.signal(signal.SIGINT, signal_handler)

    uvicorn.run(app=app)

JS code for recording the audio

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>IVR</title>
</head>
<body>
    <button id="start-recording">Start Recording</button>
    <button id="stop-recording" disabled>Stop Recording</button>
</body>
  <script src="app.js"></script>
</html>

---------

const recordButton = document.getElementById("start-recording");
const stopButton = document.getElementById("stop-recording");

let mediaRecorder;
let chunks = [];

const handleSuccess = (stream) => {
  mediaRecorder = new MediaRecorder(stream);

  mediaRecorder.start();
  console.log(mediaRecorder.state);
  console.log("recorder started");
  recordButton.disabled = true;
  stopButton.disabled = false;

  mediaRecorder.ondataavailable = (e) => {
    chunks.push(e.data);
  };

  mediaRecorder.onstop = (e) => {
    const audioBlob = new Blob(chunks, { type: "audio/wav" });
    chunks = [];
    sendDataToWebsocket(audioBlob);
  };
};

const handleError = (error) => {
  console.log("navigator.getUserMedia error: ", error);
};

recordButton.addEventListener("click", () => {
  navigator.mediaDevices
    .getUserMedia({ audio: true })
    .then(handleSuccess)
    .catch(handleError);
});

stopButton.addEventListener("click", () => {
  mediaRecorder.stop();
  recordButton.disabled = false;
  stopButton.disabled = true;
});

const sendDataToWebsocket = (audioBlob) => {
  const socket = new WebSocket("ws://localhost:8000/stt");
  socket.binaryType = "arraybuffer";

  const reader = new FileReader();
  let payload;
  reader.readAsArrayBuffer(audioBlob);
  reader.onloadend = function () {
    let binaryString = '';
    const bytes = new Uint8Array(reader.result);
    for (let i = 0; i < bytes.byteLength; i++) {
      binaryString += String.fromCharCode(bytes[i]);
    }
    payload = window.btoa(binaryString);
  };

  socket.onopen = () => {

    const data = {'event': 'media', 'stream_sid': 7, 'media': {'payload': payload}};
    console.log(data)
    console.log(JSON.stringify(data));
    socket.send(JSON.stringify(data));
    console.log("Data sent to websocket");
  };

  socket.onclose = (event) => {
    console.log("Websocket closed", event);
  };

  socket.onerror = (error) => {
    console.log("Websocket error", error);
  };
};

Notes

  1. Python version - 3.7
  2. google-cloud-core==2.2.1
  3. google-api-core==2.11.0
  4. google-api-python-client==2.34.0
Tags: google-cloud-platform, speech-recognition, speech-to-text, google-speech-api, google-cloud-speech
1 Answer

In case anyone else runs into this issue and never found an answer: use the audioop module to transcode the incoming packets before sending them on.

import audioop

if data['event'] == "media":
    audio = base64.b64decode(data['media']['payload'])
    # Convert 8-bit mu-law samples to 16-bit linear PCM
    # (this assumes the incoming payload is mu-law encoded).
    audio = audioop.ulaw2lin(audio, 2)
    # Resample the 16-bit mono audio from 8 kHz to 16 kHz.
    audio = audioop.ratecv(audio, 2, 1, 8000, 16000, None)[0]
    stream.fill_buffer(audio)
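
Two caveats with this fix, for anyone adapting it: audioop.ulaw2lin assumes the incoming payload is mu-law encoded (as in telephony media streams), and once ratecv has resampled the audio to 16 kHz, the recognition config must declare that rate rather than the question's RATE = 8000. A sketch of the adjusted config, keeping the question's other settings:

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=16000,  # must match the output rate of ratecv above
    language_code="en-IN",
    enable_speaker_diarization=True,
)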