How do I handle nacked messages in a GCP Pub/Sub application?


I am writing a GCP Pub/Sub application, but I cannot get it to correctly `nack()` a message. Could you please look at the following code and help?

    @Component
    public class PubSubEventConsumer {

        @Autowired
        private EmployeeService service;

        @Autowired
        EmployerService employer;

        @ServiceActivator(inputChannel = "pubSubInputChannel")
        public void messageReceiver(final Message<String> message) {
            try {
                String msg = message.getPayload();
                String[] msgArray = msg.split("_");
                service.getUpdatedEmpList(Integer.valueOf(Integer.parseInt(msgArray[2])));
                // If the call above does not throw, acknowledge the message.
                ackMessage(message);
            } catch (CustomAPIException customException) {
                // If the call above fails with a 4xx error, catch it and acknowledge.
                if (customException.getResponseEntity().getStatusCode().is4xxClientError()) {
                    ackMessage(message);
                }
            } catch (Exception e) {
                // For any other exception, nack the message.
                nackMessage(message);
            }
        }

        // Returns the original Pub/Sub message carried in the headers, or null if absent.
        private BasicAcknowledgeablePubsubMessage extractAcknowledgableMessage(Message<String> message) {
            return message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE,
                    BasicAcknowledgeablePubsubMessage.class);
        }

        private void ackMessage(Message<String> message) {
            BasicAcknowledgeablePubsubMessage originalMessage = extractAcknowledgableMessage(message);
            if (originalMessage != null) {
                originalMessage.ack();
            }
        }

        private void nackMessage(Message<String> message) {
            BasicAcknowledgeablePubsubMessage originalMessage = extractAcknowledgableMessage(message);
            if (originalMessage == null) {
                log.error("Could not get ORIGINAL_MESSAGE header. Failed to nack message");
                return;
            }
            originalMessage.nack();
        }
    }

Below is the `inboundChannelAdapter` bean:

    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel pubsubFlagInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {

        pubSubTemplate.setMessageConverter(new SimplePubSubMessageConverter());
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, subscription);
        adapter.setOutputChannel(pubsubFlagInputChannel);
        adapter.setPayloadType(String.class);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

When the call `service.getUpdatedEmpList(Integer.valueOf(Integer.parseInt(msgArray[2])));` fails with any exception other than a 4xx error, I cannot control what happens: the same exception occurs again and again.

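If the first event throws an exception, the next event is never processed. The exception for the failed event keeps scrolling through the logs.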

Please help me understand whether I can handle this with `FlowControlSetting`.

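Can someone help? How can I cap the retries for a failing message, so that after three or four nacks the current message is simply ignored and processing continues with new messages?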
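
To make the behaviour I am after concrete, here is a minimal sketch in plain Java with no Spring dependencies. `NackLimiter` is a hypothetical helper, not part of any library. It counts failures per message ID in memory and says when to stop nacking. It assumes the message ID stays the same across redeliveries and that a single subscriber instance sees all of them, which may not hold in a scaled-out deployment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: decides whether a failed message should be nacked again or given up on. */
class NackLimiter {
    private final int maxNacks;
    // Failure count per message ID; in-memory only, so it is lost on restart.
    private final Map<String, Integer> failures = new ConcurrentHashMap<>();

    NackLimiter(int maxNacks) {
        if (maxNacks < 1) {
            throw new IllegalArgumentException("maxNacks must be at least 1");
        }
        this.maxNacks = maxNacks;
    }

    /**
     * Records one failure for the given message.
     * Returns true while the message may still be nacked (redelivered),
     * false once it has already been nacked maxNacks times and should be acked and skipped.
     */
    boolean shouldNack(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count > maxNacks) {
            failures.remove(messageId); // forget the message once we give up on it
            return false;
        }
        return true;
    }

    /** Clears the counter after a message was processed successfully. */
    void onSuccess(String messageId) {
        failures.remove(messageId);
    }

    public static void main(String[] args) {
        NackLimiter limiter = new NackLimiter(3);
        // Simulate the same message failing four times in a row.
        for (int attempt = 1; attempt <= 4; attempt++) {
            String action = limiter.shouldNack("msg-1") ? "nack" : "ack and skip";
            System.out.println("attempt " + attempt + ": " + action);
        }
    }
}
```

In the consumer above, the generic `catch (Exception e)` branch would presumably call `nackMessage(message)` only while `shouldNack(...)` returns true and call `ackMessage(message)` otherwise, with the ID taken from `originalMessage.getPubsubMessage().getMessageId()`. I also understand that a Pub/Sub subscription can be configured with a dead-letter topic and a maximum number of delivery attempts, which might be a more robust route than counting in application memory.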

java spring google-cloud-platform publish-subscribe google-cloud-pubsub