如何刷新Apache flink中的redis缓存?

问题描述 投票:0回答:1

我有一个需求是从flink中的redis缓存中读取数据,但是根据需求,缓存数据平均每两个小时刷新一次。我正在查看文档和其他 stackoverflow 问题,有些人建议使用 apache-bahir 驱动程序连接到 redis 缓存,但该驱动程序已过时且不由 flink 维护。我在 flink 中使用 richsink 函数连接到 redis 缓存,但我无法弄清楚是否有任何方法可以从接收器函数刷新缓存。是否推荐使用flink中的sink功能连接redis。

redis apache-flink flink-streaming
1个回答
0
投票

虽然其中一个 Redis 接收器可能不支持开箱即用,但您似乎可以使用管道中的进程函数显式处理该函数,该函数对 Redis 进行适当的 API 调用(可能会根据您的配置而有所不同) ).

您可以使用表示更新频率(在本例中为两个小时)的显式 TTL 来实现此功能。这样,当数据通过时,它会检查缓存是否需要刷新(基于 TTL),如果状态已过期,您可以发出刷新请求,可能类似于:

class RedisCheckFunction: ProcessFunction<YourClass, YourClass>() {
    // ValueState for storing the boolean value
    private lateinit var shouldReset: ValueState<Boolean>

    @Throws(Exception::class)
    override fun open(parameters: Configuration?) {
        super.open(parameters)

        // Store a reference to the state for Redis
        val descriptor = ValueStateDescriptor("redis-reset-state", Boolean::class.java)

        // Set TTL for the state
        descriptor.enableTimeToLive(
            StateTtlConfig.newBuilder(Time.hours(2))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build()
        )

        // Materialize the state itself
        shouldReset = runtimeContext.getState(descriptor)
    }

    @Throws(Exception::class)
    override fun processElement(record: YourClass, context: Context, collector: Collector<YourClass>) {
        // Read the current value from state
        val shouldResetRedis = shouldReset.value()
        if (shouldResetRedis) {
            issueHttpRequestToUpdateRedis()
            shouldReset.update(true)
        } 
        
        collector.collect(record)
    }
    
    private fun issueHttpRequestToUpdateRedis() {
        // Omitted for brevity (use an HttpClient to issue a request to Redis)
    }
}

这可以作为接收器之前的传递(或者如果您正在编写自定义接收器,请将此功能添加到接收器):

yourStream
    .process(RedisCheckFunction())
    .addSink(...)
© www.soinside.com 2019 - 2024. All rights reserved.