我正在使用Kafka集群和使用 交易性生产者 用于原子流(读-过程-写)。
// Init Transactions
_transactionalProducer.InitTransactions(DefaultTimeout);
// Begin the transaction
_transactionalProducer.BeginTransaction();
// produce message to one or many topics
var topic = Topics.MyTopic;
_transactionalProducer.Produce(topic, consumeResult.Message);
我正在使用AvroSerializer,因为我用Schema发布消息。
Produce会抛出一个异常。
"System.InvalidOperationException: Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.\r\n at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler)"
我看到的所有事务性生产者的例子都使用Produce方法而不是ProduceAsync,所以我不确定是否可以简单地切换到ProduceAsync,并假定事务性生产能正常运行。如果我错了就纠正我,或者帮我找找文件。
否则,我无法找到不是Async的AvroSerializer,继承自ISerializer。
public class AvroSerializer<T> : IAsyncSerializer<T>