[原创]解决Spring Boot Kafka 分区不均匀问题
现象
我们通常使用多个分区,开多线程来提高Kafka的消费速度,分区不均匀会导致线程闲置,消费速度过慢,进而导致消息积压
。
问题原因
消息写入哪个分区是由生产者
决定的,在调用kafkaTemplate.send()
方法时,可以指定分区,否则使用默认分区器DefaultPartitioner
计算。因为分区可能会调整,通常我们不会指定固定分区,而是依靠分区计算器。
查看DefaultPartitioner
代码可以得知:当指定了key
每次都会计算出固定的分区,否则会自动计算出一个可用分区。
Kafka指定key计算出固定分区是为了满足消息有序的需求,如果你需要保证消息的有序性而指定了key,本文描述的方法可能不适用你。
解决方案
那么解决这个问题的方式就是:
- 不再指定key
- 自定义分区器,每次计算出不同的分区
方法1不可行,因为这需要调整业务,成本太高,且合理的使用key有助于业务更清晰。
如果你使用较高版本的Kafka(具体版本忘记了),也可以通过attribute
代替key来区别不同的消息类型。
这里重点说方法2,首先,新建一个自定义分区器CustomizePartitioner
,代码参照DefaultPartitioner并稍作修改,去掉对key的处理逻辑:
@Slf4j
public class CustomizePartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public CustomizePartitioner() {
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partition;
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
partition = availablePartitions.get(part).partition();
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
partition = Utils.toPositive(nextValue) % partitions.size();
}
log.debug("topic: {}, key: {}, partition: {}", topic, key, partition);
return partition;
}
private int nextValue(String topic) {
AtomicInteger counter = this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
@Override
public void close() {
}
}
其次,新建KafkaConfig
,覆盖KafkaAutoConfiguration.kafkaProducerFactory
,并添加指定自定义分区器:
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {
@Bean
public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties kafkaProperties) {
Map<String, Object> properties = kafkaProperties.buildProducerProperties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
return new DefaultKafkaProducerFactory<>(properties);
}
}
重启查看debug信息,每次的分区都是不一样的了。
例如我的test主题下有20个分区,当我使用随机分区策略后仅10个分区有消息,而且都是偶数区分。springboot版本是2.3.7,使用内置的spring-kafka组件
你可以检查下,你的kafka clien端获取的分区列表是否是完整的,否则不太可能出现你说的情况。
您好,关于kafka生产者消息分区策略中,随机策略总是偶数分区有消息且一半的分区有消息,其他分区都没有数据。如果要均匀的把消息分发到所有分区,请问这样的场景有好的解决方案吗
生产端不太好去获取当前分区消息数量决定是否均匀,所以默认的就很好了,或者随机。
你想要的显然是随机方式,那你要解决的就是如果更均匀的随机就行了,这是个算法问题,你可以在我的代码基础上对partition方法进行修改,我就不深入说了。