Throttling event flow in a Flink job without using Thread.sleep


I'm new to Flink, and I'm trying to implement a pipeline that consumes from a Kafka topic, applies some light filtering and transformation to the data, and writes asynchronously to an HTTP endpoint.

The challenge is that this endpoint enforces a limit of 10,000 calls per minute, and I'm struggling to throttle my rate without using Thread.sleep (which is considered bad practice in Flink).

Currently, in a highly simplified form, my code looks like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val kafkaSource = KafkaSource.builder<JsonNode>()
        .setBootstrapServers(...)
        .setTopics(...)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setDeserializer(..)
        .build()

val stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Events Topic", JsonNodeTypeInfo())

val filteredStream = stream.filter(FilterFunction())

val transformedStream = filteredStream.map(TransformFunction())

val rateLimitedStream = transformedStream.process(RateFunction(10_000, 60, 60)).setParallelism(1)

val httpSink = HttpAsyncSink()
rateLimitedStream.sinkTo(httpSink)

env.execute("Flink Job")

So RateFunction() is the function I use to count events, cap them at 10k, and put the thread to sleep if that limit is reached within the one-minute interval.

class RateFunction(private val numEventsLimit: Int, private val timeLimitIntervalInSeconds: Int, private val threadSleepTimeInSeconds: Int) : ProcessFunction<JsonNode, JsonNode>() {

    // Counter to track the total number of processed events
    private lateinit var eventsCounter: Counter
    private var lastTime: Long = System.currentTimeMillis()

    override fun open(parameters: Configuration) {
        // Initializing Counters
        eventsCounter = runtimeContext.metricGroup.counter("totalEvents")
    }

    override fun processElement(value: JsonNode, ctx: ProcessFunction<JsonNode, JsonNode>.Context, out: Collector<JsonNode>) {

        // Function to reset counter
        fun resetCounter() {
            eventsCounter = runtimeContext.metricGroup.counter("totalEvents")
        }

        // Function to return the current millis
        fun currentMillis(): Long {
            return System.currentTimeMillis()
        }

        // If statement to manage the rate of the events being published
        if ((eventsCounter.count < numEventsLimit) && ((currentMillis() - lastTime) <= (timeLimitIntervalInSeconds * 1000))) {
            out.collect(value) // forward event
            eventsCounter.inc()
        } else if ((currentMillis() - lastTime) > (timeLimitIntervalInSeconds * 1000)) {
            lastTime = currentMillis()
            resetCounter()
            println("Resetting both eventsCounter and lastTime variables")
            out.collect(value) // forward event
            eventsCounter.inc()
        } else if (eventsCounter.count >= numEventsLimit) {
            println("Sleeping for $threadSleepTimeInSeconds seconds due reached Events Limitation")
            Thread.sleep((threadSleepTimeInSeconds * 1000).toLong())
            lastTime = currentMillis()
            resetCounter()
            out.collect(value) // forward event
            eventsCounter.inc()
        } else {
            throw RuntimeException("Something wrong happened during the process of the element $value")
        }
    }
}

And this is my HttpAsyncSink():

class HttpAsyncSink(private val objectMapper: ObjectMapper): Sink<JsonNode> {
    override fun createWriter(context: InitContext): SinkWriter<JsonNode> {
        return HttpAsyncWriter(context, objectMapper)
    }
}

class HttpAsyncWriter(context: InitContext, private val objectMapper: ObjectMapper): AsyncSinkWriter<JsonNode, JsonNode>(
    HttpRequestJsonPassthrough(),
    context,
    AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder()
        .setMaxBatchSize(1)
        .setMaxBatchSizeInBytes(500_000_000)
        .setMaxInFlightRequests(100_000)
        .setMaxBufferedRequests(100_000)
        .setMaxTimeInBufferMS(1_000_000)
        .setMaxRecordSizeInBytes(1_000_000)
        .build(),
    mutableListOf()
) {
    private val client = OkHttpClient()

    override fun submitRequestEntries(
        requestEntries: MutableList<JsonNode>,
        requestToRetry: Consumer<MutableList<JsonNode>>
    ) {
        requestEntries.forEach {
            val endpoint = it.get("endpointUrl").asText()
            val jsonString = objectMapper.writeValueAsString(it)
            val jsonMediaType = "application/json; charset=utf-8".toMediaTypeOrNull()
            val body: RequestBody = jsonString.toRequestBody(jsonMediaType)
            val request: Request = Request.Builder()
                .post(body)
                .url(endpoint)
                .build()
            val call: Call = client.newCall(request)
            call.enqueue(object : Callback {
                    // In case of failure, the record will return to a Consumer/Buffer to be sent again in another moment
                    override fun onFailure(call: Call, e: IOException) {
                        println("Request failed for record: $it\n returning it to the buffer.. Reason: ${e.message}")
                        requestToRetry.accept(mutableListOf(it))
                    }
                    // In case of success, the Collections.emptyList() tells the consumer that the request was accepted
                    override fun onResponse(call: Call, response: Response) {
                        // string() consumes and closes the body; use {} guarantees the response is released
                        response.use { r -> println("Response: ${r.body?.string()}") }
                        requestToRetry.accept(Collections.emptyList())
                    }
                }
            )
        }
    }

    // Approximates the record size in bytes via its serialized JSON form
    override fun getSizeInBytes(requestEntry: JsonNode): Long {
        return requestEntry.toString().toByteArray().size.toLong()
    }

}

I tried using an allWindow with a 1-minute tumbling window and a custom trigger based on the event count, but without much success (a simplified sketch of that direction is below). I also tried CountWindow, but struggled to implement something that would simply "pause" once the 10,000-event limit was reached and wait for the next minute before sending new events.
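For context, here is roughly the trigger direction I was exploring (not my exact code; the class name and details are illustrative). The idea was to forward each element immediately while under the cap, buffer the excess, and flush whatever was held back when the 1-minute window closes:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class CappedCountTrigger<T>(private val maxCount: Long) : Trigger<T, TimeWindow>() {

    // Per-window counter of elements seen so far
    private val countDesc =
        ReducingStateDescriptor("count", ReduceFunction<Long> { a, b -> a + b }, Types.LONG)

    override fun onElement(element: T, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult {
        // Make sure held-back elements are flushed when the window closes
        ctx.registerProcessingTimeTimer(window.maxTimestamp())
        val count = ctx.getPartitionedState(countDesc)
        count.add(1L)
        // Under the cap: emit the newly buffered element right away; over it: keep buffering
        return if (count.get() <= maxCount) TriggerResult.FIRE_AND_PURGE else TriggerResult.CONTINUE
    }

    override fun onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult =
        TriggerResult.FIRE_AND_PURGE // window closed: flush whatever was held back

    override fun onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult =
        TriggerResult.CONTINUE

    override fun clear(window: TimeWindow, ctx: TriggerContext) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp())
        ctx.getPartitionedState(countDesc).clear()
    }
}

The problem with this shape is that everything held back gets dumped in one burst at the window boundary, which still overwhelms the endpoint.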

Any insight or suggestion on how to implement rate limiting in Flink without relying on Thread.sleep would be greatly appreciated.

Thanks!

kotlin apache-flink flink-streaming
1 Answer

Indeed, you should never sleep in Flink's main processing thread.

However, you can safely apply rate limiting inside the deserializer, because Flink runs the deserialization schema in a separate thread. I'd suggest using Guava's RateLimiter there rather than sleep.
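Here is a minimal sketch of that idea (assuming Guava is on the classpath; the wrapper class name is mine): wrap your existing KafkaRecordDeserializationSchema so that every record must acquire a permit before it is deserialized and emitted. The blocking then happens on the Kafka fetcher thread rather than the main task thread, and backpressure naturally slows the source down:

import com.google.common.util.concurrent.RateLimiter
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

class RateLimitingDeserializer<T>(
    private val inner: KafkaRecordDeserializationSchema<T>,
    private val permitsPerSecond: Double
) : KafkaRecordDeserializationSchema<T> {

    // RateLimiter is not serializable, so it is created in open(), not at construction time
    @Transient private var limiter: RateLimiter? = null

    override fun open(context: DeserializationSchema.InitializationContext) {
        inner.open(context)
        limiter = RateLimiter.create(permitsPerSecond)
    }

    override fun deserialize(record: ConsumerRecord<ByteArray, ByteArray>, out: Collector<T>) {
        // Blocks until a permit is available -- safe here because the
        // deserialization schema runs outside the main task thread
        limiter!!.acquire()
        inner.deserialize(record, out)
    }

    override fun getProducedType(): TypeInformation<T> = inner.producedType
}

Hooked into your builder (10,000 calls per minute ≈ 166.6 permits per second; myJsonDeserializer stands in for your existing JSON deserializer):

val kafkaSource = KafkaSource.builder<JsonNode>()
        ...
        .setDeserializer(RateLimitingDeserializer(myJsonDeserializer, 10_000.0 / 60))
        .build()

Note that each source subtask gets its own limiter, so if the source runs with parallelism > 1, divide the rate by the parallelism.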
