1 Sending Messages (KafkaProducer)
First, add the kafka-clients dependency to the project:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
A complete producer example follows; the three send variants it calls are defined in sections 1.1-1.3:
package net.kolbe.kafka.client;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerSendDemo {
    public static final String brokerList = "a.kolbe.com.cn:9092,b.kolbe.com.cn:9092,c.kolbe.com.cn:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerSendDemo producerDemo = new ProducerSendDemo();
        // the three send variants are shown in sections 1.1-1.3 below
        producerDemo.fireAndForgetSend(producer);
        producerDemo.syncSend(producer);
        producerDemo.asyncSend(producer);
        producer.close();
    }
}
To verify the messages produced by the examples, a simple consumer (ConsumerFastStart) can be run alongside:
package net.kolbe.kafka.client;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerFastStart {
public static final String brokerList = "a.kolbe.com.cn:9092,b.kolbe.com.cn:9092,c.kolbe.com.cn:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(topic));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
1.1 Fire-and-Forget
/**
* Fire And Forget
*/
private void fireAndForgetSend(KafkaProducer<String, String> producer) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Fire And Forget Send Message");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
1.2 Synchronous Send
/**
* Sync Send
*/
private void syncSend(KafkaProducer<String, String> producer) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Send Sync Message");
try {
Future<RecordMetadata> future = producer.send(record);
// block until the send result is available
RecordMetadata metadata = future.get();
System.out.println(metadata);
} catch (Exception e) {
e.printStackTrace();
}
}
1.3 Asynchronous Send
/**
* Async Send
*/
private void asyncSend(KafkaProducer<String, String> producer) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Send Async Message");
try {
producer.send(record, new AsyncCallback());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Async Send Callback
*/
private static class AsyncCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // exactly one of metadata and exception is non-null
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Async send completed: " + metadata);
        }
    }
}
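Since Callback declares a single method, the callback can also be written inline as a lambda on Java 8+; a minimal sketch:
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // the send failed
    } else {
        System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
    }
});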
2 Producer Records (ProducerRecord)
2.1 Specifying Topic and Value
public ProducerRecord(String topic, V value) {}
2.2 Specifying Topic, Key, and Value
public ProducerRecord(String topic, K key, V value) {}
2.3 Specifying Topic, Partition, Key, and Value
public ProducerRecord(String topic, Integer partition, K key, V value) {}
2.4 Specifying Topic, Partition, Timestamp, Key, and Value
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
Note: if timestamp is null, System.currentTimeMillis() is used instead.
2.5 Specifying Topic, Partition, Timestamp, Key, Value, and Headers
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
Note: if timestamp is null, System.currentTimeMillis() is used instead.
2.6 Specifying Topic, Partition, Key, Value, and Headers
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
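For illustration, a short sketch exercising several of these overloads (the topic name, partition number, and header key below are arbitrary):
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

// topic and value only; the key defaults to null
ProducerRecord<String, String> r1 = new ProducerRecord<>("topic-demo", "value-only");
// topic, key, and value
ProducerRecord<String, String> r2 = new ProducerRecord<>("topic-demo", "key", "value");
// pin the record to partition 0 and set an explicit timestamp
ProducerRecord<String, String> r3 =
        new ProducerRecord<>("topic-demo", 0, System.currentTimeMillis(), "key", "value");
// attach a custom header
Headers headers = new RecordHeaders()
        .add(new RecordHeader("trace-id", "42".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> r4 = new ProducerRecord<>("topic-demo", 0, "key", "value", headers);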
3 Serializers (Serializer)
A Kafka producer uses a serializer to convert objects into byte arrays before sending them over the network to Kafka; on the consumer side, a matching deserializer converts the byte arrays received from Kafka back into objects.
3.1 Serializer
public interface Serializer<T> extends Closeable {
void configure(Map<String, ?> configs, boolean isKey);
byte[] serialize(String topic, T data);
@Override
void close();
}
The Serializer interface declares three methods:
- configure: configures the serializer
- serialize: performs the serialization, converting an object into a byte array
- close: closes the serializer; it must be idempotent, because KafkaProducer may call it more than once
3.2 StringSerializer
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
StringSerializer handles serialization of String values. Its configure method determines the character encoding to use, read from the following keys:
- key.serializer.encoding: encoding used for keys
- value.serializer.encoding: encoding used for values
- serializer.encoding: fallback used when neither key nor value encoding is set
If none of the three is configured, the default encoding UTF-8 is used.
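As a minimal sketch, the value encoding could be overridden like this (the producer forwards these properties to StringSerializer.configure; GBK is just an example):
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// picked up as "value.serializer.encoding" in StringSerializer.configure
properties.put("value.serializer.encoding", "GBK");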
4 Partitioners (Partitioner)
On its way to Kafka, a message passes through the serializer and then the partitioner. The partitioner determines which partition the message is sent to: if a partition was specified when the message was created, that value is used as-is; otherwise the partitioner computes the partition number.
4.1 The Partitioner Interface (Partitioner)
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
}
The Partitioner interface has three methods in total: partition and close declared by the interface itself, plus configure inherited from Configurable:
- partition: computes the partition number from the topic, key, value, and cluster metadata
- close: releases resources when the partitioner is closed
- configure: reads configuration and performs initialization
4.2 The Default Partitioner (DefaultPartitioner)
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
Kafka ships with a default implementation, DefaultPartitioner, whose partition method computes the partition number from the topic, key, and cluster metadata:
- If the key is null: messages are distributed round-robin across the topic's available partitions
- If the key is not null: the key is hashed (MurmurHash2) and the result is taken modulo the total number of partitions (not just the available ones), so records with the same key always land in the same partition
Note: a partition is "available" if its leader replica is alive; if the leader is down, the partition is unavailable.
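For illustration, a minimal sketch that reproduces the non-null-key formula above outside the producer (the key "user-42" and the partition count 3 are made-up values):
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {
    public static void main(String[] args) {
        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8); // hypothetical key
        int numPartitions = 3; // assumed partition count of the topic
        // the same computation DefaultPartitioner applies to non-null keys
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key user-42 -> partition " + partition);
    }
}
Records with the same key therefore keep mapping to the same partition as long as the topic's partition count does not change.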
4.3 A Custom Partitioner
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // toPositive guards against a negative index once the counter overflows
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
A custom partitioner implements the Partitioner interface and provides its own partition method; here, when the key is null, messages are round-robined across all of the topic's partitions, available or not.
A custom partitioner is configured as follows:
public static void main(String[] args) {
    Properties properties = new Properties();
    // other required configs (bootstrap.servers, serializers) omitted for brevity
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
}
5 Interceptors (Interceptor)
5.1 The Interceptor Interface (ProducerInterceptor)
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
The producer interceptor interface ProducerInterceptor declares three methods of its own, onSend, onAcknowledgement, and close, plus configure inherited from its parent interface:
- onSend: can customize the record before it is sent, including its topic, key, value, and partition; changing the topic, key, or partition is generally discouraged because it affects partition assignment and log compaction
- onAcknowledgement: called when the message is acknowledged or the send fails, before any user-supplied callback; it runs on the producer's I/O thread, so keep its logic light or it will slow down sending
- close: performs resource cleanup when the interceptor is closed
- configure: performs initialization and reads configuration
5.2 A Custom Interceptor
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifyValue = "prefix-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifyValue, record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("Send message error");
} else {
System.out.println("Send message success");
}
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
A custom interceptor is configured as follows:
public static void main(String[] args) {
    Properties properties = new Properties();
    // other required configs (bootstrap.servers, serializers) omitted for brevity
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
}
Note: to chain multiple interceptors, separate the class names with commas; they run in the order listed.
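For example (SecondProducerInterceptor is a hypothetical second implementation):
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        MyProducerInterceptor.class.getName() + "," + SecondProducerInterceptor.class.getName());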
6 Internals
6.1 Overall Architecture
The Kafka producer client runs two cooperating threads:
- Main thread: KafkaProducer creates the message, which passes through the interceptors, serializer, and partitioner before being buffered in the record accumulator
- Sender thread: fetches messages from the record accumulator and sends them to Kafka
6.2 The Record Accumulator (RecordAccumulator)
RecordAccumulator buffers the messages produced by KafkaProducer so that the Sender can send them in batches, reducing network overhead and improving throughput.
The size of this buffer is set by the producer config buffer.memory, which defaults to 32MB. If the producer generates messages faster than they can be delivered to the server, KafkaProducer.send blocks, and once the blocking exceeds the timeout configured by max.block.ms (default 60 seconds) an exception is thrown.
Internally, RecordAccumulator keeps a double-ended queue per partition, conceptually a Map<Partition, Deque<ProducerBatch>>: new messages from the main thread are appended at the tail of a queue, while the Sender thread reads from the head.
Messages travel over the network as bytes, so a memory region must be allocated for each message before sending, and frequent allocation and release is expensive. Kafka stores messages in ByteBuffers internally and uses a BufferPool to reuse them, but only ByteBuffers of one specific size are pooled; that size is set by batch.size, which defaults to 16KB. When a ProducerRecord reaches the RecordAccumulator, the accumulator checks whether a ProducerBatch already exists for the target partition: if so, the record is appended to it; if not, a new batch is created. When creating a batch, the accumulator compares the message size against batch.size: if the message fits, the batch is allocated at batch.size so its ByteBuffer can be reused through the BufferPool; otherwise the batch is allocated at the message's actual size, and that memory region is never reused.
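A minimal sketch of tuning these buffering parameters (the values are illustrative, not recommendations):
// 64MB accumulator instead of the default 32MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024L);
// fail send() after blocking 30s instead of the default 60s
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000L);
// pooled batches of 32KB instead of the default 16KB
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);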
6.3 The Sender Thread (Sender)
The Sender fetches cached messages from the RecordAccumulator and regroups them from the accumulator's <Partition, Deque<ProducerBatch>> view into a <Node, List<ProducerBatch>> view, since the network layer addresses broker nodes rather than partitions; it then wraps each group into the <Node, Request> form actually sent over the wire, where Request is the Kafka protocol's ProduceRequest.
Before sending a request to Kafka, the Sender also records it in InFlightRequests, a Map<NodeId, Deque<Request>> holding the requests that have been sent but not yet acknowledged. The maximum number of in-flight requests per connection is controlled by max.in.flight.requests.per.connection (default 5); once a connection has 5 unacknowledged requests, no further requests are sent on it.
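Relatedly, a sketch of a common tweak: capping in-flight requests at 1 keeps retried messages from being reordered within a partition, at some cost in throughput:
// with more than one request in flight, a retried batch can overtake a later one
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
properties.put(ProducerConfig.RETRIES_CONFIG, 3);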
7 Important Producer Configurations
Configuration | Description | Default |
---|---|---|
acks | How many replicas must receive a message before the producer considers the write successful: 1 means the leader replica alone, 0 means no server response is awaited, and -1 (or all) means every in-sync replica (ISR) must acknowledge | 1 |
max.request.size | Maximum size of a message the producer may send | 1MB |
retries | Number of retries for a failed send | 0 |
retry.backoff.ms | Interval between retries | 100ms |
compression.type | Message compression codec: gzip, snappy, lz4, or none | none |
connections.max.idle.ms | How long before idle connections are closed | 9 minutes |
linger.ms | How long the producer waits before sending a batch, used together with the batch size: a batch is sent once it fills up (batch.size) or once linger.ms has elapsed | 0 |
receive.buffer.bytes | Socket receive buffer size; -1 means use the OS default | 32KB |
send.buffer.bytes | Socket send buffer size; -1 means use the OS default | 128KB |
request.timeout.ms | Producer request timeout; requests that time out are retried | 30 seconds |
bootstrap.servers | List of addresses for connecting to the Kafka cluster | |
key.serializer | Serializer class for keys | |
value.serializer | Serializer class for values | |
buffer.memory | Size of the producer's message buffer | 32MB |
batch.size | Size of the reusable memory region for a ProducerBatch | 16KB |
client.id | Client id of the KafkaProducer | |
partitioner.class | Partitioner implementation | DefaultPartitioner |
interceptor.classes | Producer interceptors | |
max.in.flight.requests.per.connection | Maximum number of unacknowledged requests cached per connection | 5 |
metadata.max.age.ms | If metadata has not been refreshed within this period, a refresh is forced | 5 minutes |
transactional.id | Transactional id; must be unique | |
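To close, a sketch wiring several of these options together (the values are illustrative):
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "a.kolbe.com.cn:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "all");             // wait for every ISR replica
properties.put(ProducerConfig.RETRIES_CONFIG, 3);              // retry transient failures
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress batches on the wire
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);            // trade 5ms latency for fuller batches
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);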