Pubsublite 消息确认不起作用

问题描述 投票:0回答:2

我正在使用 Google pubsublite。具有单个分区和一些消息的小型虚拟主题。 Python 客户端库。使用回调执行标准

SubscriberCluent.subscribe
。回调将消息放入队列中。当msg从队列中取出来消费时,它的
ack
就会被调用。当我想停止时,我调用
 subscribe_future.cancel(); subscriber_future.result()
并丢弃队列中未使用的消息。

假设我知道该主题有 30 条消息。我在停止之前吃了 10 个。然后我在同一订阅中重新启动一个新的

SubscriberClient
并接收消息。我希望从第 11 条消息开始,但我从第一条消息开始。所以珍贵的订阅者已经确认了前 10 个,但就好像服务器没有收到确认一样。

我想也许ack需要一些时间才能到达服务器。所以我等了2分钟才开始第二次订阅。没有帮助。

然后你认为也许订阅者对象管理 ack 调用,我需要在取消之前“刷新”它们,但我发现了另一个关于此的信息。

我错过了什么?

这是代码。如果您有 pubsublite 帐户,则在填写凭据后即可执行该代码。代码显示了两个问题,一是这个问题的主题;另一个是在这里

询问的
# Using python 3.8

from __future__ import annotations

import logging
import pickle
import queue
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Union, Optional

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
                                                 SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
                                           LocationPath,
                                           ReservationPath, SubscriptionPath,
                                           TopicPath,
                                           )
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials


logging.getLogger('google.cloud').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

FORMAT = '[%(asctime)s.%(msecs)03d %(name)s]  %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')


class Account:
    def __init__(self,
                 project_id: str,
                 region: str,
                 zone: str,
                 credentials: Credentials,
                 ):
        self.project_id = project_id
        self.region = region
        self.zone = CloudZone.parse(zone)
        self.credentials = credentials
        self.client = AdminClient(region=region, credentials=credentials)

    def location_path(self) -> LocationPath:
        return LocationPath(self.project_id, self.zone)

    def reservation_path(self, name: str) -> ReservationPath:
        return ReservationPath(self.project_id, self.region, name)

    def topic_path(self, name: str) -> TopicPath:
        return TopicPath(self.project_id, self.zone, name)

    def subscription_path(self, name: str) -> SubscriptionPath:
        return SubscriptionPath(self.project_id, self.zone, name)

    def create_reservation(self, name: str, *, capacity: int = 32) -> None:
        path = self.reservation_path(name)
        reservation = GCPReservation(name=str(path),
                                     throughput_capacity=capacity)
        self.client.create_reservation(reservation)
        # logger.info('reservation %s created', name)

    def create_topic(self,
                     name: str,
                     *,
                     partition_count: int = 1,
                     partition_size_gib: int = 30,
                     reservation_name: str = 'default') -> Topic:
        # A topic name can not be reused within one hour of deletion.
        top_path = self.topic_path(name)
        res_path = self.reservation_path(reservation_name)

        topic = GCPTopic(
            name=str(top_path),
            partition_config=GCPTopic.PartitionConfig(count=partition_count),
            retention_config=GCPTopic.RetentionConfig(
                per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024),
            reservation_config=GCPTopic.ReservationConfig(
                throughput_reservation=str(res_path)))

        self.client.create_topic(topic)
        # logger.info('topic %s created', name)

        return Topic(name, self)

    def delete_topic(self, name: str) -> None:
        path = self.topic_path(name)
        self.client.delete_topic(path)
        # logger.info('topic %s deleted', name)

    def get_topic(self, name: str) -> Topic:
        return Topic(name, self)


class Topic:

    def __init__(self, name: str, account: Account):
        self.account = account
        self.name = name
        self._path = self.account.topic_path(name)

    def create_subscription(self,
                            name: str,
                            *,
                            pos: str = None) -> Subscription:
        path = self.account.subscription_path(name)

        if pos is None or pos == 'beginning':
            starting_offset = BacklogLocation.BEGINNING
        elif pos == 'end':
            starting_offset = BacklogLocation.END
        else:
            raise ValueError(
                'Argument start only accepts one of two values - "beginning" or "end"'
            )

        Conf = GCPSubscription.DeliveryConfig
        subscription = GCPSubscription(
            name=str(path),
            topic=str(self._path),
            delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY))

        self.account.client.create_subscription(subscription, starting_offset)
        # logger.info('subscription %s created for topic %s', name, self.name)

        return Subscription(name, self)

    def delete_subscription(self, name: str) -> None:
        path = self.account.subscription_path(name)
        self.account.client.delete_subscription(path)
        # logger.info('subscription %s deleted from topic %s', name, self.name)

    def get_subscription(self, name: str):
        return Subscription(name, self)

    @contextmanager
    def get_publisher(self, **kwargs):
        with Publisher(self, **kwargs) as pub:
            yield pub


class Publisher:
    def __init__(self, topic: Topic, *, batch_size: int = 100):
        self.topic = topic
        self._batch_config = {
                'max_bytes': 3 * 1024 * 1024,  # 3 Mb; must be < 4 max_bytes
                'max_latency': 0.05,  # 50 ms
                'max_messages': batch_size,  # default is 1000
                }
        self._messages = queue.Queue(1000)
        self._nomore = object()
        self._pool = None
        self._worker = None

    def __enter__(self):
        def _publish():
            path = self.topic._path
            dumps = pickle.dumps
            nomore = self._nomore
            q = self._messages
            with PublisherClient(
                    credentials=self.topic.account.credentials,
                    per_partition_batching_settings=BatchSettings(**self._batch_config),
                    ) as publisher:
                while True:
                    x = q.get()
                    if x is nomore:
                        break
                    future = publisher.publish(path, dumps(x))
                    future.result()

        self._pool = ThreadPoolExecutor(1)
        self._worker = self._pool.submit(_publish)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._messages.put(self._nomore)
        self._worker.result()
        self._pool.shutdown()

    def put(self, data) -> None:
        self._messages.put(data)


