Google Pub/Sub: how to implement a custom liveness health check for Pub/Sub subscribers


Update 2024-04-17

This is probably the code that shuts down the subscriber:

if (!StatusUtil.isRetryable(cause)) {
  ApiException gaxException =
      ApiExceptionFactory.createException(
          cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false);
  logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
  runShutdown();
  setFailureFutureOutstandingMessages(cause);
  notifyFailed(gaxException);
  return;
}

It lives in StreamingPullResponseObserver, a private class inside the package-private class StreamingSubscriberConnection.


On 2024-04-15 at 20:04:54 UTC, an incident occurred that left this entry in the logs:

com.google.api.gax.rpc.PermissionDeniedException: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: The caller does not have permission

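A new log entry with the same content appeared about 5 minutes later. After that, all 12 Micronaut applications in 2 different GKE clusters stopped listening to Pub/Sub. I waited about 1 hour before restarting all the applications. After the restart, the system returned to normal operation. An organization we work with experienced a similar incident.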

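I believe we need to add the Pub/Sub subscriber's health to a custom liveness endpoint so that the applications are restarted in this situation. I have looked into Pub/Sub exception handling, but it only seems to cover errors raised while receiving messages. I am currently looking into a global application exception handler.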

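The system has been in production for more than 12 months, and we have never run into this error before.

Any experience with checking the health of Pub/Sub listeners would be much appreciated.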

Environment / versions

  • Micronaut Platform 4.3.8
  • micronaut-gcp-pubsub: 5.5.0
  • google-cloud-pubsub: 1.128.1
  • JDK 17 (Kotlin)
1 Answer

Digging through the google-cloud-pubsub code did not turn up any way to hook in and check the subscriber's state.

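The solution I ended up with is a dedicated liveness topic to which one application publishes timestamps. Every application has its own subscription to the liveness topic, including the application that publishes the liveness messages.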
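
To illustrate why each application needs its own subscription, here is a toy in-memory model of the fan-out. It is only a sketch: ToyTopic is a hypothetical class, not part of the Pub/Sub client, and the second subscription name is made up for the example. It relies on the fact that every subscription attached to a topic receives its own copy of each published message.

```kotlin
// Toy in-memory model of a topic with named subscriptions (NOT the Pub/Sub client API).
class ToyTopic {
    private val subscriptions = mutableMapOf<String, MutableList<Long>>()

    // Registers a subscription; each one gets its own list of delivered messages.
    fun subscribe(name: String) {
        subscriptions.getOrPut(name) { mutableListOf() }
    }

    // Delivers one copy of the timestamp to every registered subscription.
    fun publish(timestampInMillis: Long) {
        subscriptions.values.forEach { it.add(timestampInMillis) }
    }

    // Returns a snapshot of the timestamps delivered to the given subscription.
    fun received(name: String): List<Long> = subscriptions[name].orEmpty().toList()
}

fun main() {
    val topic = ToyTopic()
    topic.subscribe("liveness-my-app-1")
    topic.subscribe("liveness-my-app-2") // hypothetical second application
    topic.publish(1_713_211_494_000L)

    // Both subscriptions see the same heartbeat.
    println(topic.received("liveness-my-app-1"))
    println(topic.received("liveness-my-app-2"))
}
```

With a single shared subscription, only one application would see each heartbeat, and the others could not tell a dead listener from a message that was delivered elsewhere.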

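The code below assumes at most 2 pods per application. The pods of an application share one subscription, so each liveness message reaches only one of them. With more pods per application, the liveness messages must be published more often than in the code below.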
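
The relationship between pod count and publish frequency can be sketched with a bit of arithmetic. This is a rough estimate, assuming the 15-minute schedule and 31-minute maximum age used further down, and assuming messages are spread evenly across the pods that share a subscription (Pub/Sub does not guarantee an even spread). The function names are hypothetical.

```kotlin
import java.time.Duration

// Number of liveness messages published within one max-age window.
fun messagesPerWindow(publishInterval: Duration, maxAge: Duration): Long =
    maxAge.toMillis() / publishInterval.toMillis()

// Longest publish interval that still yields one message per pod per window,
// ASSUMING messages are distributed evenly across the pods of an application.
fun maxPublishInterval(maxAge: Duration, podsPerApplication: Int): Duration {
    require(podsPerApplication > 0) { "podsPerApplication must be positive" }
    return maxAge.dividedBy(podsPerApplication.toLong())
}

fun main() {
    val maxAge = Duration.ofMinutes(31)
    println(messagesPerWindow(Duration.ofMinutes(15), maxAge)) // 2
    println(maxPublishInterval(maxAge, 4))                      // PT7M45S
}
```

Two messages per window matches the limit of 2 pods. Because the distribution is not guaranteed to be even, some extra margin on top of the computed interval is probably wise.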

Code

In application.yml of the application that publishes the liveness messages. The other applications only have liveness-subscription.

gcp:
  liveness-subscription: liveness-my-app-1
  liveness-topic: liveness-queue

pubsub-liveness:
  crontab: "*/15 * * * *"

The liveness message

/**
 * A message that is sent to the liveness topic
 *
 * @property masterApplication The name of the application that published the message.
 * @property timestampInMillis The timestamp of the message in milliseconds since epoch.
 */
class LivenessMessage(
    val masterApplication: String,
    val timestampInMillis: Long
)

The publisher used by the master application

import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import no.mycompany.pubsub.model.LivenessMessage

/**
 * Pubsub publisher for sending liveness message.
 */
@PubSubClient
fun interface LivenessPublisher {

    /**
     * Publishes a liveness message on pubsub.
     *
     * @param livenessMessage Payload
     * @return message id
     */
    @Topic(value = "\${gcp.liveness-topic}")
    fun sendLivenessMessage(livenessMessage: LivenessMessage): String
}

The publishing service that runs every X minutes

import io.micronaut.scheduling.annotation.Scheduled
import jakarta.inject.Singleton
import no.mycompany.pubsub.LivenessPublisher
import no.mycompany.pubsub.model.LivenessMessage
import java.time.Instant

@Singleton
class LivenessPublishSvc(
    private val livenessPublisher: LivenessPublisher
) {
    @Scheduled(cron = "\${pubsub-liveness.crontab}")
    fun publishLiveness() {
        val livenessMessage = LivenessMessage(
            masterApplication = MASTER_APPLICATION,
            timestampInMillis = Instant.now().toEpochMilli()
        )

        livenessPublisher.sendLivenessMessage(livenessMessage)
    }

    companion object {
        const val MASTER_APPLICATION = "my-master-application"
    }
}

The abstract listener that all subscribers inherit from

import java.time.Instant
import java.util.concurrent.atomic.AtomicLong

abstract class AbstractLivenessListener {
    val mostRecentReceivedTimeInMillis = AtomicLong(Instant.now().toEpochMilli())
}

The subscriber

import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import no.mycompany.pubsub.liveness.AbstractLivenessListener
import no.mycompany.pubsub.model.LivenessMessage


@PubSubListener
class LivenessListener : AbstractLivenessListener() {
    @Subscription("\${gcp.liveness-subscription}")
    fun onLivenessMessageReceived(livenessMessage: LivenessMessage) {
        mostRecentReceivedTimeInMillis.set(livenessMessage.timestampInMillis)
    }
}
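
One possible refinement, not part of the original solution: Pub/Sub delivers at least once and does not order messages by default, so a late or redelivered message could overwrite a newer timestamp when a plain set is used. A minimal sketch that only ever moves the timestamp forward, using a hypothetical helper class LatestTimestamp:

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: remembers the newest timestamp seen and ignores older ones.
class LatestTimestamp(initialMillis: Long) {
    private val latest = AtomicLong(initialMillis)

    // Atomically keeps the larger of the stored and the incoming timestamp,
    // and returns the value stored afterwards.
    fun record(timestampInMillis: Long): Long =
        latest.accumulateAndGet(timestampInMillis) { current, incoming -> maxOf(current, incoming) }

    fun get(): Long = latest.get()
}

fun main() {
    val latest = LatestTimestamp(100L)
    println(latest.record(200L)) // 200
    println(latest.record(150L)) // 200, the older message is ignored
}
```

With a 15-minute publish interval the difference rarely matters, but it keeps a stale redelivery from making a healthy listener look older than it is.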

The utility class

import java.time.Duration
import java.time.Instant

object PubsubLivenessUtils {
    /**
     * Checks if the liveness listener is healthy by comparing the most recent received time
     * with the current time. If the difference is less than the maximum duration, the listener
     * is considered healthy.
     *
     * @param mostRecentReceivedTimeInMillis The most recent time the listener received a message
     * @return true if the listener is healthy, false otherwise
     */
    fun isLivenessListenerHealthy(
        mostRecentReceivedTimeInMillis: Long
    ): Boolean = Duration.between(
        Instant.ofEpochMilli(mostRecentReceivedTimeInMillis),
        Instant.now()
    ) < maxDuration

    private const val MAX_LIVENESS_AGE_IN_MINUTES = 31L
    private val maxDuration = Duration.ofMinutes(MAX_LIVENESS_AGE_IN_MINUTES)
}
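
The boundary behaviour of this check is easier to see with a fixed clock. The following is a sketch of a variant, not the code used above: isFresh and its clock and maxAge parameters are hypothetical, introduced only so the current time can be controlled.

```kotlin
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset

// Variant of the freshness check with an injectable clock (hypothetical signature).
fun isFresh(mostRecentReceivedTimeInMillis: Long, maxAge: Duration, clock: Clock): Boolean =
    Duration.between(Instant.ofEpochMilli(mostRecentReceivedTimeInMillis), clock.instant()) < maxAge

fun main() {
    val now = Instant.parse("2024-04-15T20:04:54Z")
    val clock = Clock.fixed(now, ZoneOffset.UTC)
    val maxAge = Duration.ofMinutes(31)

    // A 30-minute-old timestamp is still inside the 31-minute window.
    println(isFresh(now.minus(Duration.ofMinutes(30)).toEpochMilli(), maxAge, clock)) // true
    // Exactly 31 minutes old is no longer fresh, because the comparison is strict.
    println(isFresh(now.minus(Duration.ofMinutes(31)).toEpochMilli(), maxAge, clock)) // false
}
```

An injectable clock also lets a test cover the exact boundary without depending on how fast the test runs.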

Finally, the custom health indicator

import io.micronaut.health.HealthStatus
import io.micronaut.management.health.indicator.HealthIndicator
import io.micronaut.management.health.indicator.HealthResult
import io.micronaut.management.health.indicator.annotation.Liveness
import no.mycompany.pubsub.LivenessListener
import no.mycompany.pubsub.liveness.PubsubLivenessUtils.isLivenessListenerHealthy
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono

/**
 * Custom liveness indicator.
 *
 * @property livenessListener [LivenessListener]
 */
@Liveness
class LivenessIndicator(
    private val livenessListener: LivenessListener
) : HealthIndicator {

    /**
     * Gets health state.
     *
     * @return [HealthResult] with [HealthStatus.UP] or [HealthStatus.DOWN]
     */
    override fun getResult(): Publisher<HealthResult> = Mono.fromCallable { buildHealthResult() }

    private fun buildHealthResult(): HealthResult {
        val healthy = isLivenessListenerHealthy(livenessListener.mostRecentReceivedTimeInMillis.get())
        val healthStatus = if (healthy) HealthStatus.UP else HealthStatus.DOWN

        return HealthResult.builder(LIVENESS_NAME).status(healthStatus).build()
    }

    companion object {
        const val LIVENESS_NAME = "liveness"
    }
}

And the test

import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
import io.micronaut.health.HealthStatus
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import no.mycompany.pubsub.LivenessListener
import reactor.test.StepVerifier
import java.time.Instant

class LivenessIndicatorTest : BehaviorSpec({

    Given("LivenessIndicator with mocked LivenessListener") {
        lateinit var livenessListener: LivenessListener
        lateinit var sut: LivenessIndicator

        beforeContainer {
            livenessListener = mockk(relaxed = true)
            sut = LivenessIndicator(livenessListener = livenessListener)
            
            every {
                livenessListener.mostRecentReceivedTimeInMillis.get()
            } answers { Instant.now().toEpochMilli() - 30 * 60 * 1000 }
        }

        When("LivenessListener is healthy") {
            val resultMono = sut.result

            StepVerifier.create(resultMono)
                .consumeNextWith { it.status shouldBe HealthStatus.UP }
                .verifyComplete()

            verify { livenessListener.mostRecentReceivedTimeInMillis.get() }
        }

        When("LivenessListener is unhealthy") {
            every {
                livenessListener.mostRecentReceivedTimeInMillis.get()
            } answers { Instant.now().toEpochMilli() - 33 * 60 * 1000 }

            val resultMono = sut.result

            StepVerifier.create(resultMono)
                .consumeNextWith { it.status shouldBe HealthStatus.DOWN }
                .verifyComplete()

            verify { livenessListener.mostRecentReceivedTimeInMillis.get() }
        }
    }
})