当我在 Spring Boot 中使用服务主体时,向事件中心应用程序发送事件的反应方式出现错误
"amqp:unauthorized-access" error Description "Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'xxx' Sender link was never active. Closing endpoint states.
当我们使用服务主体时会出现此问题,但是当使用 ConnectionString 时,它工作正常。
代码是使用反应式的方式Flux下沉,很多供应商用于发送事件,例如:
public class EventProducer {
private static final Logger log = LoggerFactory.getLogger(EventProducer.class);
ExecutorService executor = Executors.newFixedThreadPool(5);
@Autowired
private Sinks.Many<Message<String>> many;
@Override
public void send(AppMessage message) {
log.info(" In EventProducer Send ");
Runnable eventTask = () -> {
try {
log.info("Going to add message {} to event hub.", message.getMessage().toRawString());
many.emitNext(MessageBuilder.withPayload(message.getMessage().toRawString()).build(),
Sinks.EmitFailureHandler.FAIL_FAST);
}catch (Exception e) {
e.printStackTrace();
}
};
executor.execute(eventTask);
}
}
@Configuration
public class EventProducerConfiguration {
private static final Logger log = LoggerFactory.getLogger(EventProducerConfiguration.class);
@Bean
public Sinks.Many<Message<String>> many() {
return Sinks.many().unicast().onBackpressureBuffer();
}
@Bean
public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
return () -> many.asFlux()
.doOnNext(m -> log.info("Manually sending message {}", m))
.doOnError(t -> log.error("Error encountered", t));
}
}
以下是属性:
cloud:
azure:
credential:
client-id: xxx
client-secret: xxx
profile:
tenant-id: xxx
eventhubs:
namespace: xxx
stream:
bindings:
supply-out-0:
destination: xxx
function:
definition: supply
poller:
initial-delay: 0
fixed-delay: 1000
注意:角色
AzureEventHub Sender
、AzureEventHub Owner
、AzureEventHub Receiver
正在使用连接字符串。这个问题只与角色service principal
有关。
从错误看来,服务原则没有必要的权限。但是您提到了已经赋予的角色和权限。如果添加了
Azure Event Hubs Data sender
和 Azure Event Hubs Data receiver
角色,您还可以再检查一下吗?
Microsoft 文档下方有要遵循的步骤。如果设置一切正常,请交叉验证。
https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/azure-web-pubsub/howto-develop-event-listener.md