Spring Cloud PubSub无法发布到队列中

问题描述 投票:3回答:1

我有一个正在从一个队列侦听并发布到多个队列的实现。因此,我实现了一个通用接口,以通过传递其名称将消息发送到不同的主题。

假设:

  • 我正在使用Google Cloud Pubsub
  • spring-cloud-gcp-dependencies版本:1.1.3发布
  • spring-cloud-dependencies版本:Greenwich.RELEASE
  • spring-integration-core版本:5.1.7.RELEASE
  • spring-cloud-gcp-starter-pubsub版本:1.1.3。发布

我在做什么?我正在侦听队列并执行一些逻辑,之后我将ack消息手动发送到同一队列(可以在下面的方法中看到consumer.ack()),另外两条消息发送到其他队列(电子邮件触发队列和异步支持其他微服务)。我做错了我无法设法解决的方法,我进行了很多搜索并尝试了不同的方法。

弹簧特性

spring.cloud.gcp:
  project-id: theProjectId
  credentials:
    location: XXX.json
    scopes: DEFAULT_SCOPES
  pubsub:
    enabled: true
    publisher:
      retry:
        total-timeout-seconds: 120
        initial-retry-delay-second: 5
    subscriber:
      executor-threads: 1
      initial-retry-delay-seconds: 20

StackTrace:

org.springframework.cloud.gcp.pubsub.core.PubSubException: Sending Spring message failed.; nested exception is org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalArgumentException, failedMessage=GenericMessage [payload=byte[105], headers={gcp_pubsub_acknowledgement=org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$1@37a56117, id=2141149b-b1fd-6f35-55a8-XXXXXXXX, gcp_pubsub_topic=xxx-dev, gcp_pubsub_original_message=PushedAcknowledgeablePubsubMessage{projectId='theprojectId', subscriptionName='xxx-dev', message=data: "{\"eventId\":22,\"serviceName\":\"nameService\",\"eventName\":\"VISIT_EVENT\",\"payload\":null,\"accountId\":3}"
attributes {
  key: "gcp_pubsub_topic"
  value: "xxx-dev"
}
message_id: "8542727XXXXXX"
publish_time {
  seconds: 1573833888
  nanos: 57000000
}
}, timestamp=1573845386667}]
    at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:157)
    at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:152)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:438)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalArgumentException

不包括所有堆栈跟踪,其他重要行:

Caused by: java.lang.IllegalArgumentException: null

Config

@Configuration
public class GoogleCloud {

    private final GoogleProperties googleProperties;

    public GoogleCloud(final GoogleProperties googleProperties) {
        this.googleProperties = googleProperties;
    }

    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler messageSender(final PubSubOperations pubSubTemplate) {
        return new PubSubMessageHandler(pubSubTemplate, "doesNotMatter");
    }

    @Bean
    public PubSubInboundChannelAdapter scheduleEventsChannelAdapter(@Qualifier("schedulerChannel") final MessageChannel outputChannel,
                                                                    final PubSubOperations pubSubTemplate) {
        final PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, googleProperties.getExecuteEvent());
        adapter.setOutputChannel(outputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @Bean
    public Storage customStorage() {
        return StorageOptions.getDefaultInstance().getService();
    }
}

Listener

 @ServiceActivator(inputChannel = "schedulerChannel")
    public void handleMessage(final Message<String> message) {
        final ScheduledEvent scheduledEvent;
        try {
            scheduledEvent = objectMapper.readValue(message.getPayload(), ScheduledEvent.class);
        } catch (IOException e) {
            LOGGER.error("Error parsing message from queue");
            return;
        }

        BasicAcknowledgeablePubsubMessage consumer = (BasicAcknowledgeablePubsubMessage) message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
        if (isNull(consumer)) {
            LOGGER.error("Failed to get message headers");
            return;
        }

        var eventName = scheduledEvent.getEventName();
        var accountId = scheduledEvent.getAccountId();
        myAwesomeService.doStuff(acountId,eventName);

        consumer.ack();
        queuePublisher.send(googleProperties.getSchedulerAcknowledge(),
                queuePublisher.convertObjectToString(new AcknowledgeEvent(scheduledEvent.getEventId())));
    }

QueuePublisher

@Service
@Transactional
public class QueuePublisher implements PubSubOutBoundGateway {

    private static final Logger LOGGER = LoggerFactory.getLogger(QueuePublisher.class);

    private final ObjectMapper objectMapper;
    private final PubSubOutBoundGateway pubSubOutBoundGateway;

    public QueuePublisher(final ObjectMapper objectMapper,
                          final PubSubOutBoundGateway pubSubOutBoundGateway) {
        this.objectMapper = objectMapper;
        this.pubSubOutBoundGateway = pubSubOutBoundGateway;
    }

    @Override
    public void send(final String topic, final String message) {
        pubSubOutBoundGateway.send(topic, message);
    }

    public String convertObjectToString(final Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (IOException e) {
            LOGGER.error("Error converting object to string:{}", e);
            throw new BadRequestException();
        }
    }
}

PubSubOutBoundGateway

@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubSubOutBoundGateway {

    void send(@Header(GcpPubSubHeaders.TOPIC) String topic, String message);
}
java spring spring-boot spring-cloud publish-subscribe
1个回答
0
投票

在SpringBootApplication类中包含此注释。

@EnableIntegration
© www.soinside.com 2019 - 2024. All rights reserved.