消息队列Kafka-2

8. kafka的java客户端-黑马2019

8.1 依赖

Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<properties>
<scala.version>2.11</scala.version>
<slf4j.version>1.7.21</slf4j.version>
<kafka.version>2.0.0</kafka.version>
<lombok.version>1.18.8</lombok.version>
<junit.version>4.11</junit.version>
<gson.version>2.2.4</gson.version>
<protobuff.version>1.5.4</protobuff.version>
<spark.version>2.3.1</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>dependencies>

8.2 生产者的实现

img

img

img

img

运行java客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 package com.heima.kafka.chapter1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
* Kafka 消息生产者
*/
public class ProducerFastStart {
// Kafka集群地址
private static final String brokerList = "192.168.203.128:9092";
// 主题名称-之前已经创建
private static final String topic = "xrtest1";

public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//另外一种写法
//properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 设置值序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置集群地址
properties.put("bootstrap.servers", brokerList);
// KafkaProducer 线程安全
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-xr", "hello, Kafka!");
try {
producer.send(record);
//RecordMetadata recordMetadata = producer.send(record).get();
//System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic());
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}

img

img

可以修改,防止写错

img

8.3 消费者的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.heima.kafka.chapter1;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
* Kafka 消息消费者
*/
public class ConsumerFastStart {
// Kafka集群地址
private static final String brokerList = "192.168.203.128:9092";
// 主题名称-之前已经创建
private static final String topic = "xrtest1";
// 消费组
private static
final String groupId = "group.demo";

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}

8.4 效果展示

img

可以修改,防止写错

img



9. kafka的java客户端-千峰教育

9.1 生产者实现

9.1.1 生产者的基本实现

  • 引入依赖

img

  • 具体实现

img

img

img

img

发送消息到指定分区上

img

img

img

未指定分区,则会通过业务key的hash运算,算出消息往哪个分区上发

img

9.1.2 生产者的同步发送消息

img

  • 如果生产者发送消息没有收到ack生产者会阻塞阻塞到3s的时间,
  • 如果还没有收到消息,会进行重试重试的次数3次。

img

9.1.3 生产者的异步发送消息

img

  • 异步发送,生产者发送完消息后就可以执行之后的业务,
  • broker在收到消息后异步调用生产者提供的callback回调方法。

img

9.1.4 生产者中的ack的配置

在同步发送的前提下,生产者在获得集群返回的ack之前会⼀直阻塞。那么集群什么时候返回
ack呢?此时ack有3个配置:

  • ack = 0
    kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息的,效率是最高的
  • ack=1(默认)
    多副本之间的leader已经收到消息,并把消息写入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的
  • ack=-1/all
    里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和⼀个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差。

img

img

9.1.5 关于消息发送的缓冲区

img

img

9.2 消费者实现

9.2.1 消费者消费消息的基本实现

img

img

img

img

9.2.2 关于消费者自动提交和手动提交offset

提交的内容

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个
分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。

img


自动提交

消费者poll消息下来以后就会自动提交offset

img

  • 消费者poll到消息后,默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。
  • 自动提交会丢消息:因为如果消费者还没消费完poll下来的消息自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息,之前未被消费的消息就丢失掉了。

手动提交

需要把自动提交的配置改成false

img

在消费完消息后进行手动提交。手动提交又分成了两种:

  • 手动同步提交
    在消费完消息后调用同步提交的方法,当集群返回ack前⼀直阻塞,返回ack后表示提交
    成功,执行之后的逻辑。

img

  • 手动异步提交
    在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用。

img

9.2.3 长轮询poll消息

  • 消费者建立了与broker之间的长连接,开始poll消息。
  • 默认情况下,消费者⼀次会poll500条消息。

img

可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka
会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。

img

  • 如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如
    果超出了1s,则此次长轮询结束。

img

img

意味着:

  • 如果一次poll到500条,就直接执行for循环;
  • 如果这一次没有poll到500条,且时间在1秒内,那么长轮询继续poll,要么到500条,要么到1s;
  • 如果多次poll都没达到500条,且1秒时间到了,那么直接执行for循环。

如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点。

9.2.4 消费者的健康状态检查

消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将被踢出
消费组
,触发该消费组的rebalance机制,将该分区交给消费组里的其他消费者进行消费

  • 消费者发送心跳的时间间隔

img

  • kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者

img

9.2.5 指定分区和偏移量、时间消费

  • 指定分区消费

img

img

  • 从头消费(消息回溯消费)

img

img

img

  • 指定offset消费

img

img

img

  • 指定时间消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。

img

(partition-0被覆盖)

img

9.2.6 新消费组的消费offset规则

新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

  • Latest:默认的,消费新消息
  • earliest:第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

img

默认方式(Latest)

启动后没有拿到任何消息。

img

生产者再发消息,消费者可以拿到消息。

img

earliest,启动后就可以拿到消息

img

MyProducer

img

img

img

MyConsumer

img

img

img

img

10. Springboot中使用Kafka

10.1 引入依赖

img

10.2 编写配置文件

img

img

10.3 编写消息生产者

img

10.4 编写消费者

img

img

img

10.5 消费者中配置消费主题、分区和偏移量

img

img

11. kafka集群中的controller、reblance、HW

11.1 controller

集群中谁来充当controller
每个broker启动时会向zk创建一个临时序号节点,获得的序号最小的那个broker将会作为群中的controller,负责这么几件事:

  • 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leader,选举的规则是从isr集合中最左边获得。
  • 当集群中有broker新增或减少,controller会同步信息给其他broker。
  • 当集群中有分区新增或减少,controller会同步信息给其他broker。

img

11.2 rebalance机制

img

  • 前提:消费组中的消费者没有指明分区来消费
  • 触发的条件:当消费组中的消费者分区的关系发生变化的时候.
  • 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
    range:根据公式计算得到每个消费者消费哪几个分区:
    前 <分区数%消费者数量> 的消费者分配 <分区数/消费者数量+1>个分区,
    其余的消费者分配 <分区数/消费者数量> 个分区。
    轮询:大家轮着来
    sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。

img

img

img

如果没有用sticky策略,那消费者挂掉触发rebalance,那之前如果用的是轮询,都要重新来一遍。

img

11.3 HW和LEO

  • LEO某个副本最后消息的消息位置(log-end-offset)
  • HW(HighWatermark 高水位):
    已完成同步的位置。
    消息在写入broker时,且每个broker完成这条消息的同步后,hw才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失

img

12. Kafka-eagle监控平台

12.1 搭建

img

img

img

img

img

12.2 平台的使用

img

img

img

浏览器输入192.168.203.128.8040/ke

img

img

img

img


消息队列Kafka-2
https://blog.xirui.work/posts/1edfc3f2.html
作者
xirui
发布于
2024年12月18日
许可协议