Twilio TwiML Connect 双向同步流

问题描述 投票:0回答:1

我正在尝试从我的 Twilio 帐户/号码向 PSTN 手机号码拨打电话。我正在使用这个 TwiML:
有了这个,我可以接收并播放来自 PSTN 手机的传入音频,但无法将传出音频发送到 PSTN 手机。

这是主要的 websocket 函数,它处理传入的音频,并尝试通过函数“sendremotemediatotwilio”发送传出数据包作为响应:

    async def websocket_rpc_endpoint(ws: WebSocket):
        """Receive a Twilio media stream over ``ws`` and relay audio both ways.

        Inbound-track payloads are base64-decoded, padded with mu-law silence
        wherever the message timestamps show a gap, and re-chunked into pieces
        of ``frames_per_buffer`` bytes. Each piece is base64-encoded and put on
        the module-level ``output_queue``. After every media message,
        ``sendremotemediatotwilio`` is awaited once so that a pending outbound
        packet (if any) is written back to the same socket.

        The loop ends on the "stop" event, which is what Twilio Media Streams
        sends when the stream ends. The previous code waited for "closed",
        which is not one of Twilio's stream events, so the handler never
        exited cleanly; "closed" is still accepted for backward compatibility.

        Args:
            ws: The websocket connection opened by Twilio.
        """
        # NOTE(review): frames_per_buffer is defined elsewhere in the module and
        # is assumed to be a positive byte count -- confirm.
        buffer_size = frames_per_buffer
        inbuffer = bytearray()
        inbound_chunks_started = False
        latest_inbound_timestamp = 0
        await ws.accept()
        while True:
            data = await ws.receive_json()
            event = data['event']

            # The event type tells which kind of message was received.
            if event == "connected":
                print("Connected Message received: {}".format(data))
            elif event == "start":
                print("Start Message received: {}".format(data))
            elif event == "media":
                media = data['media']
                chunk = base64.b64decode(media['payload'])
                if media['track'] == 'inbound':
                    timestamp = int(media['timestamp'])
                    if inbound_chunks_started:
                        # Fill in silence if packets were dropped. The arithmetic
                        # assumes each media message carries 20 ms of audio.
                        missing_ms = timestamp - (latest_inbound_timestamp + 20)
                        if missing_ms > 0:
                            # NOTE: 0xff is silence for mulaw audio, and there
                            # are 8 bytes per ms of data (8 bit, 8000 Hz).
                            inbuffer.extend(b'\xff' * (8 * missing_ms))
                    else:
                        # Make it known that inbound chunks have started arriving.
                        inbound_chunks_started = True
                    latest_inbound_timestamp = timestamp
                    inbuffer.extend(chunk)
                # Drain the buffer in fixed-size pieces; a shorter remainder
                # stays buffered until more audio arrives.
                while len(inbuffer) >= buffer_size:
                    asinbound = AudioSegment(inbuffer[:buffer_size], sample_width=1, frame_rate=8000, channels=1)
                    output_queue.put_nowait(base64.b64encode(asinbound.raw_data))
                    # Trim in place instead of rebuilding the whole buffer.
                    del inbuffer[:buffer_size]
                await sendremotemediatotwilio(ws, media, data['streamSid'])
            elif event in ("stop", "closed"):
                print("Closed Message received: {}".format(data))
                break
    async def sendremotemediatotwilio(ws,media,streamSid):
        """Send at most one queued outbound audio packet to Twilio.

        Takes one item from the module-level ``input_queue`` when it is not
        empty, base64-encodes it, and sends it over ``ws`` as a "media" event
        for ``streamSid``. Returns without sending when the queue is empty.

        Args:
            ws: Websocket the JSON message is sent on.
            media: Media dict of the message being handled; not used here.
            streamSid: Stream identifier copied into the outgoing message.
        """
        if input_queue.empty():
            # Nothing waiting to go out.
            return

        packet = input_queue.get_nowait()
        encoded_payload = base64.b64encode(packet).decode('utf-8')
        print("sending json")
        await ws.send_json({
            "event": "media",
            "streamSid": streamSid,
            "media": {"payload": encoded_payload},
        })
twilio twilio-twiml bidirectional
1个回答
1
投票

