[原创]解决Spring Boot Kafka 分区不均匀问题

现象

我们通常使用多个分区,开多线程来提高Kafka的消费速度,分区不均匀会导致线程闲置,消费速度过慢,进而导致消息积压

问题原因

消息写入哪个分区是由生产者决定的,在调用kafkaTemplate.send()方法时,可以指定分区,否则使用默认分区器DefaultPartitioner计算。因为分区可能会调整,通常我们不会指定固定分区,而是依靠分区计算器。

查看DefaultPartitioner代码可以得知:当指定了key每次都会计算出固定的分区,否则会自动计算出一个可用分区。

Kafka指定key计算出固定分区是为了满足消息有序的需求,如果你需要保证消息的有序性而指定了key,本文描述的方法可能不适用你。

解决方案

那么解决这个问题的方式就是:

  1. 不再指定key
  2. 自定义分区器,每次计算出不同的分区

方法1不可行,因为这需要调整业务,成本太高,且合理的使用key有助于业务更清晰。

如果你使用较高版本的Kafka(具体版本忘记了),也可以通过attribute代替key来区别不同的消息类型。

这里重点说方法2,首先,新建一个自定义分区器CustomizePartitioner,代码参照DefaultPartitioner并稍作修改,去掉对key的处理逻辑:

@Slf4j
public class CustomizePartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public CustomizePartitioner() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partition;
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            partition = availablePartitions.get(part).partition();
        } else {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            partition = Utils.toPositive(nextValue) % partitions.size();
        }

        log.debug("topic: {}, key: {}, partition: {}", topic, key, partition);

        return partition;
    }

    private int nextValue(String topic) {
        AtomicInteger counter = this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }
}

其次,新建KafkaConfig,覆盖KafkaAutoConfiguration.kafkaProducerFactory,并添加指定自定义分区器:

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {
    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}

重启查看debug信息,每次的分区都是不一样的了。

标签: spring, kafka, springboot

添加新评论