我正在尝试 Google 提供的 Pub/Sub 示例代码,大致如下:
sub.js
// Imports the Google Cloud client library.
const {PubSub} = require('@google-cloud/pubsub');
// Creates a single Pub/Sub client used by the rest of this script; cache this for further use.
const pubSubClient = new PubSub();
// Intended cap on the number of messages handled at a time; it is passed to
// subscription.open() inside listenForMessages.
// NOTE(review): the question reports that this has no throttling effect — confirm against the library docs.
const maxMessages = 1;
/**
 * Registers a message listener on an existing subscription and opens it.
 *
 * Every received message is logged, and its acknowledgement is postponed by
 * 60 seconds as a test that simulates long-running processing.
 *
 * @param {string} subscriptionNameOrId - Name or ID of the subscription to listen on.
 */
function listenForMessages(subscriptionNameOrId) {
  const ackDelayMs = 60000;
  // Running tally of messages seen by the handler.
  let receivedCount = 0;

  // Logs the incoming message and schedules its delayed acknowledgement.
  const onMessage = (message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    receivedCount += 1;

    // "Ack" (acknowledge receipt of) the message once the delay has elapsed.
    const acknowledge = () => {
      message.ack();
      console.log("ACK after set timeout");
    };
    // Test code meant to hold up processing of the current message; as the
    // question notes, it does not stop later messages from arriving.
    setTimeout(acknowledge, ackDelayMs);
  };

  // Reference the existing subscription, attach the handler, then open it.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);
  subscription.on('message', onMessage);
  subscription.open({ maxMessages });
}
// Script entry point: start listening on the demo subscription.
(async function main() {
  const subscriptionPath = 'projects/my-project/subscriptions/faz-sub';
  listenForMessages(subscriptionPath);
})();
和 pub.js
// Topic to publish to, given as a full resource path.
const topicNameOrId = 'projects/my-project/topics/faz-topic';
// Sample JSON payload.
// NOTE(review): not referenced by the publishing loop in this snippet, which builds its own payloads.
const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library.
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use.
const pubSubClient = new PubSub();
/**
 * Publishes one message to a topic and logs the resulting message ID.
 *
 * A publishing failure is logged and recorded by setting the process exit
 * code to 1; it is not rethrown to the caller.
 *
 * @param {string} topicNameOrId - Name or ID of the topic to publish to.
 * @param {string} data - Message body as a string, e.g. "Hello, world!" or JSON.stringify(someObject).
 * @returns {Promise<void>}
 */
async function publishMessage(topicNameOrId, data) {
  // The message body is sent as a Buffer built from the given string.
  const payload = Buffer.from(data);

  try {
    const topic = pubSubClient.topic(topicNameOrId);
    const publishedId = await topic.publishMessage({ data: payload });
    console.log(`Message ${publishedId} published.`);
  } catch (err) {
    console.error(`Received error while publishing: ${err.message}`);
    process.exitCode = 1;
  }
}
// Script entry point: publish eleven numbered messages (i = 0..10), strictly one after another.
(async function main() {
  for (let i = 0; i <= 10; i += 1) {
    const payload = JSON.stringify({ foo: `bar_${i}` });
    await publishMessage(topicNameOrId, payload);
  }
})();
当我启动上述 sub.js 时,它在当前消息还没有处理完之前,就开始处理后续的许多消息。我原以为在我对当前消息调用 message.ack() 之前,下一条消息不会被处理;但无论我是否对当前消息调用 message.ack(),从 pub.js 发送的所有消息都会被投递并开始处理。
基本上,我想要的是:在我处理完当前消息并对其调用 ack()(确认)之前,我的订阅不应处理下一条消息。换句话说,我想按顺序逐条处理订阅消息。
我甚至尝试过使用 subscription.open({ maxMessages }); 但 sub.js 仍然会接收到所有消息。
因此,如能提供任何帮助,我将不胜感激。
向订阅者添加以下流量控制(flowControl)配置后,我就能够按顺序逐条处理消息了:
// Flow-control section of the subscriber options (fragment of a larger options object).
flowControl: {
// Cap on the number of messages in progress at once; maxInProgress is 1 in the full subscriber code.
maxMessages: maxInProgress,
// NOTE(review): presumably stops the client from handing over messages beyond the cap — confirm against the library docs.
allowExcessMessages: false
}
这是完整的订阅者代码:
// Name or ID of the subscription to listen on.
const subscriptionNameOrId = 'subscriber';
// Number of messages allowed to be in progress at once; used as flowControl.maxMessages.
const maxInProgress = 1;
// Imports the Google Cloud client library.
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use.
const pubSubClient = new PubSub();
/**
 * Starts listening on an existing subscription with flow-control settings
 * that limit how many messages are in progress at once.
 *
 * Each message is logged, "processed" for one second (simulated work) and
 * then acknowledged. If processing fails, the message is nacked so it can be
 * redelivered instead of being left unacknowledged. Errors emitted by the
 * subscription are logged and recorded by setting the process exit code to 1
 * rather than crashing the process with an unhandled 'error' event.
 *
 * @param {string} subscriptionNameOrId - Name or ID of the subscription.
 * @param {number} maxInProgress - Value used for flowControl.maxMessages.
 * @returns {Promise<void>} Resolves once the listeners are registered.
 */
async function subscribeWithFlowControlSettings(
  subscriptionNameOrId,
  maxInProgress
) {
  const subscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
      allowExcessMessages: false,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionNameOrId,
    subscriberOptions
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
  );

  const messageHandler = async message => {
    console.log(`Received message: ${message.id}`);
    try {
      // Simulated one-second unit of work.
      await new Promise(resolve => setTimeout(resolve, 1000));
      console.log("Done");
      message.ack();
    } catch (error) {
      // The emitter ignores the promise returned by this handler, so a
      // failure must be handled here or it becomes an unhandled rejection.
      console.error(
        `Failed to process message ${message.id}: ${error.message}`
      );
      message.nack();
    }
  };

  // Without an 'error' listener, an emitted error would be thrown by the
  // EventEmitter and terminate the process.
  const errorHandler = error => {
    console.error(`Received error from subscription: ${error.message}`);
    process.exitCode = 1;
  };

  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);
}
// Script entry point: start the flow-controlled subscriber using the module-level settings.
(async function main() {
  await subscribeWithFlowControlSettings(subscriptionNameOrId, maxInProgress);
})();
此问题有任何更新或解决方法吗?