I am writing a GCP Pub/Sub application, but I cannot get nack() to work correctly for a message. Could you take a look at the code below and help?
@Component
public class PubSubEventConsumer {
@Autowired
private EmployeeService service;
@Autowired
EmployerService employer;
@ServiceActivator(inputChannel = "pubSubInputChannel")
public void messageReceiver(final Message<String> message) {
try {
String msg = message.getPayload();
String[] msgArray = msg.split("_");
service.getUpdatedEmpList(Integer.valueOf(Integer.parseInt(msgArray[2])));
// if the call above does not throw, acknowledge the message
ackMessage(message);
} catch (CustomAPIException customException) {
// if the call above fails with a 4xx status, acknowledge the message
if(customException.getResponseEntity().getStatusCode().is4xxClientError()) {
ackMessage(message);
}
} catch (Exception e) {
// for any other exception, nack the message
nackMessage(message);
}
}
private BasicAcknowledgeablePubsubMessage extractAcknowledgableMessage(Message<String> message) {
return message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE,
BasicAcknowledgeablePubsubMessage.class);
}
private void ackMessage(Message<String> message) {
BasicAcknowledgeablePubsubMessage originalMessage = extractAcknowledgableMessage(message);
if (originalMessage != null) {
originalMessage.ack();
}
}
private void nackMessage(Message<String> message) {
BasicAcknowledgeablePubsubMessage originalMessage = extractAcknowledgableMessage(message);
if (originalMessage == null) {
log.error("Could not get ORIGINAL_MESSAGE header. Failed to acknowledge message");
return;
}
originalMessage.nack();
}
}
Below is the inboundChannelAdapter bean:
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel pubsubFlagInputChannel,
@Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
pubSubTemplate.setMessageConverter(new SimplePubSubMessageConverter());
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, subscription);
adapter.setOutputChannel(pubsubFlagInputChannel);
adapter.setPayloadType(String.class);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
When the call below
service.getUpdatedEmpList(Integer.valueOf(Integer.parseInt(msgArray[2])));
throws any exception other than a 4xx error, I cannot control what happens: the same exception is raised again and again.
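If the first event throws an exception, the next event is not processed, and the exception for the failed event keeps scrolling through the logs.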
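To make the branches of messageReceiver easier to discuss, here is a minimal sketch that reduces them to a pure function. The names Outcome and AckDecision are hypothetical and exist only for illustration; the logic mirrors the try/catch structure shown earlier. It shows one case that is easy to miss: a CustomAPIException whose status is not 4xx is neither acked nor nacked, so Pub/Sub would presumably redeliver it once the acknowledgement deadline lapses.

```java
// Hypothetical names; this only models the branches of messageReceiver above.
enum Outcome { ACK, NACK, NONE }

final class AckDecision {
    private AckDecision() {}

    /**
     * @param failed      whether getUpdatedEmpList threw an exception
     * @param customError whether that exception was a CustomAPIException
     * @param httpStatus  status carried by the CustomAPIException (ignored otherwise)
     */
    static Outcome decide(boolean failed, boolean customError, int httpStatus) {
        if (!failed) {
            return Outcome.ACK;                  // normal path: ackMessage(message)
        }
        if (customError) {
            boolean is4xx = httpStatus >= 400 && httpStatus < 500;
            // The original catch block has no else branch, so a non-4xx
            // CustomAPIException results in neither ack() nor nack().
            return is4xx ? Outcome.ACK : Outcome.NONE;
        }
        return Outcome.NACK;                     // any other exception: nackMessage(message)
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, 0));  // success
        System.out.println(decide(true, true, 404));  // CustomAPIException, 4xx
        System.out.println(decide(true, true, 503));  // CustomAPIException, 5xx
        System.out.println(decide(true, false, 0));   // any other exception
    }
}
```

If the intent is that every non-4xx failure gets nacked, the NONE branch would need an explicit nackMessage(message) call in the first catch block.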
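Please help me understand whether I can handle this with FlowControlSetting.
Can anyone help: how can I limit a failing message so that after three or four nacks the current message is simply ignored and processing continues with new messages?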
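The behaviour I am after could be sketched roughly as below. This is only an illustration under stated assumptions, not a Spring Cloud GCP API: BoundedRetryPolicy, Action, onFailure and onSuccess are hypothetical names, the counter lives in memory per JVM instance (so it is lost on restart and not shared between instances), and I assume a stable message ID can be read from the original Pub/Sub message. A dead-letter topic with a maximum delivery attempts setting on the subscription is Pub/Sub's managed way to get a similar effect.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: counts failures per message ID and gives up after a limit.
final class BoundedRetryPolicy {
    enum Action { NACK_FOR_RETRY, ACK_AND_SKIP }

    private final int maxAttempts;
    // In-memory only: counts are per JVM instance and vanish on restart.
    private final Map<String, Integer> failures = new ConcurrentHashMap<>();

    BoundedRetryPolicy(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Records one failed attempt and decides whether to retry or drop the message. */
    Action onFailure(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count >= maxAttempts) {
            failures.remove(messageId);   // stop tracking once we give up
            return Action.ACK_AND_SKIP;   // caller would ack() to stop redelivery
        }
        return Action.NACK_FOR_RETRY;     // caller would nack() to request redelivery
    }

    /** Forgets a message once it has been processed successfully. */
    void onSuccess(String messageId) {
        failures.remove(messageId);
    }

    public static void main(String[] args) {
        BoundedRetryPolicy policy = new BoundedRetryPolicy(3);
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + ": " + policy.onFailure("msg-1"));
        }
    }
}
```

In the consumer, the generic catch block would call onFailure with the message ID and then either nackMessage(message) or ackMessage(message) depending on the returned action, and the success path would call onSuccess before acking.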