我想使用
max9814
和raspberry pi pico
与micropython/circuitpython
来录制音频,但是当我录制数据时,它的某些部分丢失了,因为它试图读取数据并将数据写入.wav文件,但它不能同时处理它。
电路如下:
RPI 皮科 | MAX9814 |
---|---|
GP26 | 出 |
3v3 | 电源电压 |
接地 | 接地 |
我的代码如下:
import board
import analogio
import time
import adafruit_wave
import struct
import storage
# storage.remount("/", readonly=False)
adc = analogio.AnalogIn(board.A0)
conversion_factor = 3.3/4096
buffer = []
f = adafruit_wave.open("audio.wav", "w")
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(8000)
def save(data):
global f
f.writeframes(bytearray(data))
print("done")
try:
while True:
sample = adc.value * conversion_factor
frame = bytearray(struct.pack('>H', int(sample)))
buffer.extend(frame)
if len(buffer) > 16000:
save(buffer)
buffer = []
print("clear buffer")
except KeyboardInterrupt:
print("closing")
f.close()
我该如何处理?
好吧,事情是这样的:您的代码试图同时执行太多操作,而 Pico 无法跟上。诀窍是将录音和保存部分分开,这样它们就不会互相干扰。您将使用线程来处理这个问题。 Pico 上的 Micropython 不像完整的 Python 那样支持
threading
,但 CircuitPython 对 asyncio
做了类似的事情。这是使用 asyncio
来管理这两个任务而不互相干扰的快速修复:
首先,您必须确保您的 CircuitPython 固件是最新的,因为
asyncio
是最近的。
然后,像这样调整你的设置:
import board
import analogio
import time
import struct
import audiocore
import audiobusio
import os
import asyncio
# Initialize ADC on GP26
adc = analogio.AnalogIn(board.GP26)
conversion_factor = 3.3 / 65536 # Adjusted for CircuitPython ADC range
buffer = bytearray(16000) # Adjust the buffer size if needed
buffer_index = 0
# Setup for the WAV file
wav_filename = "audio.wav"
audio_settings = {"channels": 1, "bits": 16, "rate": 8000}
# This function records the audio
async def record_audio():
global buffer, buffer_index
while True:
if buffer_index < len(buffer) - 2:
sample = int(adc.value * conversion_factor * 32767) # Scale sample to 16-bit range
struct.pack_into(">H", buffer, buffer_index, sample)
buffer_index += 2
else:
await asyncio.sleep(0) # Yield control to save_audio
# This function saves the audio
async def save_audio():
global buffer, buffer_index
while True:
if buffer_index >= len(buffer) - 2:
with open(wav_filename, "ab") as f: # Append binary data
wav_header = audiocore.WaveFile(f, audio_settings)
f.write(buffer[:buffer_index])
buffer_index = 0 # Reset buffer index after saving
await asyncio.sleep(0) # Yield control to record_audio
# Main function to run both tasks
async def main():
record_task = asyncio.create_task(record_audio())
save_task = asyncio.create_task(save_audio())
await asyncio.gather(record_task, save_task)
# Run the main function
asyncio.run(main())
此代码设置异步函数,用于将音频录制到缓冲区,并在缓冲区满后将该缓冲区保存到文件中。然后它会同时运行这两项任务,这样一个任务可以收集数据,而另一个任务则可以写入磁盘,从而防止您遇到损失。
asyncio.sleep(0)
在任务之间调用yield执行,允许两个任务继续执行而不会互相阻塞。