在使用 MockProducer 调用 send 之前给出完成指令

问题描述 投票:0回答:1

我有以下代码(显然是作为示例):

class RandomClass(
  val producer: Producer<String, String>
) {
  fun randomFunction(): Boolean {
    // Using .get() because I want to make sure it got sent before continuing
    producer.send( (...) ).get()

    return true
  }
}

我通过创建一个 MockProducer 并将其而不是真正的 Producer 注入到类中来构建测试。然后我执行我的函数并验证结果。

"Some test of random function" {
  val mockProducer = MockProducer(
    false,
    StringSerializer(),
    StringSerializer()
  )

  // Build class with mockproducers
  val randomClass = RandomClass(
    producer = mockProducer
  )

  // Execute function
  randomClass.randomFunction()    


  // Verify executions
  mockProducer.history().size shouldBe 1
}

上面的代码现在将挂起,因为函数调用

.get()
正在阻塞线程。这是可以预料的,因为我们还没有告诉 MockProducer 如何处理
.send()
函数调用。

根据我所了解到的情况,似乎我必须决定在调用.send()之后如何处理函数调用。对我来说,这在测试范围中完全没有意义,因为我不知道何时实际调用发送。根据我的测试,我仅控制何时执行对

randomClass.randomFunction()
的调用,之后程序将在可用时执行,并且由于该函数正在阻塞,意味着测试将不会继续,直到
.send()
完成。
因此,由于 
.get(),我无法告诉 MockProducer 在发生后如何处理
.send()
调用。

我引用了 Baeldung 的指南:

https://www.baeldung.com/kafka-mock Producer

(...) 其次,我们将调用mockProducer.errorNext(e),以便MockProducer为最后一次send()调用返回一个异常。

我引用了javadoc中的

MockProducer.completeNext()

成功完成最早未完成的通话。返回: true 如果有未完成的调用来完成

我想提前告诉MockProducer下一个调用或n个调用要做什么。这与使用 Mockito/Mockk 进行模拟的工作方式类似。下面的示例包含我想做的事情:

"Some test of random function" { val mockProducer = MockProducer( false, StringSerializer(), StringSerializer() ) // Build class with mockproducers val randomClass = RandomClass( producer = mockProducer ) // I wish to tell the MockProducer ahead of time that // the next send should be completed successfully mockProducer.completeNext() // Or .errorNext() if I wish to test an error // Execute function randomClass.randomFunction() // Verify executions mockProducer.history().size shouldBe 1 }

我真的没有办法提前告诉MockProducer如何完成对
.send()

的调用吗?

    

我今天遇到了类似的问题。我使用带有回调的发送而不是返回 future 的发送,并且想要提前定义回调结果(是否例外)。
kotlin apache-kafka kafka-producer-api
1个回答
0
投票
您另外要做的就是在消息发送后立即找出 Future 的结果。只有在消息发送到 Kafka Broker 后才能评估结果,这是使用刷新操作完成的。真实(非模拟)生产者可以配置为立即自动刷新,甚至默认情况下会执行此操作,因为 linger.ms 生产者默认设置为 0。要在MockProducer中模拟flush,你必须调用completeNext() errorNext()或flush()。

为了实现你想要的,你可以扩展MockProducer并重写completeSend()以根据你在调用send()之前定义的行为,并且如果你想立即检查Future,还可以重写send()来进行刷新以避免阻塞线程(尽管我会避免在每次发送消息后检查 Future 结果,或者如果您确实需要它,请在手动发送后调用lush(),然后才检查结果)。

实现示例:

public class CompletionMockProducer extends MockProducer<String, String> { private final Queue<RuntimeException> completionResults = new LinkedList<>(); public CompletionMockProducer() { super(false, new StringSerializer(), new StringSerializer()); } public synchronized void addCompletionResults(RuntimeException ... exceptions) { completionResults.addAll(Arrays.stream(exceptions).toList()); } @Override public synchronized void clear() { super.clear(); completionResults.clear(); } @Override public synchronized Future<RecordMetadata> send(ProducerRecord<String, String> record) { var result = super.send(record); flush(); return result; } @Override public synchronized boolean completeNext() { Validate.isTrue(!completionResults.isEmpty(), "Completion result not prepared using #addCompletionResults(...)"); return errorNext(completionResults.poll()); } }


© www.soinside.com 2019 - 2024. All rights reserved.