拦截调用以刷新令牌GRPC

问题描述 投票:0回答:3

我在项目中使用带有 proto 的 GRPC,并且我有 KEY 和 AUTHORITY 令牌来访问服务器 API。 所以,我需要使用我的权限更新密钥。 我正在像这样建立频道:

OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext()
        .intercept(auth, logger)
        .build()

我的拦截器看起来像:

class AuthClientInterceptor(
    private val prefs: Preferences,
    private val keyApi: KeyApi) : ClientInterceptor {

    companion object {
        private const val ACCESS_TOKEN = "authorization"
    }

    override fun <ReqT : Any?, RespT : Any?> interceptCall(method: MethodDescriptor<ReqT, RespT>?,
                                                       callOptions: CallOptions?,
                                                       next: Channel): ClientCall<ReqT, RespT> {

        val call = next.newCall(method, callOptions)

        val callForwarding = object : ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT>(call) {
            override fun checkedStart(responseListener: Listener<RespT>?, headers: Metadata) {

            synchronized(this@AuthClientInterceptor) {
                val keyCreated = prefs.getAccessKeyCreated()
                val keyExpires = prefs.getAccessKeyExpires()
                val currentTime = System.currentTimeMillis()
                if (currentTime < keyCreated || currentTime > keyExpires) {
                    keyApi.issueNewKey(prefs.getAuthority())
                        .map { it.data }
                        .doOnSuccess { prefs.setAccessKey(it.token) }
                        .doOnSuccess { prefs.setAccessKeyCreated(it.createdDate) }
                        .doOnSuccess { prefs.setAccessKeyExpires(it.expiresDate) }
                        .blockingGet()
                }
            }

            val keyData = Metadata.Key.of(ACCESS_TOKEN, Metadata.ASCII_STRING_MARSHALLER)
                if (headers[keyData] == null) {
                    headers.put(keyData, "Bearer ${prefs.getAccessKey()}")
                }
                call.start(responseListener, headers)
            }
        }
        return callForwarding
    }
}

如您所见,我只是检查当前时间并将其与令牌创建日期和到期日期进行比较。

所以,我不喜欢这样。我想实现这个:

1)向服务器发送请求;

2) 检查响应。如果这意味着我的 KEY 已过期,请同步刷新密钥并重复请求(如身份验证器)。

但我没有找到解决方案或任何有关使用 gRPC 实现此功能的有用信息。有人可以帮助我吗?

android kotlin interceptor grpc
3个回答
0
投票

这是您可以使用的完整客户端拦截器类。

class Interceptor() : ClientInterceptor {

    override fun <ReqT : Any?, RespT : Any?> interceptCall(method: MethodDescriptor<ReqT, RespT>?, callOptions: CallOptions?, next: Channel?): ClientCall<ReqT, RespT> {

        return object : ClientCall<ReqT, RespT>() {

            var listener: Listener<RespT>? = null
            var metadata: Metadata? = null
            var message: ReqT? = null
            var request = 0
            var call: ClientCall<ReqT, RespT>? = null

            override fun start(responseListener: Listener<RespT>?, headers: Metadata?) {
                this.listener = responseListener
                this.metadata = headers
            }

            override fun sendMessage(message: ReqT) {
                assert(this.message == null)
                this.message = message
            }

            override fun request(numMessages: Int) {
                request += numMessages
                assert(this.message == null)
            }

            override fun isReady(): Boolean {
                return false
            }

            override fun halfClose() {

                startCall(object : ForwardingClientCallListener<RespT>() {

                    var delegate: Listener<RespT>? = null

                    override fun onReady() {
                        delegate = listener
                        super.onReady()
                    }

                    override fun delegate(): Listener<RespT> {
                        if (delegate == null) {
                            throw IllegalStateException()
                        }
                        return delegate!!
                    }

                    override fun onClose(status: Status?, trailers: Metadata?) {
                        if (delegate == null) {
                            super.onClose(status, trailers)
                            return
                        }
                        if (!needToRetry(status, trailers)) {
                            delegate = listener
                            super.onClose(status, trailers)
                            return
                        }
                        startCall(listener) // Only retry once
                    }

                    private fun needToRetry(status: Status?, trailers: Metadata?): Boolean {
                        if (status?.code?.toStatus() == UNAUTHENTICATED) {
                            Log.e("code", status?.code.toString())
                            return true
                        }
                        return false
                    }
                })
            }

            private fun startCall(listener: Listener<RespT>?) {
                call = next?.newCall(method, callOptions)
                val headers = Metadata()
                headers.merge(metadata)
                call?.start(listener, headers)
                assert(this.message != null)
                call?.request(request)
                call?.sendMessage(message)
                call?.halfClose()
            }

            override fun cancel(message: String?, cause: Throwable?) {
                if (call != null) {
                    call?.cancel(message, cause)
                }
                listener?.onClose(Status.CANCELLED.withDescription(message).withCause(cause), Metadata())
            }
        }
    }
}

