如何使用Kafka以及原理

Java使用Kafka进行通信

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ProducerDemo extends Thread{
private final KafkaProducer<Integer,String> producer;
private final String topic;
public ProducerDemo(String topic) {
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.32.129:9092,192.168.32.131:9092,192.168.32.133:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"producer-demo");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
producer=new KafkaProducer<Integer, String>(properties);
this.topic = topic;
}
@Override
public void run() {
int num=0;
while(num<50){
String msg="producer test message:"+num;
try {
producer.send(new ProducerRecord<Integer, String>(topic,msg)).get();
TimeUnit.SECONDS.sleep(2);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new ProducerDemo("test").start();
}
}

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 属性的值是我本地集群的IP地址,我创建的Topic为test,至于怎么创建Topic大家在网上搜索一下如何创建

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo extends Thread {

KafkaConsumer<Integer, String> consumer;
String topic;

public ConsumerDemo(String topic) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.32.129:9092,192.168.32.131:9092,192.168.32.133:9092");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-demo");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id1");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //自动提交(批量确认)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//一个新的group的消费者去消费一个topic
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //这个属性. 它能够消费昨天发布的数据
// latest 情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息
// earliest 情况下,新的消费者会从该topic最早的消息开始消费
// none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常
consumer = new KafkaConsumer<Integer, String>(properties);
this.topic = topic;
}

@Override
public void run() {
consumer.subscribe(Collections.singleton(this.topic));
while (true) {
ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));//超时时间1秒钟
consumerRecords.forEach(record -> {
System.out.println(record.key() + "->" + record.value());
});
}
}
public static void main(String[] args) {
new ConsumerDemo("test").start();
}
}

在运行过程中我遇到了一个错误,如下所示

1
2
Connection with /192.168.32.131 disconnected
java.net.ConnectException: Connection refused: no further information

由于我的zk没有和Kafka在同一台机器上面,这是请修改服务器Kafka目录下的config目录中的server.properties文件
修改内容为 listeners=PLAINTEXT://192.168.32.129:9092,192.168.32.129 找个IP为 Kafka 服务器的IP地址。
Kafka 发送消息支持同步和异步,上面那些代码中演示的同步状态,同步是阻塞的,而异步发送则不需要等待阻塞。异步发送代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void run() {
int num=0;
while(num<50){
String msg="producer test message:"+num;
try {
/**
* 同步发送代码片段
* producer.send(new ProducerRecord<Integer, String>(topic,msg)).get();
*/
producer.send(new ProducerRecord<Integer, String>(topic, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("异步发送回调");
}
});
TimeUnit.SECONDS.sleep(2);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}

有关于异步发送的还有两个属性需要介绍一下

1
2
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "100");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "100");

batch.size 可以通过这个参数来控制批量提交的字节数大小,linger.ms 就是为每次发送消息请求增加一些延迟

原理部分

在写原理之前我们来看下这张图片,下图展示了Kafka的相关术语以及之间的关系

图片转自链接 https://www.cnblogs.com/qingyunzong/p/9004509.html#_label2
我们来解释一下 Kafka中的名词

Broker

Kafka 集群包含一个或多个服务器,这种服务器被称为 broker

Producer

消息生产者,负责生产消息到 Kafka broker

Consumer

消息消费者,向 Kafka broker 读取消息的客户端,consumer 从 broker拉取(pull)数据并进行处理

Topic

每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为Topic。一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处

Partition

每个 topic 可以划分多个分区(Partition,每个Topic至少有一个分区),同一 topic 下的不同分区包含的消息是不同的。每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition。
如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的 partition 中,有点类似数据库的分库分表的概念,把数据做了分片处理。顺便多说一句如果要保证消息的顺序性同一 topic 下只能创建一个 partition。
刚接触 Kafka 时,我对 Partition 概念挺模糊的,大家可以参考一下知乎上面的这边文章 https://www.zhihu.com/question/28925721

Consumer Group

