Kafka生产者有如下作用:
- 是我们生产消息的入口。
- 负责将消息根据用户的配置进行组装。
- 和Kafka服务端进行交互。
本篇文章将会带着大家了解我们视角下Kafka是如何工作的。
Spring Boot with Kafka
使用Kafka的途径有很多,Kafka有自己的原生的两种KafkaProducer实现(新旧两种KafkaProducer分别使用不同的版本和功能,使用前需要注意)、Spring Kafka以及各个公司对Kafka的封装及客户端实现。
在本节中,我们将使用Spring Boot来展现KafkaProducer的使用。
在Spring Boot中使用Kafka,我们需要引入以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
</dependency>
接下来,我们需要对Kafka做一些配置(application.yml),指定Kafka服务端的地址:
spring:
kafka:
bootstrap-servers: [127.0.0.1:9092]
接着,到了我们的项目的硬编码部分,如果我们是第一次使用Kafka,还没有创建topic,那么可以在应用程序端,声明创建一个新的topic:
/**
* @return 创建topic Bean
*/
@Bean
public NewTopic buildSunshineTopic() {
return new NewTopic("sunshine", 1, (short) 1);
}
当然,如果你的系统比较微,也可以在已有topic的情况下,在配置文件(application.yml)中指定默认的topic:
spring:
kafka:
template:
default-topic: sunshine
由于应用程序级别,Kafka生产消息属于底层通用服务,即任何业务层都可以调用的层级,所以我们创建Service来实现Kafka生产通用服务代码:
@Service("kafkaProduceService")
public class KafkaProduceServiceImpl implements KafkaProduceService {
@Autowired
public KafkaProduceServiceImpl(KafkaTemplate<Object, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
private final KafkaTemplate<Object, Object> kafkaTemplate;
@Override
public String produce(String message) {
this.kafkaTemplate.send("sunshine", 8, "", message);
return "success";
}
}
我们简单构造一个发送消息的入口:
@RestController
@RequestMapping("/laboratory/kafka")
public class KafkaController {
@Autowired
public KafkaController(KafkaProduceService kafkaProduceService) {
this.kafkaProduceService = kafkaProduceService;
}
/**
* Kafka生产服务
*/
private final KafkaProduceService kafkaProduceService;
@PostMapping("/produce/{message}")
public String produce(@PathVariable String message) {
return kafkaProduceService.produce(message);
}
}
接着,调用:
/laboratory/kafka/produce/sunshine
发送几条消息进行测试。
测试结果:
Starting offset: 5
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1584334868467 size: 106 magic: 2 compresscodec: NONE crc: 198471501 isvalid: true
| offset: 5 CreateTime: 1584334868467 keysize: 0 valuesize: 10 sequence: -1 headerKeys: [__TypeId__] key: payload: "sunshine"
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 106 CreateTime: 1584334904169 size: 106 magic: 2 compresscodec: NONE crc: 1400524010 isvalid: true
| offset: 6 CreateTime: 1584334904169 keysize: 0 valuesize: 10 sequence: -1 headerKeys: [__TypeId__] key: payload: "sunshine"
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 212 CreateTime: 1584334909495 size: 106 magic: 2 compresscodec: NONE crc: 2789437657 isvalid: true
| offset: 7 CreateTime: 1584334909495 keysize: 0 valuesize: 10 sequence: -1 headerKeys: [__TypeId__] key: payload: "sunshine"
Kafka服务端已经收到并落地我们发送的消息。
KafkaTemplate的作用
我们调用了KafkaTemplate#send()方法,即向指定Kafka集群发送了消息:
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}
无论是简单的直接发送,还是指定自定义时间戳发送,亦或是指定partition序号发送,KafkaTemplate都会将这些信息组装成一个ProdcuerRecord,并继续调用底层的KafkaTemplate#doSend()方法。
ProducerRecord
ProducerRecord供我们所在的producer客户端使用,用于将我们写入的topic、partition序号、消息键值对及时间戳等信息,组装为一个对象。
发送至指定的partition
从KafkaTemplate#send()方法调用中可以看出,我们可以发送消息到指定partition,ProducerRecord#partition存储了我们需要发送到的partition序号,默认为null。
partition序号不能指定为负数。
- 如果我们指定了合法的partition序号,record将会被发送到指定的partition上。
- 如果我们没有指定partition序号,但是指定了消息的key,Kafka集群会对key做一次hash计算,并将record发送到hash计算出来的值所在的partition上。
- 如果我们既没有指定partition序号,也没有使用消息的key,那么Kafka集群将会使用算法选取出一个合适的partiton,用于存储record,之前使用的round-robin算法,现在使用的是StickyPartitionCache提供的算法。
时间戳的选择
时间戳和日志的保存策略和日志的切分策略有着密切的关系,并不仅仅是record本身的时间戳,所以对适当的topic选择合适的record时间戳类型,至关重要。
- 如果我们调用KafkaTemplate#send()方法时给定了时间戳,则使用我们提供的时间戳。
- 如果我们没有提供时间戳,ProducerRecord会存储当前时间为record的时间戳。
- 此外Kafka还会根据topic的配置的时间戳策略选取时间戳。
- topic配置使用{@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}类型的时间戳,broker将会使用ProducerRecord#timestamp。
- topic配置使用{@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime},broker将会在record追加到日志文件时,覆盖ProducerRecord中存储的时间戳。
Header
消息头是Kafka在KIP-82中新加入的消息部分,加入Header会有以下的优点:
- HTTP和TCP协议中都有Header部分,所以消息也应该有Header这部分的概念,有助于和其他协议进行对齐。
- 增加Header可以在集群更自动化的进行路由。
- 可以在Header中设置事务ID字段,完成端到端的事务流程监控。
- 可以检查和修改元数据,比如生产record的clientId,幂等时消息的唯一ID,在多集群路由时最初收到此record的clusterId。
- 消息有时候是需要端到端加密的,一些企业级组件需要访问元数据。
- 消息是可以进行压缩的,在压缩之后,是无法将元数据添加到K中。
等等。
其他两个参数:
- key,消息键,在partition选择中已经讲述过,用于在我们未指定partition序号时,使用hash计算出一个partition序号。
- value,消息主题内容,用于存储具体的消息,相等于HTTP中的payload。
doSend()
在构建完一个ProducerRecord之后,会调用KafkaTemplate#doSend()方法进行发送:
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
// 获取对应类型的producer实例
final Producer<K, V> producer = getTheProducer();
this.logger.trace(() -> "Sending: " + producerRecord);
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
Object sample = null;
// 开启listener的计时器
if (this.micrometerEnabled && this.micrometerHolder == null) {
this.micrometerHolder = obtainMicrometerHolder();
}
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
// 使用producer发送消息,并构造回调方法
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
// 如果开启了autoflush,则执行flush操作
if (this.autoFlush) {
flush();
}
this.logger.trace(() -> "Sent: " + producerRecord);
return future;
}
- producer的类型分为三种:普通的producer,由Kafka保证幂等性的producer和支持事务的producer。
- 如果我们需要开启listener的计时器,需要将micrometerEnabled设为true,此参数默认为true。
- 接着,将会使用producer实例发送消息,底层方法KafkaTemplate#send()的返回类型是的Future,KafkaTemplate为我们默认生成了回调任务,用于处理Kafka服务端返回的响应。
- 如果开启了autoflush,则会执行flush()操作。
SettableListenableFuture是Spring包装的ListenableFuture,为ListenableFuture又延伸出了set()和setException()两种便捷操作方法。
三种producer类型
三种producer类型底层其实都是KafkaProducer,但是生成KafkaProducer所使用的参数不同。
事务型producer
如我我们在配置中设定了transaction-id-prefix:
spring:
kafka:
transaction-id-prefix: sunshine
那么就会默认开启事务型的producer。
在开启事务型producer的同时,也会默认同时开启幂等性。
幂等型producer
我们可以配置属性enable.idempotence=true的方式开启幂等型producer,我们可以为ProducerFactory添加此配置。
@Bean
public ProducerFactory<String, String> buildProducerFactory() {
Map<String, Object> kafkaProducerConfigMap = new HashMap<>();
kafkaProducerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(kafkaProducerConfigMap);
}
ProducerConfig是kafka-clients依赖提供给我们使用的配置属性类,里面有各个客户端的属性配置及用途释义。
幂等型producer将会保证KafkaProducer在重试时不会出现消息重复的现象,也就是将发送语义从至少一次提升至准确一次。
普通型Producer
普通型producer即为我们使用场景最多的producer类型。
getTheProducer()
经过上面对producer的类型的简单介绍后,我们继续看KafkaTemplate#getTheProducer()方法:
private Producer<K, V> getTheProducer() {
// 是否已经配置开启producer的事务性
boolean transactionalProducer = this.transactional;
if (transactionalProducer) {
// 如果已经配置开启prodcuer的事务性,则判断当前场景下,是否已经进入事务
boolean inTransaction = inTransaction();
// 校验事务的状态,如果不允许非事务性的生产过长,并且当前调用线程没有处于事务中
// 进行提醒,建议使用KafkaTemplate#executeInTransaction()或@Transactinal注解
Assert.state(this.allowNonTransactional || inTransaction,
"No transaction is in process; "
+ "possible solutions: run the template operation within the scope of a "
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
+ "before invoking the template method, "
+ "run in a transaction started by a listener container when consuming a record");
// 如果允许非事务的生产过程,并且当前调用线程也不处于事务中,则可以不使用事务producer
if (!inTransaction) {
transactionalProducer = false;
}
}
// 在当前调用线程处于事务中时,需要使用事务型producer
if (transactionalProducer) {
// 如果已经存在producer实例,直接从KafkaTemplate的ThreadLocal中获取producer实例即可
Producer<K, V> producer = this.producers.get();
if (producer != null) {
return producer;
}
// 如果事务是刚刚开启,还没有创建producer实例,则使用producerFactory工厂,配置的transactinId和事务默认等待时间创建新的producer实例
KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
return holder.getProducer();
}
// 接下来的两种情况,均由DefaultKafkaProducerFactory提供实现
// 如果允许非事务型的producer,则创建非实物型producer实例
else if (this.allowNonTransactional) {
return this.producerFactory.createNonTransactionalProducer();
}
else {
// 根据实际情况创建producer实例
return this.producerFactory.createProducer();
}
}
- 按照producer的优先级顺序来创建:事务型=>幂等型=>普通型。
- this.tranactional的值由transaction-id-prefix的值来确定,而当前场景是否使用事务型producer,则根据当前线程的环境、已经实例化的ProducerFactory和事务管理器来进行决定。
- 如果需要使用事务型producer,则会从当前已经初始化的producer缓存中获取,或者使用ProducerFactory初始化一个事务型producer。
- allowNonTransactional属性是2.4.3版本新增的特性的,用于在KafkaTemplate为全局事务型时,在未开启事务时,也可以使用普通型producer发送消息。
- 其他情况:
- 配置中开启事务,但是当前消息发送上下文没有调用开启事务,也不允许非事务型producer的创建,则也会创建一个事务型producer。
- 配置中为开启事务,也不允许非实物型Producer的创建,则会创建一个普通的producer事务。
无论是事务型producer还是非事务型的producer,都会调用DefaultKafkaProducerFactory#doCreateProducer()创建producer实例,浓浓的Spring内味儿。
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
// 如果存在事务ID前缀
if (txIdPrefix != null) {
// 如果需要避免僵尸实例问题,则可以在ProducerFactory中开启producerPerConsumerPartition配置
// 僵尸实例问题:某个producer实例临时失联,新的producer会代替失联的实例,而失联的producer恢复后,就会出现两条同样的消息
if (this.producerPerConsumerPartition) {
return createTransactionalProducerForPartition(txIdPrefix);
}
else {
// 如果我们可以自己处理重复调用问题,则直接创建事务型producer实例
return createTransactionalProducer(txIdPrefix);
}
}
// 如果需要为每个线程创建producer
// 为什么需要为每个线程创建producer
// 多个线程调用同一个producer实例时,一个线程进行flush()操作,会阻塞其他的线程
if (this.producerPerThread) {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
// 则为每个线程单独创建producer
if (tlProducer == null) {
tlProducer = new CloseSafeProducer<>(createKafkaProducer());
this.threadBoundProducers.set(tlProducer);
}
return tlProducer;
}
// 此种情况默认为我们经常使用的producer实例
// 即进程内的线程共用同一个producer
if (this.producer == null) {
synchronized (this) {
// 需要双重校验所保证线程安全
if (this.producer == null) {
// 创建producer实例
this.producer = new CloseSafeProducer<>(createKafkaProducer());
}
}
}
return this.producer;
}
此时,producer类型的区分取决于是否配置了tranaction-id-prefix属性。
但是此时不仅仅考虑的是创建类型的问题,还要考虑producer实例运行环境的问题。
在创建事务型producer时:
- 如果我们可以自行处理僵尸实例问题,则直接创建一个向topic发送消息的producer实例。
- 如果需要Kafka客户端来保证消息发送唯一性,则Kafka客户端会为topic的每个partition维护一个producer实例。
若需要Kafka客户端保证消息发送的唯一性,则需要进行设置defaultKafkaProducerFactory.setProducerPerConsumerPartition(true)的同时,还需要在发送消息是指定transaction-id的后缀:TransactionSupport.setTransactionIdSuffix(“1”)
无论是为全局还是每个partition创建事务型producer,最终都会调用DefaultKafkaProducerFactory#doCreateTxProducer()方法:
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
@Nullable Consumer<CloseSafeProducer<K, V>> remover) {
Producer<K, V> newProducer;
// producuer的配置
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, prefix + suffix);
if (this.clientIdPrefix != null) {
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
}
// 创建producer实例
newProducer = createRawProducer(newProducerConfigs);
// 开启producer的事务
newProducer.initTransactions();
// 使用Spring包装的一层,可友好关闭的producer实例
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
}
- 读取我们手动配置以及默认的Kafka配置,创建一个原生的KafkaProducer。
- 如果我们指定了客户端前缀,Spring还会为每个客户端生成一个唯一的编号,供Kafka服务端进行识别。
- Spring为了处理Bean的生命周期,还将原生的KafkaProducer包装为CloseSafeProducer,这样可以更友好的关闭KafkaProducer实例。
除了开启事务时,为每个partition创建producer实例,Spring Kafka还支持为每个线程创建KafkaProducer实例。
为什么需要为每个线程创建一个producer实例?
是因为如果多个线程同时调用一个producer实例,那么在其中一个线程调用flush()对缓冲区进行清理时,其他线程均会阻塞等待flush()调用结束。
为每个线程创建KafkaProducer实例的实现思路比较简单,是在KafkaTemplate的ThreadLocal中写入KafkaProducer实例。
如何开启每个线程创建一个producer实例?
defaultKafkaProducerFactory.setProducerPerThread(true);
其他情况下,将会以同步的方式创建一个进程内共享的producer实例,
而在创建KafkaProducer时,直接使用原生的API进行初始化:
protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
// 直接实例化Kafka底层的API
return new KafkaProducer<>(configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
}