它缓冲消息并重试,您可以在

needToRetry(status, trailers)

中添加您的逻辑

欲了解更多信息,您可以访问此 GitHub 链接。


0
投票

这有效。我正在使用 Koin 进行依赖注入和数据存储。

class Interceptor(private val repo: RefreshRepo, private val store: DataStore<AuthResponse>) : ClientInterceptor {
    override fun <ReqT : Any?, RespT : Any?> interceptCall(method: MethodDescriptor<ReqT, RespT>?, callOptions: CallOptions?, next: Channel?): ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
        val callOptionsWithDeadline = callOptions?.withDeadlineAfter(4, TimeUnit.MINUTES)
        return object : ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next?.newCall(method, callOptionsWithDeadline)) {
            var call: ClientCall<ReqT, RespT>? = null
            var listener: Listener<RespT>? = null
            var delegate: Listener<RespT>? = null
            var metadata: Metadata? = null
            var message: ReqT? = null
            var request = 0

            override fun sendMessage(message: ReqT) {
                super.sendMessage(message)
                this.message = message
            }

            override fun request(numMessages: Int) {
                super.request(numMessages)
                request += numMessages
            }

            override fun cancel(message: String?, cause: Throwable?) {
                super.cancel(message, cause)
                if (call != null) call?.cancel(message, cause)
                listener?.onClose(Status.CANCELLED.withDescription(message).withCause(cause), Metadata())
            }

            override fun start(responseListener: Listener<RespT>?, headers: Metadata?) {
                listener = responseListener
                metadata = headers

                val accessToken = runBlocking {
                    store.data.first().accessToken // get the accessToken from DataStore
                }

                metadata?.put(key, accessToken)

                super.start(object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {

                    override fun onMessage(message: RespT) {
                        responseListener?.onMessage(message)
                    }

                    override fun delegate(): Listener<RespT> {
                        delegate = responseListener
                        return delegate!!
                    }

                    override fun onClose(status: Status?, trailers: Metadata?) {
                        if (delegate == null || status?.code?.toStatus() != Status.UNAUTHENTICATED) {
                            super.onClose(status, trailers)
                        } else {
                            delegate = null
                            try {
                                val res = runBlocking {
                                    // refresh and store the accessToken to DataStore
                                    val r = repo.refresh()
                                    val auth = store.data.first().toBuilder()
                                        .setAccessToken(r.accessToken)
                                        .setRefreshToken(r.refreshToken)
                                        .build()
                                    store.updateData { auth }

                                    r
                                }

                                call = next?.newCall(method, callOptionsWithDeadline)
                                val md = Metadata()
                                md.put(key, res.accessToken)
                                md.merge(metadata)
                                call?.start(listener, md)
                                call?.request(request)
                                call?.sendMessage(message)
                                call?.halfClose()
                            } catch (e: Exception) {
                                super.onClose(status, trailers)
                            }
                        }
                    }
                }, headers)
            }
        }
    }

    private val key: Metadata.Key<String>
        get() = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)
}

-2
投票

如果您想这样做,我认为您必须完全按照您所描述的那样在应用程序级别处理它。这是因为 gRPC 不知道您的应用程序级别令牌。

  1. 进行 RPC
  2. 请注意,RPC 由于令牌过期而失败
  3. 调用刷新令牌的代码
  4. 重复

您所说的验证器是什么?

© www.soinside.com 2019 - 2024. All rights reserved.