简述Kafka消息顺序问题及应对方案

前言

在处理kafka消息并覆盖地写入elasticsearch时,发现旧的数据覆盖了新的数据

这里所说的“新旧”是业务逻辑上的概念,可以把它理解为“消息的时间戳”或“消息中某个字段的大小”

对于新手,困惑油然而生,kafka不是保证了消息顺序吗?

局部有序

kafka保证的是消息局部有序(同一个topic、同一个partition)Apache Kafka

Messages sent by a producer to a particular topic partition will be appended in the order they are sent.

分区选择

kafka消息被发送时需要指定一个topic和一个key,默认情况下做以下处理

  • 若key为空,消息会被随机发送到一个partition
  • 若key不为空,同一个key的消息会被发送到同一个partition

也可以自定义一个处理partition选择的方式(partitioner.class),需要自己实现一个partitioner,并在client创建时指定它

既然kafka的消息是局部有序的,那就应该尽可能把同一类业务的消息放到同一个topic、同一个partition中

回到遇到的问题,发现该topic下使用了多个partition来提高吞吐量,显然对消息顺序造成了一定的影响

消费者顺序消费

为了保证每个partition都被顺序地消费,kafka的机制是“每个partition只能对应一个consumer”

乱序风险

按照一定的要求,看似能保证每一个partition的消息是有序的,仍然会有潜在的消息乱序风险

多个生产者时无法保证消息顺序

多个生产者会导致消息时序复杂化(时间同步、网络问题等等),即使是同一个partition,也不能保证消息有序。

试想多个生产者情况下,顺序的标准是什么?如果时间不同步,甚至可能没有标准!

举个栗子,某个partition的消息由多个生产者产生,Producer A发送Msg 1,随后Producer B发送Msg 2,此时Brocker接收时的顺序可能是Msg 2先到,随后Msg 1到。因此乱序。

因此多个生产者的情况下无法在消息队列保证消息的顺序,这种情况下只能通过consumer做一些额外的处理来保证消息顺序处理。

重试机制

如果是单个生产者,开启了失败retry机制,也可能会导致乱序。retries

Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

当生产者使用默认配置时,max.in.flight.requests.per.connection通常不为1,此时会有多个请求按顺序发出,每个请求都是一个消息的集合(Batch),因为kafka为了保证高吞吐量,会累积消息批量发送。

举个栗子,batchA发送后,batchB发送,batchA失败,但batchB成功了,因此重试batchA,而此时batchB已经到达了Brocker,从而乱序。

在官方的client中需要设置max.in.flight.requests.per.connection为1才能保证消息的顺序,它的意思是每个连接允许处理的flight requests数(我的理解是航班数,相当于消息发送的出口,每个出口只保证自己消息的顺序,失败了会重试)。在go的sarama客户端中对应的配置项为config.Net.MaxOpenRequests

随着KIP-91的提出,v2.1.0版本更改了以下配置项

  1. 将timeout整合成一个配置项:delivery.timeout.ms
  2. 默认最大重试次数为MAX_INT

为了保证消息的可靠性,开启retry几乎是必须的。不同客户端实现的默认设置是不同的,目前kafka官方的客户端默认重试次数为MAX_INT,sarama默认重试次数为3,因此如果对消息顺序有要求,使用时一定要注意

问题解决

最后定位到问题是多个生产者、多个partition,因此最终在consumer处先检查了elasticsearch中的记录,再考虑是否覆盖写入。但在多个consumer时会遇到并发安全问题,通过es的版本控制来实现一个简单的乐观锁得以解决问题。

参考