我在使用单个消费者组中的两个主题(主题 A 和主题 B)时遇到 Kafka 消费者问题。这两个主题的数据中都包含 UTC 时间戳,并且同时发布。我面临的问题是,当一个主题正在被消费时,另一个主题似乎落后了,并且消费没有按预期并行发生。
我的目标是一种更加并行和高效的消费方法,两个主题在发布时都会得到及时处理。如何解决这个问题并优化这两个主题的消费?我应该考虑哪些最佳实践或配置调整?
我正在为我的 Kafka 消费者使用 Golang Sarama 库。以下是我目前如何设置 Kafka 消费者的片段:
package kafka
import (
"context"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
// kafka_consumer_group wraps a ConsumerBase configuration and implements
// sarama.ConsumerGroupHandler (Setup / Cleanup / ConsumeClaim below).
type kafka_consumer_group struct {
consumer ConsumerBase
}
// kafka_controller dispatches decoded message payloads to the application
// layer (see ConsumeClaim).
// NOTE(review): the `controllers` package does not appear in the visible
// import block — confirm it is imported elsewhere in this file.
var kafka_controller = controllers.NewKafkaController()
func Init(group map[string]interface{}) {
cgo := &kafka_consumer_group{
consumer: ConsumerBase{
brokers: strings.Split(group["brokers"].(string), ","),
topics: strings.Split(group["topics"].(string), ","),
assignor: group["assignor"].(string),
version: group["version"].(string),
ready: make(chan bool),
group: group["name"].(string),
offsetNewest: isOffsetNewest(group["offset"].(string)),
offsetOldest: isOffsetOldest(group["offset"].(string)),
},
}
cgo.init()
}
// init parses the configured Kafka version, builds the sarama consumer-group
// client, and runs the consume loop until the context is cancelled or a
// SIGINT/SIGTERM arrives. It blocks for the lifetime of the consumer group.
func (kafka_consumer *kafka_consumer_group) init() {
	ctx := context.Background()
	zap.Debug(ctx, "kafka_cg:", "Starting a new Sarama consumer for :", kafka_consumer.consumer.group)

	version, err := sarama.ParseKafkaVersion(kafka_consumer.consumer.version)
	if err != nil {
		zap.Fatal(ctx, "kafka_cg", "Error parsing Kafka version:", err)
	}

	config := sarama.NewConfig()
	config.Version = version

	// Map the configured assignor name onto a sarama rebalance strategy.
	switch kafka_consumer.consumer.assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		zap.Fatal(ctx, "kafka_cg", "Unrecognized consumer group partition assignor: ", kafka_consumer.consumer.assignor)
	}

	// Initial offset only applies to partitions with no committed offset yet.
	if kafka_consumer.consumer.offsetNewest {
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	}
	if kafka_consumer.consumer.offsetOldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(kafka_consumer.consumer.brokers, kafka_consumer.consumer.group, config)
	if err != nil {
		zap.Fatal(ctx, "kafka_cg", "Error creating consumer group client:", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session needs to
			// be recreated to pick up the new claims.
			if err := client.Consume(ctx, kafka_consumer.consumer.topics, kafka_consumer); err != nil {
				zap.Error(ctx, "kafka_cg:Error from consumer: ", err)
			}
			// Context cancelled => the consumer should stop.
			if ctx.Err() != nil {
				// FIX: log ctx.Err() here. The outer `err` is stale — it is
				// the NewConsumerGroup error, which is always nil by this
				// point, so the original log line always printed nil.
				zap.Error(ctx, "kafka_cg:consumer context cancelled: ", ctx.Err())
				return
			}
			// Re-arm the readiness signal for the next session's Setup.
			kafka_consumer.consumer.ready = make(chan bool)
		}
	}()

	<-kafka_consumer.consumer.ready // await first Setup: the consumer is live
	zap.Debug(ctx, "kafka_cg", "Sarama consumer up and running!... for", "consumer_group:", kafka_consumer.consumer.group, "with_topics:", kafka_consumer.consumer.topics)

	// Block until either the context is cancelled or a termination signal.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		zap.Debug(ctx, "kafka_cg", "terminating: context cancelled")
	case <-sigterm:
		zap.Debug(ctx, "kafka_cg", "terminating: via signal")
	}

	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		zap.Fatal(ctx, "kafka_cg", "Error closing client: ", err)
	}
	zap.Debug(ctx, "kafka_cg", "consumer group", kafka_consumer.consumer.group, "closed successfully")
}
// Setup is invoked at the start of every consumer-group session, before any
// ConsumeClaim goroutine runs. Closing the ready channel unblocks init(),
// which waits for the first session to be fully established.
func (cg *kafka_consumer_group) Setup(sarama.ConsumerGroupSession) error {
	close(cg.consumer.ready)
	return nil
}
// Cleanup runs once per session, after all ConsumeClaim goroutines have
// exited. No per-session resources are held, so there is nothing to release.
func (cg *kafka_consumer_group) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim drains a single partition claim. Sarama invokes it in its own
// goroutine — one per claimed partition — so distinct partitions (and hence
// the two topics) are already consumed concurrently within this instance.
func (kafka_consumer *kafka_consumer_group) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	ctx := tracer.WithTraceableContext(context.Background(), "kafka_cg: ConsumeClaim")
	// NOTE:
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		kafka_controller.ProcessMessage(ctx, message.Topic, string(message.Value))
		// Mark the message as processed; offsets are flushed in the background
		// by sarama's auto-commit loop (Consumer.Offsets.AutoCommit, enabled
		// by default, 1s interval).
		// FIX: the previous per-message session.Commit() forced a synchronous
		// broker round-trip for every single record, serializing the claim
		// and starving the sibling topic's partitions — the "one topic lags
		// behind" symptom. The trade-off is at-least-once redelivery of up to
		// one auto-commit interval's worth of messages after a crash.
		session.MarkMessage(message, "")
	}
	return nil
}
任何关于如何在 Golang Sarama 中使用同步 UTC 时间戳实现这两个主题的并行消费的见解、建议或示例将不胜感激。
另外,改用两个独立的消费者组(各自使用不同的 groupID,每组只订阅一个主题)是否是一个可行的方案?