I'm new to concurrency. I have a WebSocket application that sends and receives audio data. At the same time, I have an event listener from Azure looking for speech to transcribe to text, and some other threads looking for text to synthesize into speech. However, when I speak, the response repeats endlessly (?). I'm not sure whether I need synchronization here, and if so, what I should pass as the argument. Or is there something else I need to do? Below are some code excerpts.
The TwilioMediaStreamsHandler class:
private final Map<WebSocketSession, AzureSpeechToTextService> sessions = new ConcurrentHashMap<>();
static final Map<WebSocketSession, Integer> messageCounts = new ConcurrentHashMap<>();
static final Map<WebSocketSession, Boolean> hasSessionSeenMedia = new ConcurrentHashMap<>();
static final Integer repeatThreshold = 10;
ArrayList<String> mediaMessages = new ArrayList<String>();
private final ObjectMapper jsonMapper = new ObjectMapper();
private final Base64.Decoder base64Decoder = Base64.getDecoder();
private static final Logger LOGGER = LoggerFactory.getLogger(TwilioMediaStreamsHandler.class);

@Override
public void afterConnectionEstablished(WebSocketSession session) {
    LOGGER.info("Connection Established");
    sessions.put(session, new AzureSpeechToTextService());
}

@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
    JsonNode request = jsonMapper.readTree(message.getPayload());
    Boolean hasSeenMedia = hasSessionSeenMedia.getOrDefault(session, false);
    if (request.path("media").path("track").asText().equals("inbound")) {
        if (!hasSeenMedia) {
            LOGGER.info("Media WS: Media message received: {}", message);
            LOGGER.warn("Media WS: Additional messages from WebSocket are now being suppressed");
            hasSessionSeenMedia.put(session, true);
        }
        mediaMessages.add(message.getPayload());
        if (mediaMessages.size() >= repeatThreshold) {
            repeat(session);
        }
        messageCounts.merge(session, 1, Integer::sum);
    }
}

public void repeat(WebSocketSession session) throws IOException {
    ArrayList<String> playMessages = (ArrayList<String>) mediaMessages.clone();
    mediaMessages.clear();
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    String streamSid = "";
    byte[] decoded = null;
    for (String playMessage : playMessages) {
        JsonNode request = jsonMapper.readTree(playMessage);
        streamSid = request.path("streamSid").asText();
        String base64EncodedAudio = request.path("media").path("payload").asText();
        decoded = base64Decoder.decode(base64EncodedAudio);
        sessions.get(session).pushData(decoded);
        decoded = sessions.get(session).getBytes();
    }
    String resultText = sessions.get(session).getResultText();
    try {
        decoded = new AzureTextToSpeechService().textToBytes(resultText);
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (decoded != null)
        outputStream.write(decoded);
    ObjectMapper objectMapper = new ObjectMapper();
    byte[] encodedBytes = Base64.getEncoder().encode(outputStream.toByteArray());
    String response = objectMapper.writeValueAsString(new OutBoundMessage("media", new Media(new String(encodedBytes)), streamSid));
    session.sendMessage(new TextMessage(response));
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
    LOGGER.info("Connection Closed");
}
}
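One thing that stands out in the handler above: `mediaMessages` is a single plain `ArrayList` shared by every session and every WebSocket thread, while the other collections are per-session `ConcurrentHashMap`s. As a rough illustration of what a per-session, thread-safe buffer could look like (not the actual handler code; `Session` here is a hypothetical stand-in for `WebSocketSession` so the sketch compiles on its own):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for WebSocketSession, used only so this sketch is self-contained.
class Session {}

public class PerSessionBufferSketch {
    // One buffer per session instead of a single shared ArrayList.
    private final Map<Session, List<String>> mediaMessages = new ConcurrentHashMap<>();

    public void handle(Session session, String payload) {
        // computeIfAbsent creates the buffer atomically on first use;
        // synchronizedList guards add/clear against concurrent access from multiple threads.
        mediaMessages
            .computeIfAbsent(session, s -> Collections.synchronizedList(new ArrayList<>()))
            .add(payload);
    }

    public List<String> drain(Session session) {
        List<String> buffer = mediaMessages.getOrDefault(session, Collections.emptyList());
        // Copy-and-clear under one lock so no message is lost or played twice.
        synchronized (buffer) {
            List<String> copy = new ArrayList<>(buffer);
            buffer.clear();
            return copy;
        }
    }

    public static void main(String[] args) {
        PerSessionBufferSketch sketch = new PerSessionBufferSketch();
        Session a = new Session();
        Session b = new Session();
        sketch.handle(a, "msg-a1");
        sketch.handle(b, "msg-b1");
        sketch.handle(a, "msg-a2");
        System.out.println(sketch.drain(a).size()); // 2
        System.out.println(sketch.drain(a).size()); // 0 after draining
    }
}
```

The `clone()`-then-`clear()` pair in `repeat` is the kind of two-step sequence that needs a single lock around it; otherwise a message added between the two calls is silently dropped, and a shared list means one caller's audio can end up in another caller's response.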
The AzureSpeechToTextService class:
private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");
private static final Logger LOGGER = LoggerFactory.getLogger(AzureSpeechToTextService.class);
private final PushAudioInputStream azurePusher;
private String resultText;

public AzureSpeechToTextService() {
    azurePusher = AudioInputStream.createPushStream(AudioStreamFormat.getWaveFormatPCM(8000L, (short) 16, (short) 1));
    SourceLanguageConfig sourceLanguageConfig = SourceLanguageConfig.fromLanguage("sv-SE");
    SpeechRecognizer speechRecognizer = new SpeechRecognizer(
        SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION),
        sourceLanguageConfig,
        AudioConfig.fromStreamInput(azurePusher));
    speechRecognizer.recognized.addEventListener((o, speechRecognitionEventArgs) -> {
        SpeechRecognitionResult speechRecognitionResult = speechRecognitionEventArgs.getResult();
        resultText = speechRecognitionResult.getText();
        LOGGER.info("Recognized text from speech: {}", resultText);
    });
    speechRecognizer.startContinuousRecognitionAsync();
}

public String getResultText() {
    return this.resultText;
}

public void pushData(byte[] mulawData) {
    azurePusher.write(MulawToPcm.transcode(mulawData));
}
}
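A separate point worth noting in this class: `resultText` is written from the recognizer's event-callback thread and read from the WebSocket handler thread with no synchronization, so the reader may observe a stale value, which fits the "response keeps repeating" symptom. A minimal sketch of the usual fix, assuming the SDK invokes the callback on its own thread (`ResultHolder` and `onRecognized` are hypothetical names, not Azure SDK API), is to make the field `volatile`:

```java
// Sketch: publishing a value from a callback thread to a reader thread.
public class ResultHolder {
    // volatile establishes a happens-before edge between writer and reader threads,
    // so getResultText() observes the most recent completed write.
    private volatile String resultText;

    // Called from the SDK's recognition event thread.
    public void onRecognized(String text) {
        resultText = text;
    }

    // Called from the WebSocket handler thread.
    public String getResultText() {
        return resultText;
    }

    public static void main(String[] args) throws InterruptedException {
        ResultHolder holder = new ResultHolder();
        Thread callback = new Thread(() -> holder.onRecognized("hej"));
        callback.start();
        callback.join();
        System.out.println(holder.getResultText());
    }
}
```

Note that `volatile` only fixes visibility; it does not clear the field after it is consumed, so draining the value (e.g. reading and resetting it under a lock, or using an `AtomicReference.getAndSet(null)`) would also be needed to avoid re-synthesizing the same text.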
The AzureTextToSpeechService class:
private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");
private static final Logger LOGGER = LoggerFactory.getLogger(AzureTextToSpeechService.class);

public byte[] textToBytes(String text) throws ExecutionException, InterruptedException {
    SpeechConfig speechConfig = SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION);
    speechConfig.setSpeechSynthesisOutputFormat(SpeechSynthesisOutputFormat.Raw8Khz8BitMonoMULaw);
    AutoDetectSourceLanguageConfig autoDetectSourceLanguageConfig = AutoDetectSourceLanguageConfig.fromOpenRange();
    SpeechSynthesizer synthesizer = new SpeechSynthesizer(speechConfig, autoDetectSourceLanguageConfig, null);
    SpeechSynthesisResult result = synthesizer.SpeakTextAsync(text).get();
    byte[] audioData = null;
    if (result.getReason() == ResultReason.SynthesizingAudioCompleted) {
        LOGGER.info("Speech synthesized for: {}", text);
        audioData = result.getAudioData();
        LOGGER.info("{} bytes received", audioData.length);
    } else if (result.getReason() == ResultReason.Canceled) {
        SpeechSynthesisCancellationDetails cancellation = SpeechSynthesisCancellationDetails.fromResult(result);
        System.out.println("CANCELED: Reason=" + cancellation.getReason());
        if (cancellation.getReason() == CancellationReason.Error) {
            System.out.println("CANCELED: ErrorCode=" + cancellation.getErrorCode());
            System.out.println("CANCELED: ErrorDetails=" + cancellation.getErrorDetails());
            System.out.println("CANCELED: Did you update the subscription info?");
        }
    }
    result.close();
    synthesizer.close();
    return audioData;
}
}
Two actions have been suggested for this scenario. This link:
https://azure.microsoft.com/en-in/blog/managing-concurrency-in-microsoft-azure-storage-2/
helps in understanding how concurrency is implemented.