class Subscription:

    def __init__(self, name: str, topic: Topic):
        self.topic = topic
        self.name = name
        self._path = topic.account.subscription_path(name)

    @contextmanager
    def get_subscriber(self, *, backlog=None):
        with Subscriber(self, backlog=backlog) as sub:
            yield sub


class Subscriber:
    def __init__(self, subscription: Subscription, backlog: int = None):
        self.subscription = subscription
        self._backlog = backlog or 100
        self._cancel_requested: bool = None
        self._messages: queue.Queue = None
        self._pool: ThreadPoolExecutor = None
        self._NOMORE = object()
        self._subscribe_task = None

    def __enter__(self):
        self._pool = ThreadPoolExecutor(1).__enter__()
        self._messages = queue.Queue(self._backlog)
        messages = self._messages

        def callback(msg: PubSubMessage):
            logger.info('got %s', pickle.loads(msg.data))
            messages.put(msg)

        def _subscribe():
            flowcontrol = FlowControlSettings(
                    messages_outstanding=self._backlog,
                    bytes_outstanding=1024 * 1024 * 10)

            subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials)
            with subscriber:
                fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol)
                logger.info('subscribe sent to gcp')

                while True:
                    if self._cancel_requested:
                        fut.cancel()
                        fut.result()
                        while True:
                            while not messages.empty():
                                try:
                                    _ = messages.get_nowait()
                                except queue.Empty:
                                    break
                            try:
                                messages.put_nowait(self._NOMORE)
                                break
                            except queue.Full:
                                continue
                        break
                    time.sleep(0.003)

        self._subscribe_task = self._pool.submit(_subscribe)
        return self

    def __exit__(self, *args, **kwargs):
        if self._pool is not None:
            if self._subscribe_task is not None:
                self._cancel_requested = True
                while True:
                    z = self._messages.get()
                    if z is self._NOMORE:
                        break
                self._subscribe_task.result()
                self._subscribe_task = None
                self._messages = None
            self._pool.__exit__(*args, **kwargs)
            self._pool = None

    def get(self, timeout=None):
        if timeout is not None and timeout == 0:
            msg = self._messages.get_nowait()
        else:
            msg = self._messages.get(block=True, timeout=timeout)
        data = pickle.loads(msg.data)
        msg.ack()
        return data


def get_account() -> Account:
    return Account(project_id='--fill-in-proj-id--',
                   region='us-central1',
                   zone='us-central1-a',
                   credentials='--fill-in-creds--')


# This test shows that it takes extremely long to get the first messsage
# in `subscribe`.
def test1(account):
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    try:
        with topic.get_publisher() as p:
            p.put(1)
            p.put(2)
            p.put(3)

        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                t0 = time.time()
                logger.info('getting the first message')
                z = s.get()
                t1 = time.time()
                logger.info('  got the first message')
                print(z)
            print('getting the first msg took', t1 - t0, 'seconds')
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)


def test2(account):
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    N = 30
    try:
        with topic.get_publisher(batch_size=1) as p:
            for i in range(N):
                p.put(i)

        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                for i in range(10):
                    z = s.get()
                    assert z == i

            # The following block shows that the subscriber
            # resets to the first message, not as expected
            # that it picks up where the last block left.

            with sub.get_subscriber() as s:
                for i in range(10, 20):
                    z = s.get()
                    try:
                        assert z == i
                    except AssertionError as e:
                        print(z, '!=', i)
                        return
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)


if __name__ == '__main__':
    a = get_account()
    try:
        a.create_reservation('default')
    except AlreadyExists:
        pass

    test1(a)
    print('')
    test2(a)
python message-queue google-cloud-pubsub subscribe google-cloud-pubsublite
2个回答
1
投票

我找到了解决办法。在取消“订阅”未来之前,我需要睡一会儿以允许刷新(即发送出去)。特别是,

google.cloud.pubsublite.cloudpubsub.internal.make_subscriber._DEFAULT_FLUSH_SECONDS
(值 0.1)似乎是观看时间。需要睡得比这个长一点才能确定。

这是 google 包中的一个错误。 “取消”未来意味着放弃未处理的消息,而应发送已提交的确认。这个错误可能没有被注意到,因为重复的消息传递不是一个错误。


0
投票

我无法重现您的问题,但我认为您应该检查有关使用 cloud pubsublite 的官方文档中处理该问题的方式。

这是我从接收消息示例中提取和更新的代码,它按预期工作,它将从精简主题获取消息并确认以避免再次获取。如果重新运行,我只会在有数据要提取的情况下获取数据。我添加了代码,以便您可以检查是否有与您的代码不同的内容。

consumer.py

from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
    MessageMetadata, 
)
from google.cloud.pubsub_v1.types import PubsubMessage

# TODO(developer):
project_number = project-number
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
timeout = 90

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

我遇到你的情况的唯一方法是当我使用不同的订阅时。但在这方面,当不同的订阅从主题获取消息时,每个订阅都将收到相同的存储消息,如从 Lite 订阅接收消息中所述。

考虑一下:

© www.soinside.com 2019 - 2024. All rights reserved.