我已经解决了这个问题。现在我可以成功地向 PSTN 手机发送数据包,也可以从 PSTN 手机接收数据包。问题出在从 PCMU(μ-law)到线性 PCM 的转码上。一旦解决了这一点,相同的代码就可以正常工作了。

@app.websocket("/twiliomedia")
async def websocket_rpc_endpoint(ws: WebSocket):
    """Receive a Twilio media stream over ``ws`` and relay audio both ways.

    Inbound-track payloads are base64-decoded, padded with mu-law silence
    wherever the message timestamps show a gap, and re-chunked into pieces of
    ``frames_per_buffer`` bytes. Each piece is base64-encoded and put on the
    module-level ``output_queue``. The module-level flag ``pushdatainqueue`` is
    set to True when the first inbound chunk arrives. After every media
    message, ``sendremotemediatotwilio`` is awaited once so that a pending
    outbound packet (if any) is written back to the same socket.

    The loop ends on the "stop" event, which is what Twilio Media Streams
    sends when the stream ends. The previous code waited for "closed", which
    is not one of Twilio's stream events, so the handler never exited cleanly;
    "closed" is still accepted for backward compatibility.

    Args:
        ws: The websocket connection opened by Twilio.
    """
    global pushdatainqueue
    # NOTE(review): frames_per_buffer is defined elsewhere in the module and is
    # assumed to be a positive byte count -- confirm.
    buffer_size = frames_per_buffer
    inbuffer = bytearray()
    inbound_chunks_started = False
    latest_inbound_timestamp = 0
    await ws.accept()
    while True:
        data = await ws.receive_json()
        event = data['event']

        # The event type tells which kind of message was received.
        if event == "connected":
            print("Connected Message received: {}".format(data))
        elif event == "start":
            print("Start Message received: {}".format(data))
        elif event == "media":
            media = data['media']
            chunk = base64.b64decode(media['payload'])
            if media['track'] == 'inbound':
                timestamp = int(media['timestamp'])
                if inbound_chunks_started:
                    # Fill in silence if packets were dropped. The arithmetic
                    # assumes each media message carries 20 ms of audio.
                    missing_ms = timestamp - (latest_inbound_timestamp + 20)
                    if missing_ms > 0:
                        # NOTE: 0xff is silence for mulaw audio, and there are
                        # 8 bytes per ms of data (8 bit, 8000 Hz).
                        inbuffer.extend(b'\xff' * (8 * missing_ms))
                else:
                    # Make it known that inbound chunks have started arriving.
                    inbound_chunks_started = True
                    pushdatainqueue = True
                latest_inbound_timestamp = timestamp
                inbuffer.extend(chunk)
            # Drain the buffer in fixed-size pieces; a shorter remainder stays
            # buffered until more audio arrives.
            while len(inbuffer) >= buffer_size:
                asinbound = AudioSegment(inbuffer[:buffer_size], sample_width=1, frame_rate=8000, channels=1)
                output_queue.put_nowait(base64.b64encode(asinbound.raw_data))
                # Trim in place instead of rebuilding the whole buffer.
                del inbuffer[:buffer_size]
            await sendremotemediatotwilio(ws, media, data['streamSid'])
        elif event in ("stop", "closed"):
            print("Closed Message received: {}".format(data))
            break


async def sendremotemediatotwilio(ws, media, streamSid):
    """Send at most one queued outbound audio packet to Twilio.

    Takes one item from the module-level ``input_queue`` when it is not empty,
    base64-encodes it, and sends it over ``ws`` as a "media" event for
    ``streamSid``. Returns without sending when the queue is empty.

    Args:
        ws: Websocket the JSON message is sent on.
        media: Media dict of the message being handled; not used here.
        streamSid: Stream identifier copied into the outgoing message.
    """
    if input_queue.empty():
        # Nothing waiting to go out.
        return

    packet = input_queue.get_nowait()
    encoded_payload = base64.b64encode(packet).decode("utf-8")
    print("sending json")
    await ws.send_json({
        "event": "media",
        "streamSid": streamSid,
        "media": {"payload": encoded_payload},
    })
© www.soinside.com 2019 - 2024. All rights reserved.