使用 Spring 消费来自 pubsub 的消息批次

问题描述 投票:0回答:3

如何消费来自pubsub的多条消息?这似乎是一个简单的问题,应该有简单的解决方案,但目前我可以找到简单的方法来使用

spring-cloud-gcp-pubsub
.

从 pubsub 消费批量记录

我正在使用

spring-cloud-gcp-pubsub
来消费来自 pubsub 的消息并在 spring boot 应用程序中处理它们。我当前的设置非常简单,我有
PubSubInboundChannelAdapter
ServiceActivator
消耗记录。经过研究,我发现了 spring integration
Aggregators
但它们似乎不是这样做的好方法,因为向下游传播确认并不容易。有什么我想念的吗?如何批量消费消息?

spring-integration spring-cloud google-cloud-pubsub
3个回答
0
投票

PubSubInboundChannelAdapter
是基于订阅主题。所以,这将是一个消息流,这个
PubSubInboundChannelAdapter
对它们中的每一个做出反应,转换为 Spring 消息并将其发送到下游到配置的通道。 订阅的时候真的没办法批量获取消息

您还需要记住,在 GCP Pub/Sub 中没有像

offset
这样的东西。您确实应该确认您从 Pub/Sub 使用的每条消息。

虽然有使用

PubSubMessageSource
一次拉取一批消息的方法。
messageSource.setMaxFetchSize(5);
可以解决问题,但是这个
PubSubMessageSource
仍然会单独生成每条消息,因此您可以独立地(n)确认它们。

当然,您可以利用

PubSubMessageSource
使用的功能 -
PubSubSubscriberOperations.pullAndConvert()
。有关详细信息,请参阅它的 JavaDocs:

/**
 * Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
 * the desired payload type.
 * @param subscription the subscription name
 * @param maxMessages the maximum number of pulled messages
 * @param returnImmediately returns immediately even if subscription doesn't contain enough
 * messages to satisfy {@code maxMessages}
 * @param payloadType the type to which the payload of the Pub/Sub messages should be converted
 * @param <T> the type of the payload
 * @return the list of received acknowledgeable messages
 * @since 1.1
 */
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
        Boolean returnImmediately, Class<T> payloadType);

所以,这个看起来像你正在寻找的,因为你确实会有一个消息列表,每个消息都是一个带有 (n)ack 回调的包装器。

此 API 可用于自定义

@InboundChannelAdapter
MessageSource
Supplier
@Bean
实现。

但仍然:我看不到整个批处理的好处,因为每条消息都可以单独确认而不会影响所有其他消息。


0
投票

尝试使用以下:

@Bean
    @InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "3"))
    public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
        PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "testSubscription");
        messageSource.setAckMode(AckMode.MANUAL);
        return messageSource;
    }

maxMessagesPerPoll
属性确定将轮询多少条消息。


0
投票

我相信我最近遇到了同样的问题。 Google spring pubsub 库未提供预期结果。即使使用 MessageSource,pubsub 消息也会分批提取,但库只允许一条一条地处理它们。 使用 pull api 允许我解决它。 https://cloud.google.com/pubsub/docs/pull#unary_pull_code_samples 这种方法的缺点是需要提供触发拉取的代码。 (调度程序)

© www.soinside.com 2019 - 2024. All rights reserved.