每个 Consumer 都有一个属于特定的 Consumer Group,就是我们上文中设置的这个属性ConsumerConfig.GROUP_ID_CONFIG

消息的消费原理

在本文第一张图片中,消息生产者通过默认是采用对 key 的 hash 取模的分区算法分配到 partition 中,那消息是如何消费的我们来一起看下,如下图所示

在上图中一个 topic 分为3个分区,同一group下分为3个消费者。在一一对应的情况下 consumer1 会消费p0分区、consumer2 会消费 p1分区、consumer3 会消费 p2分区。
如果在 consumer 以及 partition 不相等的情况下,会是怎么样的结果?无非就是 consumer 少了或者 consumer 多了。
3个分区,2个 consumer 情况下,必会有一个 consumer 消费两个分区,另外一个 consumer 消费一个分区。
同样也是3个分区,4个 consumer 情况下必有一个 consumer 无法消费消息。所以 consumer 和 partition 数量上要有一定的规划。

  1. 如果 consumer 比 partition 多的情况下实属浪费。所以 consumer 不要大于 partition。
  2. 如果 consumer 比 partition 少,一个 consumer 会对应于多个 partition ,consumer 也不要太少,否则也会导致 partition 里面的数据取的不均匀。最好 partiton 数目是 consumer数目的整数倍,比如 partition 为 24,这样我们就很容易设定 consumer 的数目。
  3. 增减 consumer,broker,partition 会导致 rebalance ,所以 rebalance 后 consumer 对应的 partition 会发生变化。

分区策略

在同一个 group 中的消费者对于一个 topic 中的多个 partition,消费者消费消息时必定会存在一定的分区分配策略。在 Kafka 中,存在三种分区分配策略。
分别为 Range 范围分区策略(默认使用这种策略)、 另一种是 RoundRobin(轮询)、 StickyAssignor(粘性)。 在消费端中的 ConsumerConfig 中,通过属性 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 来指定分配策略。

Range 范围分区

我们来看下范围分区时如何划分的,Range分区是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
比如有10个分区,分别为P1、P2、P3、P4、P5、P6、P7、P8、P9、P10,三个消费者C1、C2、C3。结果是这样子的
C1 消费 P1、P2、P3、P4
C2 消费 P5、P6、P7
C3 消费 P8、P9、P10
那这种范围分区根据什么算法公式来分配的呢?如下公式所示:
n = 分区数/消费者数量
m= 分区数%消费者数量
前m个消费者每个分配n+l个分区,后面的消费者每个分配n个分区

RoundRobin 轮询分区

轮询分区是把所有 partition 和所有 consumer 都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给 consumer。
如果所有 consumer 实例的订阅是相同的,那么 partition 会均匀分布。
在我们的例子里面,假如按照 hashCode 排序完的 topic-partitions 组依次为P1、P2、P3、P4、P5、P6、P7、P8、P9、P10,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 P1、 P5、 P9
C1-1 将消费 P2、 P6、P10
C2-0 将消费 P3、 P7
C2-1 将消费 P4、 P8
使用轮询策略必须满足两个条件

  1. 每个主题的消费者实例具有相同数量的流
  2. 每个消费者订阅的主题必须是相同的

StrickyAssignor 分配策略

