我正在使用Spring Boot与pubsub主题进行交互。
此连接的我的配置类如下:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.core.publisher.PubSubPublisherTemplate;
import org.springframework.cloud.gcp.pubsub.support.PublisherFactory;
import org.springframework.cloud.gcp.pubsub.support.converter.SimplePubSubMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.pubsub.v1.PubsubMessage;
public abstract class PubSubPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(PubSubPublisher.class);
private final PubSubTemplate pubSubTemplate;
protected PubSubPublisher(PubSubTemplate pubSubTemplate) {
this.pubSubTemplate = pubSubTemplate;
}
protected abstract String topic(String topicName);
public ListenableFuture<String> publish(String topicName, String message) {
LOGGER.info("Publishing to topic [{}]. Message: [{}]", topicName, message);
return pubSubTemplate.publish(topicName, message);
}
}
我正在为我服务,就像这样:
publisher.publish(topic-name, payload);
此发布方法是异步方法,它总是传递而没有等待确认。我在发布后进行添加获取,等待它获得pubsub的响应。
但是我想知道如果我的主题还不存在并且我尝试推送一些消息,它应该抛出一些错误,例如找不到资源,仅考虑使用默认的异步方法。
可能实现回调会有所帮助,但我无法在我的代码中做到这一点。并且当前使用回调的重写发布方法只是抛出WARN not exception,我希望将其作为例外。这就是我想要实现回调的原因。
您可以检查主题是否已经存在
from google.cloud import pubsub_v1
project_id = "projectname"
topic_name = "unknowTopic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
response = publisher.get_topic(topic_path)
except Exception as e:
print(e)
此错误返回为
404 Resource not found (resource=unknowTopic).