1 Consumers and Consumer Groups
A consumer subscribes to topics in Kafka and pulls messages from them. Every consumer belongs to a consumer group; when a message is published to a topic, it is delivered to exactly one consumer in each consumer group that subscribes to that topic.
A consumer can only consume messages from the partitions assigned to it, and each partition can be consumed by at most one consumer within the same consumer group.
Figure: relationship between consumers, consumer groups, topics, and partitions
If a group has more consumers than the topic has partitions, some consumers are assigned no partition and consume nothing, which wastes resources.
2 Client Development
2.1 Client Example
public class KafkaConsumerAnalysis {
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "a.kolbe.com.cn:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
return properties;
}
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
StringBuilder sb = new StringBuilder();
sb.append("topic:")
.append(record.topic())
.append(", partition:")
.append(record.partition())
.append(", key:")
.append(record.key())
.append(", value:")
.append(record.value());
System.out.println(sb.toString());
}
}
}
}
2.2 Required Parameters
- bootstrap.servers: the list of Kafka broker addresses to connect to; multiple addresses are separated by commas
- group.id: the consumer group this consumer belongs to
- key.deserializer: counterpart of the producer's key.serializer, used to deserialize the key
- value.deserializer: counterpart of the producer's value.serializer, used to deserialize the value
- client.id: the client id; optional
2.3 Subscribing to Topics and Partitions
1) Subscribe to a single topic
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));
}
2) Subscribe to multiple topics
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.subscribe(Arrays.asList("topic-demo1", "topic-demo-2"));
}
3) Assign specific partitions
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.assign(Collections.singleton(new TopicPartition("topic-demo", 0)));
}
4) Subscribe with a regular expression
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.subscribe(Pattern.compile("topic-.*"));
}
2.4 Fetching Partition Information
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor("topic-demo");
}
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
}
The PartitionInfo class contains the following information (a usage sketch follows the list):
- topic: the topic the partition belongs to
- partition: the partition number
- leader: the node hosting the partition's leader
- replicas: the nodes hosting all replicas of the partition
- inSyncReplicas: the nodes hosting the in-sync replicas (ISR)
- offlineReplicas: the nodes hosting replicas that are currently offline
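For illustration, a minimal sketch that prints these fields for every partition of topic-demo, reusing the initConfig() helper from the example above (java.util.Arrays is assumed to be imported):
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
// partitionsFor() returns metadata for every partition of the topic
for (PartitionInfo info : kafkaConsumer.partitionsFor("topic-demo")) {
    System.out.println("partition: " + info.partition()
            + ", leader: " + info.leader()
            + ", replicas: " + Arrays.toString(info.replicas())
            + ", isr: " + Arrays.toString(info.inSyncReplicas()));
}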
2.5 Unsubscribing
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));
kafkaConsumer.unsubscribe();
}
3 Deserialization
A Kafka producer uses a serializer to convert objects into byte arrays before sending them over the network to Kafka; accordingly, the consumer needs a deserializer to convert the byte arrays received from Kafka back into objects.
3.1 Deserializer
public interface Deserializer<T> extends Closeable {
void configure(Map<String, ?> configs, boolean isKey);
T deserialize(String topic, byte[] data);
@Override
void close();
}
The Deserializer interface contains three methods:
- configure: configures the deserializer
- deserialize: performs the deserialization, turning a byte array into an object
- close: closes the deserializer; it must be idempotent because KafkaConsumer may call it more than once
3.2 StringDeserializer
public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("deserializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
StringDeserializer deserializes string data. Its configure method sets the character encoding used to decode the bytes; the configuration keys it reads are:
- key.deserializer.encoding: encoding used when deserializing keys
- value.deserializer.encoding: encoding used when deserializing values
- deserializer.encoding: encoding used when neither of the above is specified
If none of the three is configured, the default encoding UTF-8 is used.
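A minimal sketch of overriding the encoding when building the consumer, on top of initConfig() above; the property keys are the ones read by StringDeserializer.configure() as shown in its source:
Properties properties = initConfig();
// decode keys and values as GBK instead of the default UTF-8
properties.put("key.deserializer.encoding", "GBK");
properties.put("value.deserializer.encoding", "GBK");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);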
4 Message Consumption
4.1 KafkaConsumer.poll
Consumption in Kafka is pull-based: the consumer actively sends requests to the brokers to pull messages. The KafkaConsumer.poll method takes a timeout parameter that controls how long the call may block; it blocks while there is no data available in the consumer's buffer, and returns (with an empty result) once the timeout expires and still nothing has been fetched.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
4.2 ConsumerRecord
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
}
- topic: the topic the message belongs to
- partition: the partition the message belongs to
- offset: the offset of the message within the partition
- timestamp: the message timestamp
- timestampType: the type of the timestamp (NO_TIMESTAMP_TYPE: no timestamp; CREATE_TIME: when the message was created; LOG_APPEND_TIME: when the message was appended to the log)
- serializedKeySize: the size of the key after serialization
- serializedValueSize: the size of the value after serialization
- headers: the message headers
- key: the message key
- value: the message value
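As a sketch, the consumption loop from the first example could print a few more of these fields; the accessors follow the class definition above:
for (ConsumerRecord<String, String> record : records) {
    System.out.println("offset: " + record.offset()
            + ", timestamp: " + record.timestamp()
            + ", timestampType: " + record.timestampType()
            + ", headers: " + record.headers()
            + ", key: " + record.key()
            + ", value: " + record.value());
}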
5 Offset Commits
5.1 Offset Concepts
In Kafka, "offset" carries two meanings:
- offset: the position of a message within a partition
- consumption offset: the position up to which a consumer has consumed
Each call to KafkaConsumer.poll() returns the messages that have not yet been consumed, so the position reached by the previous consumption must be recorded. To make this survive restarts, the consumption offset is persisted; in the new consumer client it is stored in Kafka's internal topic __consumer_offsets. Persisting the consumption offset is called a "commit", and a consumer needs to commit after it has consumed messages.
position: the offset of the next message to be fetched
committedOffset: the consumption offset that has already been committed
lastConsumedOffset: the offset of the last message consumed so far
Note: if consumption has currently reached offset x, the value to commit is x + 1 (the position), not x
public static void main(String[] args) throws Exception {
TopicPartition topicPartition = new TopicPartition("topic-demo", 0);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.assign(Collections.singletonList(topicPartition));
OffsetAndMetadata committedOffset = kafkaConsumer.committed(topicPartition);
long position = kafkaConsumer.position(topicPartition);
}
KafkaConsumer provides two methods to obtain position and committedOffset:
- KafkaConsumer.position() returns position
- KafkaConsumer.committed() returns committedOffset
5.2 Automatic Commit
By default Kafka commits consumption offsets automatically; the switch and the interval are controlled by the following two parameters:
# whether automatic commit is enabled; defaults to true, set to false to disable it
enable.auto.commit=true
# commit interval, 5 seconds by default; only takes effect when enable.auto.commit=true
auto.commit.interval.ms=5000
Note: by default, every 5 seconds the consumer commits the largest consumption offset it has fetched for each partition
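A minimal sketch of setting the same two parameters programmatically on top of initConfig(); the values shown are the defaults:
Properties properties = initConfig();
// enable automatic offset commits (the default)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// commit every 5 seconds (the default)
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);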
5.3 Synchronous Commit
5.3.1 Commit Without Arguments
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void commitSync() {
commitSync(Duration.ofMillis(defaultApiTimeoutMs));
}
}
The no-argument commitSync() synchronously commits offsets for the whole set of messages returned by poll(): it commits based on the latest offsets pulled by poll(), so the value committed corresponds to the current position.
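A minimal sketch of the usual pattern, assuming enable.auto.commit has been set to false:
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        // process the record here
    }
    // synchronously commit the positions of all partitions returned by this poll()
    kafkaConsumer.commitSync();
}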
5.3.2 Commit With Arguments
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
}
}
The commitSync overload that takes an offsets map supports committing at partition granularity. The most reliable approach commits after every single message, but its performance is the worst; alternatively, committing once per partition after that partition's records have been processed performs better, at the cost of possible duplicate consumption.
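A sketch of the per-partition pattern; note that the committed value is lastConsumedOffset + 1, i.e. the position of the next message to fetch:
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            // process the record here
        }
        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        // commit position = lastConsumedOffset + 1 for this partition only
        kafkaConsumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)));
    }
}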
5.4 Asynchronous Commit
5.4.1 Commit Without Arguments
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void commitAsync() {
commitAsync(null);
}
}
The no-argument commitAsync() asynchronously commits offsets for the whole set of messages returned by poll(): it commits based on the latest offsets pulled by poll(), so the value committed corresponds to the current position.
5.4.2 Commit With Arguments
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void commitAsync(OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
commitAsync(subscriptions.allConsumed(), callback);
} finally {
release();
}
}
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
log.debug("Committing offsets: {}", offsets);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
} finally {
release();
}
}
}
The commitAsync overloads that take a callback invoke it when the commit succeeds or fails, and the overload that also takes an offsets map does the same for per-partition commits.
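A minimal sketch of an asynchronous commit with a callback; onComplete receives the committed offsets and, on failure, the exception:
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception == null) {
            System.out.println("commit succeeded for offsets " + offsets);
        } else {
            System.err.println("commit failed for offsets " + offsets);
        }
    }
});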
5.5 Duplicate Consumption
If the consumer pulls messages, consumes some of them, and then crashes before committing the consumption offset, a restarted instance resumes from the last committed position, so the messages consumed before the crash are consumed again. This is the duplicate consumption problem.
5.6 Message Loss
If the consumer commits the consumption offset before it has finished processing all the pulled messages and then crashes, a restarted instance resumes from the newer committed position and the messages that had not been processed are never consumed. This is the message loss problem.
6 Controlling and Stopping Consumption
6.1 Pausing Fetching From Specified Partitions
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void pause(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
log.debug("Pausing partitions {}", partitions);
for (TopicPartition partition: partitions) {
subscriptions.pause(partition);
}
} finally {
release();
}
}
}
6.2 Resuming Fetching From Specified Partitions
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void resume(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
log.debug("Resuming partitions {}", partitions);
for (TopicPartition partition: partitions) {
subscriptions.resume(partition);
}
} finally {
release();
}
}
}
6.3 Getting the Paused Partitions
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public Set<TopicPartition> paused() {
acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(subscriptions.pausedPartitions());
} finally {
release();
}
}
}
6.4 Interrupting Consumption
public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void wakeup() {
this.client.wakeup();
}
}
Calling wakeup() makes an ongoing or subsequent poll() exit by throwing a WakeupException; it is safe to call from another thread.
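A minimal sketch of using wakeup() to break out of the poll loop; here the call is hooked to a JVM shutdown hook, which is just one possible trigger:
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));
// wakeup() may be called from another thread and aborts a blocking poll()
Runtime.getRuntime().addShutdownHook(new Thread(kafkaConsumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        // process records here
    }
} catch (WakeupException e) {
    // expected when wakeup() is called; nothing to do
} finally {
    kafkaConsumer.close();
}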
7 Consuming From a Specified Offset
7.1 Default Starting Offset
Whenever a consumer cannot find a committed consumption offset, the client parameter below decides where it starts consuming:
# decides where consumption starts when no committed offset is found
# possible values: latest (the end of the partition), earliest (the beginning of the partition), none (throw an exception)
auto.offset.reset=latest
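The same parameter can be set on the Properties used to build the consumer, for example (a sketch on top of initConfig()):
Properties properties = initConfig();
// start from the beginning of the partition when no committed offset is found
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);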
7.2 Seeking to a Specified Offset
7.2.1 Seeking to a Fixed Offset
KafkaConsumer.seek() starts consumption from a caller-specified offset. seek() can only reset the position of partitions that have already been assigned to the consumer, and partition assignment happens inside poll(), so poll() must be called (until an assignment is obtained) before seek() can be used.
private void seekFixedOffset() {
Properties properties = new Properties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic-demo"));
Set<TopicPartition> assignmentPartitions = new HashSet<>();
while(assignmentPartitions.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignmentPartitions = consumer.assignment();
}
for (TopicPartition topicPartition : assignmentPartitions) {
consumer.seek(topicPartition, 10);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
}
}
7.2.2 Seeking to the End of a Partition
private void seekEndOffset() {
Properties properties = new Properties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic-demo"));
Set<TopicPartition> assignmentPartitions = new HashSet<>();
while(assignmentPartitions.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignmentPartitions = consumer.assignment();
}
Map<TopicPartition, Long> partitionOffsets = consumer.endOffsets(assignmentPartitions);
for (TopicPartition topicPartition : assignmentPartitions) {
consumer.seek(topicPartition, partitionOffsets.get(topicPartition));
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
}
}
7.2.3 Seeking to the Beginning of a Partition
private void seekBeginOffset() {
Properties properties = new Properties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic-demo"));
Set<TopicPartition> assignmentPartitions = new HashSet<>();
while(assignmentPartitions.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignmentPartitions = consumer.assignment();
}
Map<TopicPartition, Long> partitionOffsets = consumer.beginningOffsets(assignmentPartitions);
for (TopicPartition topicPartition : assignmentPartitions) {
consumer.seek(topicPartition, partitionOffsets.get(topicPartition));
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
}
}
8 Interceptors
8.1 Interface
A Kafka consumer interceptor is created by implementing the ConsumerInterceptor interface:
public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
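As a sketch, a hypothetical LoggingConsumerInterceptor that logs what passes through both hooks (the configure() method comes from the Configurable parent interface), followed by how it would be registered via the interceptor.classes parameter:
public class LoggingConsumerInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // called just before poll() returns; must return the (possibly modified) records
        System.out.println("poll() is about to return " + records.count() + " records");
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // called after offsets have been committed successfully
        offsets.forEach((tp, offset) -> System.out.println("committed " + tp + " -> " + offset.offset()));
    }
    @Override
    public void close() {
        // nothing to do
    }
}
// registration on the consumer Properties
properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, LoggingConsumerInterceptor.class.getName());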
8.2 The onConsume() Method
KafkaConsumer invokes ConsumerInterceptor.onConsume() just before poll() returns the fetched records:
private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
long elapsedTime = 0L;
do {
client.maybeTriggerWakeup();
final long metadataEnd;
if (includeMetadataInTimeout) {
final long metadataStart = time.milliseconds();
if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
return ConsumerRecords.empty();
}
metadataEnd = time.milliseconds();
elapsedTime += metadataEnd - metadataStart;
} else {
while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
log.warn("Still waiting for metadata");
}
metadataEnd = time.milliseconds();
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}
// ConsumerInterceptor.onConsume() is invoked just before the records are returned
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
final long fetchEnd = time.milliseconds();
elapsedTime += fetchEnd - metadataEnd;
} while (elapsedTime < timeoutMs);
return ConsumerRecords.empty();
} finally {
release();
}
}
8.3 The onCommit() Method
KafkaConsumer invokes ConsumerInterceptor.onCommit() after offsets have been committed successfully via commitSync() or commitAsync():
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return true;
long now = time.milliseconds();
long startMs = now;
long remainingMs = timeoutMs;
do {
if (coordinatorUnknown()) {
if (!ensureCoordinatorReady(remainingMs))
return false;
remainingMs = timeoutMs - (time.milliseconds() - startMs);
}
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future, remainingMs);
invokeCompletedOffsetCommitCallbacks();
if (future.succeeded()) {
if (interceptors != null)
// invoked after the offsets have been committed successfully
interceptors.onCommit(offsets);
return true;
}
if (future.failed() && !future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);
now = time.milliseconds();
remainingMs = timeoutMs - (now - startMs);
} while (remainingMs > 0);
return false;
}
9 Important Consumer Parameters
Parameter | Description | Default |
---|---|---|
fetch.min.bytes | When the broker receives a fetch request and the available data is smaller than this value, it waits for more data to accumulate | 1B |
fetch.max.bytes | Maximum amount of data the broker returns for a single fetch request | 50MB |
fetch.max.wait.ms | Maximum time the broker waits when the data does not satisfy fetch.min.bytes; once it elapses the response is returned anyway | 500ms |
max.partition.fetch.bytes | Limits the amount of data fetched per partition, while fetch.max.bytes limits the total for the request | 1MB |
max.poll.records | Maximum number of records the consumer returns in a single poll | 500 |
connections.max.idle.ms | How long idle connections are kept before being closed | 9 minutes |
exclude.internal.topics | Whether internal topics (__consumer_offsets and __transaction_state) are excluded from pattern subscriptions; when true they can only be subscribed to by their exact name | true |
receive.buffer.bytes | Size of the socket receive buffer; -1 means use the operating system default | 64KB |
send.buffer.bytes | Size of the socket send buffer; -1 means use the operating system default | 128KB |
request.timeout.ms | Maximum time the consumer waits for a request response | 30s |
metadata.max.age.ms | Metadata expiry time; metadata not refreshed within this period is forcibly refreshed | 5 minutes |
reconnect.backoff.ms | Time to wait before attempting to reconnect to a host | 50ms |
retry.backoff.ms | Time to wait before retrying a failed request | 100ms |
isolation.level | Transaction isolation level, read_uncommitted or read_committed; with the default read_uncommitted the consumer can read up to the high watermark | read_uncommitted |
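A sketch of overriding a few of these parameters on the Properties passed to the consumer; the values are chosen only for illustration:
Properties properties = initConfig();
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);              // wait for at least 1KB per fetch
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // but no longer than 500ms
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);              // at most 200 records per poll()
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // only read committed transactional messages
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);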