[原创]解决Spring Boot Kafka 分区不均匀问题

现象

我们通常使用多个分区,开多线程来提高Kafka的消费速度,分区不均匀会导致线程闲置,消费速度过慢,进而导致消息积压

问题原因

消息写入哪个分区是由生产者决定的,在调用kafkaTemplate.send()方法时,可以指定分区,否则使用默认分区器DefaultPartitioner计算。因为分区可能会调整,通常我们不会指定固定分区,而是依靠分区计算器。

查看DefaultPartitioner代码可以得知:当指定了key每次都会计算出固定的分区,否则会自动计算出一个可用分区。

Kafka指定key计算出固定分区是为了满足消息有序的需求,如果你需要保证消息的有序性而指定了key,本文描述的方法可能不适用你。

解决方案

那么解决这个问题的方式就是:

  1. 不再指定key
  2. 自定义分区器,每次计算出不同的分区

方法1不可行,因为这需要调整业务,成本太高,且合理的使用key有助于业务更清晰。

如果你使用较高版本的Kafka(具体版本忘记了),也可以通过attribute代替key来区别不同的消息类型。

这里重点说方法2,首先,新建一个自定义分区器CustomizePartitioner,代码参照DefaultPartitioner并稍作修改,去掉对key的处理逻辑:

@Slf4j
public class CustomizePartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public CustomizePartitioner() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partition;
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            partition = availablePartitions.get(part).partition();
        } else {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            partition = Utils.toPositive(nextValue) % partitions.size();
        }

        log.debug("topic: {}, key: {}, partition: {}", topic, key, partition);

        return partition;
    }

    private int nextValue(String topic) {
        AtomicInteger counter = this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }
}

其次,新建KafkaConfig,覆盖KafkaAutoConfiguration.kafkaProducerFactory,并添加指定自定义分区器:

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {
    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}

重启查看debug信息,每次的分区都是不一样的了。

标签: spring, kafka, springboot

已有 4 条评论

  1. fpwag fpwag

    例如我的test主题下有20个分区,当我使用随机分区策略后仅10个分区有消息,而且都是偶数区分。springboot版本是2.3.7,使用内置的spring-kafka组件

    1. 你可以检查下,你的kafka clien端获取的分区列表是否是完整的,否则不太可能出现你说的情况。

  2. fpwag fpwag

    您好,关于kafka生产者消息分区策略中,随机策略总是偶数分区有消息且一半的分区有消息,其他分区都没有数据。如果要均匀的把消息分发到所有分区,请问这样的场景有好的解决方案吗

    1. 生产端不太好去获取当前分区消息数量决定是否均匀,所以默认的就很好了,或者随机。

      你想要的显然是随机方式,那你要解决的就是如果更均匀的随机就行了,这是个算法问题,你可以在我的代码基础上对partition方法进行修改,我就不深入说了。

添加新评论