Kafka极高延迟的C#

问题描述 投票:0回答:1

我正在Apache Kafka上做一些性能测试,以便与RabbitMQ和ActiveMQ等其他软件进行比较。我的想法是将它用于代理通信的消息系统。

我正在测试多个场景(一对一、广播和多对一),有不同数量的发布者和订阅者,因此负载也不同。即使在最低负载的情况下,10对代理发送500条消息,发送之间有1ms的延迟,我也遇到了非常高的延迟(平均约200ms)。如果我们增加到100对,这个数字就会上升到~1500ms。同样的事情发生在广播和多对一上。

我使用Windows的Kafka 2.12-2.5.0和zookeeper 3.6.1与C# .Net客户端Confluent.Kafka 1.4.2。根据我找到的一些帖子,我已经尝试了一些属性,比如LingerMs = 0。我的Kafka和zookeeper都有默认设置。

我做了一个简单的测试代码,但问题还是发生了。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace KafkaSetupAgain
{
    class Program
    {
        static void Main(string[] args)
        {
            int numberOfMessages = 500;
            int numberOfPublishers = 10;
            int numberOfSubscribers = 10;
            int timeOfRun = 30000;

            List<MCVESubscriber> Subscribers = new List<MCVESubscriber>();
            for (int i = 0; i < numberOfSubscribers; i++)
            {
                MCVESubscriber ZeroMqSubscriber = new MCVESubscriber();
                new Thread(() =>
                {
                    ZeroMqSubscriber.read(i.ToString());
                }).Start();
                Subscribers.Add(ZeroMqSubscriber);
            }


            Thread.Sleep(10000);//to make sure all subscribers started
            for (int i = 0; i < numberOfPublishers; i++)
            {
                MCVEPublisher ZeroMqPublisherBroadcast = new MCVEPublisher();
                new Thread(() =>
                {
                    ZeroMqPublisherBroadcast.publish(numberOfMessages, i.ToString());
                }).Start();
            }

            Thread.Sleep(timeOfRun);
            foreach (MCVESubscriber Subscriber in Subscribers)
            {
                Subscriber.PrintMessages("file.csv");
            }
        }

        public class MCVEPublisher
        {
            public void publish(int numberOfMessages, string topic)
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = "localhost:9092",
                    LingerMs = 0,
                    Acks = 0,
                };
                var producer = new ProducerBuilder<Null, string>(config).Build();

                int success = 0;
                int failure = 0;
                Thread.Sleep(3500);
                for (int i = 0; i < numberOfMessages; i++)
                {
                    Thread.Sleep(1);
                    long milliseconds = System.Diagnostics.Stopwatch.GetTimestamp() / TimeSpan.TicksPerMillisecond;

                    var t = producer.ProduceAsync(topic, new Message<Null, string> { Value = milliseconds.ToString() });
                    t.ContinueWith(task => {
                        if (task.IsFaulted)
                        {
                            failure++;
                        }
                        else
                        {
                            success++;
                        }
                    });
                }

                Console.WriteLine("Success: " + success + " Failure:" + failure);

            }
        }

        public class MCVESubscriber
        {
            private List<string> prints = new List<string>();

            public void read(string topic)
            {
                var config = new ConsumerConfig()
                {
                    BootstrapServers = "localhost:9092",
                    EnableAutoCommit = false,
                    FetchErrorBackoffMs = 1,
                };

                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = Guid.NewGuid().ToString();
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = false;

                using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
                {
                    consumer.Subscribe(new[] { topic });

                    while (true)
                    {
                        var consumeResult = consumer.Consume();

                        long milliseconds = System.Diagnostics.Stopwatch.GetTimestamp() / TimeSpan.TicksPerMillisecond;
                        prints.Add(consumeResult.Message.Value + ";" + milliseconds.ToString());
                    }

                    consumer.Close();
                }
            }

            public void PrintMessages(string path)
            {
                Console.WriteLine("printing " + prints.Count);
                File.AppendAllLines(path, prints);
            }

        }
    }
}

有人知道问题出在哪里吗?我可以改变哪些配置来提高延迟?

谢谢。

Davide Costa

c# apache-kafka kafka-consumer-api kafka-producer-api
1个回答
1
投票

Kafka并不是真正为低延迟的消息分发而构建的,而是为了高可用性。 它可以被配置为具有更低的延迟,但你开始失去很多Kafka提供的优势。

下面说几个小技巧。

  • 关于... KafkaProducer 方面,一般来说,你要等到有足够的消息可以发送时,才能更有效地批处理消息。 这就是 linger.ms 属性,你已经提到了。 通常情况下,这个属性被设置为50ms左右,所以把它设置为0,实际上就是告诉生产者,只要它收到数据,就会尽快发送数据。 这可能会让生产者更 "聊得来",但你可以保证它一拿到数据就会把数据发送到集群。

  • 然而,一旦消息被 "生产 "到Kafka中,它就会等待,直到从下层得到经纪人成功接收消息的ACK。 这里有多种选择。

    • 一旦消息被生产者发送,就被认为是 "收到 "了。也就是说,在本地,一旦网络层发送完毕,生产者就会认为是 "已发送并确认"。
    • 等待你要发送消息的领导经纪商的 ACK,这取决于消息被分配到哪个分区,所以你至少知道有一个经纪商拥有它。 这是默认的。
    • 等待来自你要发送消息的领导代理的 ACK,以及来自该分区在其他代理上的每个副本的 ACK。 这意味着,如果你的集群的复制因子是3,例如消息被发送到broker 1,它就会把消息复制到broker 2和3,这两个broker有同一个分区的副本,等待这些broker回复说他们收到了消息,然后才回复生产者说消息已经被ACK了。 这通常用于这样的环境:你永远不希望丢失一个消息的可能性,所以你总是保证在生产者继续前进之前,你的消息会有三个副本。

官方 acks Kafka文档中的解释。

https:/kafka.apache.org25documentation.html#acks。

还有其他的设置需要考虑,比如kafka生产者压缩和broker压缩设置,这些设置可能会增加更多的延迟开销,但是如果你使用的是默认设置(没有生产者压缩和 producer 选项),在这些步骤中不应该有额外的延迟。

说了这么多,我建议你试着把 "压缩 "设置成 "延迟"。acks 选项为0,看看你的延迟有什么变化。 我猜你会得到更好的延迟。但是 同时也要明白,不能保证你的信息真的被正确接收和存储。 一个不稳定的网络、网络分区等,都可能导致你丢失数据。 对于你的用例来说,这可能是可以的,但只要确保你意识到这一点。

© www.soinside.com 2019 - 2024. All rights reserved.