如何消费来自pubsub的多条消息?这似乎是一个简单的问题,应该有简单的解决方案,但目前我可以找到简单的方法来使用
spring-cloud-gcp-pubsub
. 从 pubsub 消费批量记录
我正在使用
spring-cloud-gcp-pubsub
来消费来自 pubsub 的消息并在 spring boot 应用程序中处理它们。我当前的设置非常简单,我有 PubSubInboundChannelAdapter
和 ServiceActivator
消耗记录。经过研究,我发现了 spring integration Aggregators
但它们似乎不是这样做的好方法,因为向下游传播确认并不容易。有什么我想念的吗?如何批量消费消息?
PubSubInboundChannelAdapter
是基于订阅主题。所以,这将是一个消息流,这个 PubSubInboundChannelAdapter
对它们中的每一个做出反应,转换为 Spring 消息并将其发送到下游到配置的通道。
订阅的时候真的没办法批量获取消息
您还需要记住,在 GCP Pub/Sub 中没有像
offset
这样的东西。您确实应该确认您从 Pub/Sub 使用的每条消息。
虽然有使用
PubSubMessageSource
一次拉取一批消息的方法。 messageSource.setMaxFetchSize(5);
可以解决问题,但是这个 PubSubMessageSource
仍然会单独生成每条消息,因此您可以独立地(n)确认它们。
当然,您可以利用
PubSubMessageSource
使用的功能 - PubSubSubscriberOperations.pullAndConvert()
。有关详细信息,请参阅它的 JavaDocs:
/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription the subscription name
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @param payloadType the type to which the payload of the Pub/Sub messages should be converted
* @param <T> the type of the payload
* @return the list of received acknowledgeable messages
* @since 1.1
*/
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType);
所以,这个看起来像你正在寻找的,因为你确实会有一个消息列表,每个消息都是一个带有 (n)ack 回调的包装器。
此 API 可用于自定义
@InboundChannelAdapter
MessageSource
或 Supplier
@Bean
实现。
但仍然:我看不到整个批处理的好处,因为每条消息都可以单独确认而不会影响所有其他消息。
尝试使用以下:
@Bean
@InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "3"))
public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "testSubscription");
messageSource.setAckMode(AckMode.MANUAL);
return messageSource;
}
maxMessagesPerPoll
属性确定将轮询多少条消息。
我相信我最近遇到了同样的问题。 Google spring pubsub 库未提供预期结果。即使使用 MessageSource,pubsub 消息也会分批提取,但库只允许一条一条地处理它们。 使用 pull api 允许我解决它。 https://cloud.google.com/pubsub/docs/pull#unary_pull_code_samples 这种方法的缺点是需要提供触发拉取的代码。 (调度程序)