如果一个受限制,如何在kafka中的剩余分区上应用循环法

问题描述 投票:1回答:1

我已经限制了一个主题的一个分区用于特定服务(因此所有请求都将在此处到达服务X)。对于任何其他服务请求将到达剩余的N个分区。

在java中我通过org.apache.kafka.clients.producer.Partitioner接口实现它。

@Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String partitionKey = (String) key;

        if(Channel.DB.getValue().equalsIgnoreCase(partitionKey) && ( KafkaTopic.TRANS.getValue().equalsIgnoreCase(topic) || KafkaTopic.CONS.getValue().equalsIgnoreCase(topic) )){
            return 1; // this is reserved for SERVICE X only
        }

        return 0; // here i want to produce messages on remaining partitions, how to return partition now?
    }

问题:1:如何返回分区号。在这种情况下2:如何生成其他消息作为循环除了服务X的分区。

我正在使用Apache Kafka 9.0.1。

java apache-kafka kafka-producer-api
1个回答
0
投票

下面的代码对我有用 - 这里的想法是,当密钥不是保留分区时,从可用分区列表中删除该特定分区并循环其余分区。

private final AtomicInteger counter = new AtomicInteger(0);

public static final int SPECIAL_PARTITION_ID = 1;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    String partitionKey = (String) key;

    if ("SPECIAL_CUSTOMER".equals(partitionKey)) {
        LOGGER.info("PARTITION= " + SPECIAL_PARTITION_ID);
        return SPECIAL_PARTITION_ID; //special partition reserved for MY_SPECIAL_CUSTOMER
    } else {
        int nextValue = counter.getAndIncrement();

        List<PartitionInfo> availablePartitions = new ArrayList<>(cluster.availablePartitionsForTopic(topic));

        if (availablePartitions.size() > 0) {

            PartitionInfo specialPartition = null;

            for (PartitionInfo partitionInfo : availablePartitions) {
                if (partitionInfo.partition() == SPECIAL_PARTITION_ID) {
                    specialPartition = partitionInfo;
                    break;
                }
            }

            availablePartitions.remove(specialPartition);

            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            //optional -- depending upon your usecase
            while (true) {
                int p = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                if(p != SPECIAL_PARTITION_ID) {
                    return p;
                }
            }
        }
    }
}

如果您只是确保一个密钥总是转到保留分区而其他密钥可能会进行循环,包括特殊分区,那么当密钥用于保留分区时,您可以通过传递partitionId轻松实现它,否则只需根本没有传递密钥,这可以节省你编写自定义分区程序。

此外,如果您不介意保留分区是最后一个分区而其余分区转到其他分区,则可能会有更简单的实现(摘自“Kafka:权威指南”一书)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {} 1

    public int partition(String topic, Object key, byte[] keyBytes,
                           Object value, byte[] valueBytes,
                             Cluster cluster) {
        List<PartitionInfo> partitions =
          cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceOf String))) 2
            throw new InvalidRecordException("We expect all messages
              to have customer name as key")

        if (((String) key).equals("Banana"))
            return numPartitions; // Banana will always go to last
                                     partition

        // Other records will get hashed to the rest of the
           partitions
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }

    public void close() {}
}
© www.soinside.com 2019 - 2024. All rights reserved.