如何顺序处理Google Pub/Sub消息?

问题描述 投票:0回答:2

我正在尝试他们提供的 Google pub/sub 示例代码。类似于以下内容:

sub.js
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();
// Set the max number of messages
const maxMessages = 1;

/**
 * Attaches a message listener to an existing Pub/Sub subscription.
 *
 * Every received message is logged (id, data, attributes) and a 60-second
 * timer is armed that acknowledges it. The handler itself returns right after
 * arming the timer; it does not wait for the ack.
 *
 * @param {string} subscriptionNameOrId - Name or ID handed to
 *   `pubSubClient.subscription()`.
 */
function listenForMessages(subscriptionNameOrId) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  // messageCount is incremented for every message but never read in this function.
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    // NOTE(review): if attributes is a plain object this interpolation prints
    // "[object Object]"; JSON.stringify would show its contents — confirm.
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    setTimeout(() => { // test code to hold the current message processing time, but it's not working.
      message.ack();
      console.log("ACK after set timeout")
    }, 60000)
 
  };

  // Register the handler for incoming messages. Nothing in this function
  // removes the listener or applies a timeout.
  subscription.on('message', messageHandler);
  // NOTE(review): maxMessages is passed to open() here. Flow control is
  // presumably configured through the options given to
  // pubSubClient.subscription() instead — confirm against the client docs.
  subscription.open({ maxMessages });
}

// Script entry point: begin listening on the example subscription.
(async () => {
  const subscriptionPath = 'projects/my-project/subscriptions/faz-sub';
  listenForMessages(subscriptionPath);
})();

和 pub.js

const topicNameOrId = 'projects/my-project/topics/faz-topic';
const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Publishes a single message to a topic and logs the resulting message ID.
 *
 * A publish failure is not rethrown: it is logged to stderr and the process
 * exit code is set to 1.
 *
 * @param {string} topicNameOrId - Name or ID handed to `pubSubClient.topic()`.
 * @param {string} data - Payload, e.g. "Hello, world!" or
 *   JSON.stringify(someObject); it is converted with `Buffer.from`.
 * @returns {Promise<void>}
 */
async function publishMessage(topicNameOrId, data) {
  // The conversion sits outside the try block, so an invalid payload rejects
  // the returned promise instead of being reported as a publish error.
  const payload = Buffer.from(data);

  try {
    const topic = pubSubClient.topic(topicNameOrId);
    const publishedId = await topic.publishMessage({data: payload});
    console.log(`Message ${publishedId} published.`);
  } catch (failure) {
    console.error(`Received error while publishing: ${failure.message}`);

    // Let the process finish, but report the failure through its exit status.
    process.exitCode = 1;
  }
}


// Script entry point: publish eleven numbered test messages (bar_0 .. bar_10),
// waiting for each publish to finish before sending the next.
(async () => {
  for (let index = 0; index <= 10; index += 1) {
    const payload = JSON.stringify({foo: `bar_${index}`});
    await publishMessage(topicNameOrId, payload);
  }
})();

当我启动上述 sub.js 时,它会在当前消息处理完成之前就开始处理许多其他消息。我原以为在我对当前消息调用 message.ack(); 之前,下一条消息不会被处理;但实际上,无论我是否对当前消息调用 message.ack();,我从 pub.js 发送的所有消息都会被立即接收并处理。

基本上,我想要的是:在我处理完当前消息并对其调用 ack()(确认)之前,我的订阅不应处理下一条消息。换句话说,我想按顺序逐条处理订阅消息。

我什至尝试过使用

subscription.open({ maxMessages });
但 sub.js 仍然接收所有消息。

因此,如能提供任何帮助,我将不胜感激。

node.js google-cloud-platform message-queue publish-subscribe
2个回答
0
投票

向订阅者添加以下流量控制(flowControl)配置后,我就能够按顺序逐条处理消息:

flowControl: {
      maxMessages: maxInProgress,
      allowExcessMessages: false
    }

这是完整的订阅者代码:

  const subscriptionNameOrId = 'subscriber';
    const maxInProgress = 1;
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    /**
     * Starts listening on a subscription with client-side flow control, so
     * that at most `maxInProgress` messages are being handled at a time
     * (pass 1 to handle messages one after another).
     *
     * Each message is acknowledged after its simulated one-second processing
     * step. If processing fails, the message is nacked so it does not stay
     * outstanding and block further deliveries. Subscription-level errors are
     * logged and reported through the process exit code instead of surfacing
     * as an unhandled 'error' event.
     *
     * @param {string} subscriptionNameOrId - Name or ID handed to
     *   `pubSubClient.subscription()`.
     * @param {number} maxInProgress - Maximum number of messages in progress.
     * @returns {Promise<void>} Resolves once the listeners are registered.
     */
    async function subscribeWithFlowControlSettings(
      subscriptionNameOrId,
      maxInProgress
    ) {
      const subscriberOptions = {
        flowControl: {
          maxMessages: maxInProgress,
          // Ask the client not to hand out more than maxMessages at once
          // (see the client library's FlowControlOptions documentation).
          allowExcessMessages: false,
        },
      };

      // References an existing subscription.
      // Note that flow control settings are not persistent across subscribers.
      const subscription = pubSubClient.subscription(
        subscriptionNameOrId,
        subscriberOptions
      );

      console.log(
        `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
      );

      const messageHandler = async message => {
        try {
          console.log(`Received message: ${message.id}`);
          // Stand-in for real work: hold the message for one second.
          await new Promise(resolve => setTimeout(resolve, 1000));
          console.log("Done");
          message.ack();
        } catch (error) {
          // Without this, a failure would become an unhandled rejection and
          // the message would remain neither acked nor nacked.
          console.error(
            `Failed to process message ${message.id}: ${error.message}`
          );
          message.nack();
        }
      };

      // An EventEmitter throws when 'error' is emitted with no listener, so
      // always attach one; keep a failing exit status for the process.
      const errorHandler = error => {
        console.error(`Received error from subscription: ${error.message}`);
        process.exitCode = 1;
      };

      subscription.on('message', messageHandler);
      subscription.on('error', errorHandler);
    }
    
    // Script entry point: start the flow-controlled subscriber using the
    // subscription name and in-progress limit defined at the top of the snippet.
    (async () => {
      await subscribeWithFlowControlSettings(
        subscriptionNameOrId,
        maxInProgress
      );
    })();

-1
投票

此问题有任何更新或解决方法吗?

© www.soinside.com 2019 - 2024. All rights reserved.