kafka在0.11.x版本支持了StrickyAssignor(粘滞分区)它主要有两个目的

  • 分区的分配尽可能的均匀
  • 分区的分配尽可能和上次分配保持相同
    当两者发生冲突时, 第一个目标优先于第二个目标。 鉴于这两个目标,StickyAssignor分配策略比上面两种分配策略要复杂,假设我们有这样一个场景
    假设消费组有3个消费者:C0,C1,C2,它们分别订阅了4个Topic(t0,t1,t2,t3),并且每个主题有两个分区(p0,p1),也就是说,整个消费组订阅了8个分区:tOpO 、 tOpl 、 tlpO 、 tlpl 、 t2p0 、t2pl 、t3p0 、 t3pl
    初始分配是这样子的
    CO: tOpO、tlpl、t3p0
    Cl: tOpl、t2p0、t3pl
    C2: tlpO、t2pl
    仔细一看分配方式有点类似于轮询策略,但实际上并不是,因为假设这个时候,C1这个消费者挂了,就势必会造成重新分区(reblance),如果是轮询,那么结果应该是
    CO: tOpO、tlpO、t2p0、t3p0
    C2: tOpl、tlpl、t2pl、t3pl
    而在 StrickyAssignor 它是一种粘滞策略,所以它会满足分区的分配尽可能和上次分配保持相同,所以分配结果如下
    CO: tOpO、tlpl、t3p0、t2p0
    C2: tlpO、t2pl、tOpl、t3pl
    C0和C2保留了上一次是的分配结果,并且把原来C1的分区分配给了C0和C2。 这种策略的好处是使得分区发生变化时,减少了不必要的分区移动。

Kafka之消费者协调器和组协调器

了解了 Kafka 中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?
多个消费者之间的分区分配是需要协同的,这个协同的过程又是怎样的呢 ?这一切都是交由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。
ConsumerCoordinator 与 GroupCoordinator 之间最重要的职责就是负责执行消费者再均衡(rebalance)的操作当出现以下几种种情形会触发再均衡的操作

同一 consumer group 中新增消费者
消费者停机或者宕机当前 consumer group
topic新增了分区(也就是分区数量发生了变化)

我们来看下再均衡操作的具体内容。当有消费者加入消费组时,消费者、消费组及组协调器之间会经历 一下几个阶段。

第一阶段 find_Coordinator
如果消费者已经保存了消费组对应的 GroupCoordinator,并且与它之间的网络连接是正常情况下,则不需要此节点,直接进入到第二个阶段。
否则的话消费者向集群中的负载最小的节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator。

第二阶段 join_group
在第一阶段成功找到消费组所对应的 GroupCoordinator 之后就会加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
JoinGroupRequest 的几个重要参数如下 :

  • group_id 对应消费组的 id,通常也表示为 groupld
  • session_timout 对应消费端参数 sesson.timeout.ms ,默认值为 10000(10 秒),GroupCoordinator 超过 session_timeout指定的时间内没有收到心跳报文则认为此消费者已经下线
  • rebalance_timeout 对应消费端参数 max.poll.interval.ms ,默认值为300000(5 分钟),表示当消费组再平衡的时候, GroupCoordinator 等待各个消费者重新加入的最长等待时间
  • member_id 表示 GroupCoordinator 分配给消费者的 id 标识。 消费者第一次发送 JoinGroupRequest 请求的时候此字段设置为 null
  • protocol_metadata 序列化后的消费者的订阅信息

新加入的消费者是在 consumer 一侧实现 rebalance 的, 所以 GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 leader。
选举算法比较简单,如果消费组内没有 leader,那么第一个加入消费组的消费者就是消费者 leader,如果这个时候 leader 消费者退出了消费组,那么重新选举一个leader,重新选举类似于随机算法。

前面提到过这么一个问题,如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?
每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个“赞同”的规则是,消费组内的各个消费者会通过投票来决定

  • 在 join_group 阶段,每个 consumer 都会把自己支持的分区分配策略发送到 coordinator
  • coordinator 收集到所有消费者的分配策略,组成一个候选集
  • 每个消费者需要从候选集里找出一个自己支持的策略,并且为这个策略投票
  • 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略

第三阶段 sync_group
完成分区分配之后,就进入了Synchronizing Group State阶段,主要是向 GroupCoordinator 发送 SyncGroupRequest 请求,并且处理 SyncGroupResponse 响应。
简单来说,就是 leader 将消费者对应 的 partition 分配方案同步给 consumer group 中的所有 consumer,每个消费者都会向 coordinator 发送 sync_group请求,不过只有leader节点会发送分配方案。
当 leader 把方案发给 coordinator 以后,coordinator 会把结果设置到 SyncGroupResponse 中。这样所有成员都知道自己应该消费哪个分区。